diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index 12b7e01e..38b7d342 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -3,7 +3,6 @@ bin_PROGRAMS = hydra-queue-runner hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ builder.cc build-result.cc build-remote.cc \ build-result.hh counter.hh token-server.hh state.hh db.hh \ - s3-binary-cache-store.hh s3-binary-cache-store.cc \ finally.hh hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx diff --git a/src/hydra-queue-runner/s3-binary-cache-store.cc b/src/hydra-queue-runner/s3-binary-cache-store.cc deleted file mode 100644 index f03a8c4c..00000000 --- a/src/hydra-queue-runner/s3-binary-cache-store.cc +++ /dev/null @@ -1,198 +0,0 @@ -#include "s3-binary-cache-store.hh" - -#include "nar-info.hh" -#include "nar-info-disk-cache.hh" - -#include -#include -#include -#include -#include -#include -#include - -namespace nix { - -struct S3Error : public Error -{ - Aws::S3::S3Errors err; - S3Error(Aws::S3::S3Errors err, const FormatOrString & fs) - : Error(fs), err(err) { }; -}; - -/* Helper: given an Outcome, return R in case of success, or - throw an exception in case of an error. */ -template -R && checkAws(const FormatOrString & fs, Aws::Utils::Outcome && outcome) -{ - if (!outcome.IsSuccess()) - throw S3Error( - outcome.GetError().GetErrorType(), - fs.s + ": " + outcome.GetError().GetMessage()); - return outcome.GetResultWithOwnership(); -} - -S3BinaryCacheStore::S3BinaryCacheStore(std::shared_ptr localStore, - const Path & secretKeyFile, const std::string & bucketName) - : BinaryCacheStore(localStore, secretKeyFile) - , bucketName(bucketName) - , config(makeConfig()) - , client(make_ref(*config)) -{ - diskCache = getNarInfoDiskCache(); -} - -std::string S3BinaryCacheStore::getUri() -{ - return "s3://" + bucketName; -} - -ref S3BinaryCacheStore::makeConfig() -{ - auto res = make_ref(); - res->region = Aws::Region::US_EAST_1; - res->requestTimeoutMs = 600 * 1000; - return res; -} - -void S3BinaryCacheStore::init() -{ - if (!diskCache->cacheExists(getUri())) { - - /* Create the bucket if it doesn't already exists. */ - // FIXME: HeadBucket would be more appropriate, but doesn't return - // an easily parsed 404 message. - auto res = client->GetBucketLocation( - Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName)); - - if (!res.IsSuccess()) { - if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET) - throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage()); - - checkAws(format("AWS error creating bucket ‘%s’") % bucketName, - client->CreateBucket( - Aws::S3::Model::CreateBucketRequest() - .WithBucket(bucketName) - .WithCreateBucketConfiguration( - Aws::S3::Model::CreateBucketConfiguration() - /* .WithLocationConstraint( - Aws::S3::Model::BucketLocationConstraint::US) */ ))); - } - - diskCache->createCache(getUri()); - } - - BinaryCacheStore::init(); -} - -const S3BinaryCacheStore::Stats & S3BinaryCacheStore::getS3Stats() -{ - return stats; -} - -/* This is a specialisation of isValidPath() that optimistically - fetches the .narinfo file, rather than first checking for its - existence via a HEAD request. Since .narinfos are small, doing a - GET is unlikely to be slower than HEAD. */ -bool S3BinaryCacheStore::isValidPathUncached(const Path & storePath) -{ - try { - queryPathInfo(storePath); - return true; - } catch (InvalidPath & e) { - return false; - } -} - -bool S3BinaryCacheStore::fileExists(const std::string & path) -{ - stats.head++; - - auto res = client->HeadObject( - Aws::S3::Model::HeadObjectRequest() - .WithBucket(bucketName) - .WithKey(path)); - - if (!res.IsSuccess()) { - auto & error = res.GetError(); - if (error.GetErrorType() == Aws::S3::S3Errors::UNKNOWN // FIXME - && error.GetMessage().find("404") != std::string::npos) - return false; - throw Error(format("AWS error fetching ‘%s’: %s") % path % error.GetMessage()); - } - - return true; -} - -void S3BinaryCacheStore::upsertFile(const std::string & path, const std::string & data) -{ - auto request = - Aws::S3::Model::PutObjectRequest() - .WithBucket(bucketName) - .WithKey(path); - - auto stream = std::make_shared(data); - - request.SetBody(stream); - - stats.put++; - stats.putBytes += data.size(); - - auto now1 = std::chrono::steady_clock::now(); - - auto result = checkAws(format("AWS error uploading ‘%s’") % path, - client->PutObject(request)); - - auto now2 = std::chrono::steady_clock::now(); - - auto duration = std::chrono::duration_cast(now2 - now1).count(); - - printMsg(lvlInfo, format("uploaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % data.size() % duration); - - stats.putTimeMs += duration; -} - -std::shared_ptr S3BinaryCacheStore::getFile(const std::string & path) -{ - printMsg(lvlDebug, format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); - - request.SetResponseStreamFactory([&]() { - return Aws::New("STRINGSTREAM"); - }); - - stats.get++; - - try { - - auto now1 = std::chrono::steady_clock::now(); - - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); - - auto now2 = std::chrono::steady_clock::now(); - - auto res = dynamic_cast(result.GetBody()).str(); - - auto duration = std::chrono::duration_cast(now2 - now1).count(); - - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); - - stats.getBytes += res.size(); - stats.getTimeMs += duration; - - return std::make_shared(res); - - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0; - throw; - } -} - -} diff --git a/src/hydra-queue-runner/s3-binary-cache-store.hh b/src/hydra-queue-runner/s3-binary-cache-store.hh deleted file mode 100644 index ffcce1eb..00000000 --- a/src/hydra-queue-runner/s3-binary-cache-store.hh +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include "binary-cache-store.hh" - -#include - -namespace Aws { namespace Client { class ClientConfiguration; } } -namespace Aws { namespace S3 { class S3Client; } } - -namespace nix { - -class S3BinaryCacheStore : public BinaryCacheStore -{ -private: - - std::string bucketName; - - ref config; - ref client; - -public: - - S3BinaryCacheStore(std::shared_ptr localStore, - const Path & secretKeyFile, const std::string & bucketName); - - void init() override; - - std::string getUri(); - - struct Stats - { - std::atomic put{0}; - std::atomic putBytes{0}; - std::atomic putTimeMs{0}; - std::atomic get{0}; - std::atomic getBytes{0}; - std::atomic getTimeMs{0}; - std::atomic head{0}; - }; - - const Stats & getS3Stats(); - - bool isValidPathUncached(const Path & storePath) override; - -private: - - Stats stats; - - ref makeConfig(); - -protected: - - bool fileExists(const std::string & path) override; - - void upsertFile(const std::string & path, const std::string & data) override; - - std::shared_ptr getFile(const std::string & path) override; - -}; - -}