forked from lix-project/lix
a1355917ec
Recently aws-sdk-cpp quietly switched to using S3 virtual host URIs (https://github.com/aws/aws-sdk-cpp/commit/69d9c53882), i.e. it sends requests to http://<bucket>.<region>.s3.amazonaws.com rather than http://<region>.s3.amazonaws.com/<bucket>. However, this interacts badly with curl connection reuse. For example, if we do the following:

1) Check whether a bucket exists using GetBucketLocation.
2) If it doesn't, create it using CreateBucket.
3) Do operations on the bucket.

then 3) will fail for a minute or so with a NoSuchBucket exception, presumably because the server being hit is a fallback for cases when buckets don't exist.

Disabling the use of virtual hosts ensures that 3) succeeds immediately. (I don't know what S3's consistency guarantees are for bucket creation, but in practice buckets appear to be available immediately.)
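
The fix is a one-argument change where the S3Client is constructed (see S3Helper below): the client is told to use path-style rather than virtual-host-style requests. A minimal sketch, assuming the aws-sdk-cpp constructor signature of that era, (config, signPayloads, useVirtualAddressing):

    Aws::Client::ClientConfiguration config;
    // 'false' disables virtual host URIs, i.e. requests go to
    // http://<region>.s3.amazonaws.com/<bucket> again:
    Aws::S3::S3Client client(config, /* signPayloads */ true,
        /* useVirtualAddressing */ false);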
369 lines
12 KiB
C++
#if ENABLE_S3

#include "s3.hh"
#include "s3-binary-cache-store.hh"
#include "nar-info.hh"
#include "nar-info-disk-cache.hh"
#include "globals.hh"
#include "compression.hh"
#include "download.hh"
#include "istringstream_nocopy.hh"

#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/utils/logging/FormattedLogSystem.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/GetBucketLocationRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>

namespace nix {

struct S3Error : public Error
{
    Aws::S3::S3Errors err;
    S3Error(Aws::S3::S3Errors err, const FormatOrString & fs)
        : Error(fs), err(err) { };
};

/* Helper: given an Outcome<R, E>, return R in case of success, or
   throw an exception in case of an error. */
template<typename R, typename E>
R && checkAws(const FormatOrString & fs, Aws::Utils::Outcome<R, E> && outcome)
{
    if (!outcome.IsSuccess())
        throw S3Error(
            outcome.GetError().GetErrorType(),
            fs.s + ": " + outcome.GetError().GetMessage());
    return outcome.GetResultWithOwnership();
}

class AwsLogger : public Aws::Utils::Logging::FormattedLogSystem
{
    using Aws::Utils::Logging::FormattedLogSystem::FormattedLogSystem;

    void ProcessFormattedStatement(Aws::String && statement) override
    {
        debug("AWS: %s", chomp(statement));
    }
};
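
/* Initialise the AWS C++ SDK at most once per process. */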
static void initAWS()
{
    static std::once_flag flag;
    std::call_once(flag, []() {
        Aws::SDKOptions options;

        /* We install our own OpenSSL locking function (see
           shared.cc), so don't let aws-sdk-cpp override it. */
        options.cryptoOptions.initAndCleanupOpenSSL = false;

        if (verbosity >= lvlDebug) {
            options.loggingOptions.logLevel =
                verbosity == lvlDebug
                ? Aws::Utils::Logging::LogLevel::Debug
                : Aws::Utils::Logging::LogLevel::Trace;
            options.loggingOptions.logger_create_fn = [options]() {
                return std::make_shared<AwsLogger>(options.loggingOptions.logLevel);
            };
        }

        Aws::InitAPI(options);
    });
}

S3Helper::S3Helper(const string & region)
    : config(makeConfig(region))
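    /* The final 'false' disables S3 virtual host URIs (the subject of
       this commit), which interact badly with curl connection reuse
       right after bucket creation; see the commit message above. */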
    , client(make_ref<Aws::S3::S3Client>(*config, true, false))
{
}

/* Log AWS retries. */
class RetryStrategy : public Aws::Client::DefaultRetryStrategy
{
    bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override
    {
        auto retry = Aws::Client::DefaultRetryStrategy::ShouldRetry(error, attemptedRetries);
        if (retry)
            printError("AWS error '%s' (%s), will retry in %d ms",
                error.GetExceptionName(), error.GetMessage(), CalculateDelayBeforeNextRetry(error, attemptedRetries));
        return retry;
    }
};

ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region)
{
    initAWS();
    auto res = make_ref<Aws::Client::ClientConfiguration>();
    res->region = region;
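    /* 600 * 1000 ms = 10 minutes, presumably sized for large NAR
       transfers. */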
    res->requestTimeoutMs = 600 * 1000;
    res->retryStrategy = std::make_shared<RetryStrategy>();
    res->caFile = settings.caFile;
    return res;
}

S3Helper::DownloadResult S3Helper::getObject(
    const std::string & bucketName, const std::string & key)
{
    debug("fetching ‘s3://%s/%s’...", bucketName, key);

    auto request =
        Aws::S3::Model::GetObjectRequest()
        .WithBucket(bucketName)
        .WithKey(key);

    request.SetResponseStreamFactory([&]() {
        return Aws::New<std::stringstream>("STRINGSTREAM");
    });

    DownloadResult res;

    auto now1 = std::chrono::steady_clock::now();

    try {

        auto result = checkAws(fmt("AWS error fetching ‘%s’", key),
            client->GetObject(request));

        res.data = decodeContent(
            result.GetContentEncoding(),
            make_ref<std::string>(
                dynamic_cast<std::stringstream &>(result.GetBody()).str()));

    } catch (S3Error & e) {
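        /* A missing key is not an error: res.data is left empty and
           the caller treats it as a cache miss. */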
        if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
    }

    auto now2 = std::chrono::steady_clock::now();

    res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();

    return res;
}

struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
    const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}};
    const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
    const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
    const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};

    std::string bucketName;

    Stats stats;

    S3Helper s3Helper;

    S3BinaryCacheStoreImpl(
        const Params & params, const std::string & bucketName)
        : S3BinaryCacheStore(params)
        , bucketName(bucketName)
        , s3Helper(region)
    {
        diskCache = getNarInfoDiskCache();
    }

    std::string getUri() override
    {
        return "s3://" + bucketName;
    }

    void init() override
    {
        if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {

            /* Create the bucket if it doesn't already exist. */
            // FIXME: HeadBucket would be more appropriate, but doesn't return
            // an easily parsed 404 message.
            auto res = s3Helper.client->GetBucketLocation(
                Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));

            if (!res.IsSuccess()) {
                if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET)
                    throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage());

                printInfo("creating S3 bucket ‘%s’...", bucketName);

                // Stupid S3 bucket locations.
                auto bucketConfig = Aws::S3::Model::CreateBucketConfiguration();
                if (s3Helper.config->region != "us-east-1")
                    bucketConfig.SetLocationConstraint(
                        Aws::S3::Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
                            s3Helper.config->region));

                checkAws(format("AWS error creating bucket ‘%s’") % bucketName,
                    s3Helper.client->CreateBucket(
                        Aws::S3::Model::CreateBucketRequest()
                        .WithBucket(bucketName)
                        .WithCreateBucketConfiguration(bucketConfig)));
            }

            BinaryCacheStore::init();

            diskCache->createCache(getUri(), storeDir, wantMassQuery_, priority);
        }
    }

    const Stats & getS3Stats() override
    {
        return stats;
    }

    /* This is a specialisation of isValidPath() that optimistically
       fetches the .narinfo file, rather than first checking for its
       existence via a HEAD request. Since .narinfos are small, doing
       a GET is unlikely to be slower than HEAD. */
    bool isValidPathUncached(const Path & storePath) override
    {
        try {
            queryPathInfo(storePath);
            return true;
        } catch (InvalidPath & e) {
            return false;
        }
    }

    bool fileExists(const std::string & path) override
    {
        stats.head++;

        auto res = s3Helper.client->HeadObject(
            Aws::S3::Model::HeadObjectRequest()
            .WithBucket(bucketName)
            .WithKey(path));

        if (!res.IsSuccess()) {
            auto & error = res.GetError();
            if (error.GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND
                || error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY
                || (error.GetErrorType() == Aws::S3::S3Errors::UNKNOWN // FIXME
                    && error.GetMessage().find("404") != std::string::npos))
                return false;
            throw Error(format("AWS error fetching ‘%s’: %s") % path % error.GetMessage());
        }

        return true;
    }

    void uploadFile(const std::string & path, const std::string & data,
        const std::string & mimeType,
        const std::string & contentEncoding)
    {
        auto request =
            Aws::S3::Model::PutObjectRequest()
            .WithBucket(bucketName)
            .WithKey(path);

        request.SetContentType(mimeType);

        if (contentEncoding != "")
            request.SetContentEncoding(contentEncoding);
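
        /* istringstream_nocopy wraps 'data' without copying it, which
           matters for large NARs. */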
        auto stream = std::make_shared<istringstream_nocopy>(data);

        request.SetBody(stream);

        stats.put++;
        stats.putBytes += data.size();

        auto now1 = std::chrono::steady_clock::now();

        auto result = checkAws(format("AWS error uploading ‘%s’") % path,
            s3Helper.client->PutObject(request));

        auto now2 = std::chrono::steady_clock::now();

        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();

        printInfo(format("uploaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
            % bucketName % path % data.size() % duration);

        stats.putTimeMs += duration;
    }

    void upsertFile(const std::string & path, const std::string & data,
        const std::string & mimeType) override
    {
        if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
            uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
        else if (lsCompression != "" && hasSuffix(path, ".ls"))
            uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
        else if (logCompression != "" && hasPrefix(path, "log/"))
            uploadFile(path, *compress(logCompression, data), mimeType, logCompression);
        else
            uploadFile(path, data, mimeType, "");
    }

    void getFile(const std::string & path,
        std::function<void(std::shared_ptr<std::string>)> success,
        std::function<void(std::exception_ptr exc)> failure) override
    {
        sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
            stats.get++;

            auto res = s3Helper.getObject(bucketName, path);

            stats.getBytes += res.data ? res.data->size() : 0;
            stats.getTimeMs += res.durationMs;

            if (res.data)
                printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms",
                    bucketName, path, res.data->size(), res.durationMs);

            return res.data;
        });
    }

    PathSet queryAllValidPaths() override
    {
        PathSet paths;
        std::string marker;

        do {
            debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker);

            auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName,
                s3Helper.client->ListObjects(
                    Aws::S3::Model::ListObjectsRequest()
                    .WithBucket(bucketName)
                    .WithDelimiter("/")
                    .WithMarker(marker)));

            auto & contents = res.GetContents();

            debug(format("got %d keys, next marker ‘%s’")
                % contents.size() % res.GetNextMarker());

            for (auto object : contents) {
                auto & key = object.GetKey();
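                /* Narinfo keys are exactly 40 characters: the
                   32-character store path hash plus the 8-character
                   ".narinfo" suffix. */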
                if (key.size() != 40 || !hasSuffix(key, ".narinfo")) continue;
                paths.insert(storeDir + "/" + key.substr(0, key.size() - 8));
            }

            marker = res.GetNextMarker();
        } while (!marker.empty());

        return paths;
    }

};
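
/* Register this store type for s3://<bucket> URIs. */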
static RegisterStoreImplementation regStore([](
    const std::string & uri, const Store::Params & params)
    -> std::shared_ptr<Store>
{
    if (std::string(uri, 0, 5) != "s3://") return 0;
    auto store = std::make_shared<S3BinaryCacheStoreImpl>(params, std::string(uri, 5));
    store->init();
    return store;
});

}

#endif