diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index 83242759..2cf4fcdb 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -2,11 +2,8 @@ bin_PROGRAMS = hydra-queue-runner hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ builder.cc build-result.cc build-remote.cc \ - build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh \ - binary-cache-store.hh binary-cache-store.cc \ - local-binary-cache-store.hh local-binary-cache-store.cc \ - s3-binary-cache-store.hh s3-binary-cache-store.cc \ - lru-cache.hh + build-result.hh counter.hh token-server.hh state.hh db.hh \ + s3-binary-cache-store.hh s3-binary-cache-store.cc hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx AM_CXXFLAGS = $(NIX_CFLAGS) -Wall -laws-cpp-sdk-s3 diff --git a/src/hydra-queue-runner/binary-cache-store.cc b/src/hydra-queue-runner/binary-cache-store.cc deleted file mode 100644 index 6bea0e6c..00000000 --- a/src/hydra-queue-runner/binary-cache-store.cc +++ /dev/null @@ -1,292 +0,0 @@ -#include "binary-cache-store.hh" -#include "sync.hh" - -#include "archive.hh" -#include "compression.hh" -#include "derivations.hh" -#include "globals.hh" -#include "nar-info.hh" -#include "worker-protocol.hh" - -#include - -namespace nix { - -BinaryCacheStore::BinaryCacheStore(std::shared_ptr localStore, - const Path & secretKeyFile, const Path & publicKeyFile) - : localStore(localStore) -{ - if (secretKeyFile != "") - secretKey = std::unique_ptr(new SecretKey(readFile(secretKeyFile))); - - if (publicKeyFile != "") { - publicKeys = std::unique_ptr(new PublicKeys); - auto key = PublicKey(readFile(publicKeyFile)); - publicKeys->emplace(key.name, key); - } -} - -void BinaryCacheStore::init() -{ - std::string cacheInfoFile = "nix-cache-info"; - if (!fileExists(cacheInfoFile)) - upsertFile(cacheInfoFile, "StoreDir: " + settings.nixStore + "\n"); -} - -const BinaryCacheStore::Stats & BinaryCacheStore::getStats() -{ - return stats; -} - -Path BinaryCacheStore::narInfoFileFor(const Path & storePath) -{ - assertStorePath(storePath); - return storePathToHash(storePath) + ".narinfo"; -} - -void BinaryCacheStore::addToCache(const ValidPathInfo & info, - const string & nar) -{ - auto narInfoFile = narInfoFileFor(info.path); - if (fileExists(narInfoFile)) return; - - auto narInfo = make_ref(info); - - narInfo->narSize = nar.size(); - narInfo->narHash = hashString(htSHA256, nar); - - if (info.narHash.type != htUnknown && info.narHash != narInfo->narHash) - throw Error(format("refusing to copy corrupted path ‘%1%’ to binary cache") % info.path); - - /* Compress the NAR. */ - narInfo->compression = "xz"; - auto now1 = std::chrono::steady_clock::now(); - string narXz = compressXZ(nar); - auto now2 = std::chrono::steady_clock::now(); - narInfo->fileHash = hashString(htSHA256, narXz); - narInfo->fileSize = narXz.size(); - - auto duration = std::chrono::duration_cast(now2 - now1).count(); - printMsg(lvlTalkative, format("copying path ‘%1%’ (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache") - % narInfo->path % narInfo->narSize - % ((1.0 - (double) narXz.size() / nar.size()) * 100.0) - % duration); - - /* Atomically write the NAR file. */ - narInfo->url = "nar/" + printHash32(narInfo->fileHash) + ".nar.xz"; - if (!fileExists(narInfo->url)) { - stats.narWrite++; - upsertFile(narInfo->url, narXz); - } else - stats.narWriteAverted++; - - stats.narWriteBytes += nar.size(); - stats.narWriteCompressedBytes += narXz.size(); - stats.narWriteCompressionTimeMs += duration; - - /* Atomically write the NAR info file.*/ - if (secretKey) narInfo->sign(*secretKey); - - upsertFile(narInfoFile, narInfo->to_string()); - - { - auto state_(state.lock()); - state_->narInfoCache.upsert(narInfo->path, narInfo); - stats.narInfoCacheSize = state_->narInfoCache.size(); - } - - stats.narInfoWrite++; -} - -NarInfo BinaryCacheStore::readNarInfo(const Path & storePath) -{ - { - auto state_(state.lock()); - auto res = state_->narInfoCache.get(storePath); - if (res) { - stats.narInfoReadAverted++; - return **res; - } - } - - auto narInfoFile = narInfoFileFor(storePath); - auto narInfo = make_ref(getFile(narInfoFile), narInfoFile); - assert(narInfo->path == storePath); - - stats.narInfoRead++; - - if (publicKeys) { - if (!narInfo->checkSignature(*publicKeys)) - throw Error(format("invalid signature on NAR info file ‘%1%’") % narInfoFile); - } - - { - auto state_(state.lock()); - state_->narInfoCache.upsert(storePath, narInfo); - stats.narInfoCacheSize = state_->narInfoCache.size(); - } - - return *narInfo; -} - -bool BinaryCacheStore::isValidPath(const Path & storePath) -{ - return fileExists(narInfoFileFor(storePath)); -} - -void BinaryCacheStore::exportPath(const Path & storePath, bool sign, Sink & sink) -{ - assert(!sign); - - auto res = readNarInfo(storePath); - - auto nar = getFile(res.url); - - stats.narRead++; - stats.narReadCompressedBytes += nar.size(); - - /* Decompress the NAR. FIXME: would be nice to have the remote - side do this. */ - if (res.compression == "none") - ; - else if (res.compression == "xz") - nar = decompressXZ(nar); - else - throw Error(format("unknown NAR compression type ‘%1%’") % nar); - - stats.narReadBytes += nar.size(); - - printMsg(lvlTalkative, format("exporting path ‘%1%’ (%2% bytes)") % storePath % nar.size()); - - assert(nar.size() % 8 == 0); - - sink((unsigned char *) nar.c_str(), nar.size()); - - // FIXME: check integrity of NAR. - - sink << exportMagic << storePath << res.references << res.deriver << 0; -} - -Paths BinaryCacheStore::importPaths(bool requireSignature, Source & source) -{ - assert(!requireSignature); - Paths res; - while (true) { - unsigned long long n = readLongLong(source); - if (n == 0) break; - if (n != 1) throw Error("input doesn't look like something created by ‘nix-store --export’"); - res.push_back(importPath(source)); - } - return res; -} - -struct TeeSource : Source -{ - Source & readSource; - std::string data; - TeeSource(Source & readSource) : readSource(readSource) - { - } - size_t read(unsigned char * data, size_t len) - { - size_t n = readSource.read(data, len); - this->data.append((char *) data, n); - return n; - } -}; - -struct NopSink : ParseSink -{ -}; - -Path BinaryCacheStore::importPath(Source & source) -{ - /* FIXME: some cut&paste of LocalStore::importPath(). */ - - /* Extract the NAR from the source. */ - TeeSource tee(source); - NopSink sink; - parseDump(sink, tee); - - uint32_t magic = readInt(source); - if (magic != exportMagic) - throw Error("Nix archive cannot be imported; wrong format"); - - ValidPathInfo info; - info.path = readStorePath(source); - - info.references = readStorePaths(source); - - readString(source); // deriver, don't care - - bool haveSignature = readInt(source) == 1; - assert(!haveSignature); - - addToCache(info, tee.data); - - return info.path; -} - -ValidPathInfo BinaryCacheStore::queryPathInfo(const Path & storePath) -{ - return ValidPathInfo(readNarInfo(storePath)); -} - -void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths, - SubstitutablePathInfos & infos) -{ - PathSet left; - - if (!localStore) return; - - for (auto & storePath : paths) { - if (!localStore->isValidPath(storePath)) { - left.insert(storePath); - continue; - } - ValidPathInfo info = localStore->queryPathInfo(storePath); - SubstitutablePathInfo sub; - sub.references = info.references; - sub.downloadSize = 0; - sub.narSize = info.narSize; - infos.emplace(storePath, sub); - } - - if (settings.useSubstitutes) - localStore->querySubstitutablePathInfos(left, infos); -} - -void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode) -{ - for (auto & storePath : paths) { - assert(!isDerivation(storePath)); - - if (isValidPath(storePath)) continue; - - if (!localStore) - throw Error(format("don't know how to realise path ‘%1%’ in a binary cache") % storePath); - - localStore->addTempRoot(storePath); - - if (!localStore->isValidPath(storePath)) - localStore->ensurePath(storePath); - - ValidPathInfo info = localStore->queryPathInfo(storePath); - - for (auto & ref : info.references) - if (ref != storePath) - ensurePath(ref); - - StringSink sink; - dumpPath(storePath, sink); - - addToCache(info, sink.s); - } -} - -void BinaryCacheStore::ensurePath(const Path & path) -{ - buildPaths({path}); -} - -} diff --git a/src/hydra-queue-runner/binary-cache-store.hh b/src/hydra-queue-runner/binary-cache-store.hh deleted file mode 100644 index d02f46de..00000000 --- a/src/hydra-queue-runner/binary-cache-store.hh +++ /dev/null @@ -1,170 +0,0 @@ -#pragma once - -#include "crypto.hh" -#include "store-api.hh" - -#include "lru-cache.hh" -#include "sync.hh" -#include "pool.hh" - -#include - -namespace nix { - -struct NarInfo; - -class BinaryCacheStore : public Store -{ -private: - - std::unique_ptr secretKey; - std::unique_ptr publicKeys; - - std::shared_ptr localStore; - - struct State - { - LRUCache> narInfoCache{32 * 1024}; - }; - - Sync state; - -protected: - - BinaryCacheStore(std::shared_ptr localStore, - const Path & secretKeyFile, const Path & publicKeyFile); - - virtual bool fileExists(const std::string & path) = 0; - - virtual void upsertFile(const std::string & path, const std::string & data) = 0; - - virtual std::string getFile(const std::string & path) = 0; - -public: - - virtual void init(); - - struct Stats - { - std::atomic narInfoRead{0}; - std::atomic narInfoReadAverted{0}; - std::atomic narInfoWrite{0}; - std::atomic narInfoCacheSize{0}; - std::atomic narRead{0}; - std::atomic narReadBytes{0}; - std::atomic narReadCompressedBytes{0}; - std::atomic narWrite{0}; - std::atomic narWriteAverted{0}; - std::atomic narWriteBytes{0}; - std::atomic narWriteCompressedBytes{0}; - std::atomic narWriteCompressionTimeMs{0}; - }; - - const Stats & getStats(); - -private: - - Stats stats; - - std::string narInfoFileFor(const Path & storePath); - - void addToCache(const ValidPathInfo & info, const string & nar); - -protected: - - NarInfo readNarInfo(const Path & storePath); - -public: - - bool isValidPath(const Path & path) override; - - PathSet queryValidPaths(const PathSet & paths) override - { abort(); } - - PathSet queryAllValidPaths() override - { abort(); } - - ValidPathInfo queryPathInfo(const Path & path) override; - - Hash queryPathHash(const Path & path) override - { abort(); } - - void queryReferrers(const Path & path, - PathSet & referrers) override - { abort(); } - - Path queryDeriver(const Path & path) override - { abort(); } - - PathSet queryValidDerivers(const Path & path) override - { abort(); } - - PathSet queryDerivationOutputs(const Path & path) override - { abort(); } - - StringSet queryDerivationOutputNames(const Path & path) override - { abort(); } - - Path queryPathFromHashPart(const string & hashPart) override - { abort(); } - - PathSet querySubstitutablePaths(const PathSet & paths) override - { abort(); } - - void querySubstitutablePathInfos(const PathSet & paths, - SubstitutablePathInfos & infos) override; - - Path addToStore(const string & name, const Path & srcPath, - bool recursive = true, HashType hashAlgo = htSHA256, - PathFilter & filter = defaultPathFilter, bool repair = false) override - { abort(); } - - Path addTextToStore(const string & name, const string & s, - const PathSet & references, bool repair = false) override - { abort(); } - - void exportPath(const Path & path, bool sign, - Sink & sink) override; - - Paths importPaths(bool requireSignature, Source & source) override; - - Path importPath(Source & source); - - void buildPaths(const PathSet & paths, BuildMode buildMode = bmNormal) override; - - BuildResult buildDerivation(const Path & drvPath, const BasicDerivation & drv, - BuildMode buildMode = bmNormal) override - { abort(); } - - void ensurePath(const Path & path) override; - - void addTempRoot(const Path & path) override - { abort(); } - - void addIndirectRoot(const Path & path) override - { abort(); } - - void syncWithGC() override - { } - - Roots findRoots() override - { abort(); } - - void collectGarbage(const GCOptions & options, GCResults & results) override - { abort(); } - - PathSet queryFailedPaths() override - { return PathSet(); } - - void clearFailedPaths(const PathSet & paths) override - { } - - void optimiseStore() override - { } - - bool verifyStore(bool checkContents, bool repair) override - { return true; } - -}; - -} diff --git a/src/hydra-queue-runner/local-binary-cache-store.cc b/src/hydra-queue-runner/local-binary-cache-store.cc deleted file mode 100644 index 5714688e..00000000 --- a/src/hydra-queue-runner/local-binary-cache-store.cc +++ /dev/null @@ -1,44 +0,0 @@ -#include "local-binary-cache-store.hh" - -namespace nix { - -LocalBinaryCacheStore::LocalBinaryCacheStore(std::shared_ptr localStore, - const Path & secretKeyFile, const Path & publicKeyFile, - const Path & binaryCacheDir) - : BinaryCacheStore(localStore, secretKeyFile, publicKeyFile) - , binaryCacheDir(binaryCacheDir) -{ -} - -void LocalBinaryCacheStore::init() -{ - createDirs(binaryCacheDir + "/nar"); - BinaryCacheStore::init(); -} - -static void atomicWrite(const Path & path, const std::string & s) -{ - Path tmp = path + ".tmp." + std::to_string(getpid()); - AutoDelete del(tmp, false); - writeFile(tmp, s); - if (rename(tmp.c_str(), path.c_str())) - throw SysError(format("renaming ‘%1%’ to ‘%2%’") % tmp % path); - del.cancel(); -} - -bool LocalBinaryCacheStore::fileExists(const std::string & path) -{ - return pathExists(binaryCacheDir + "/" + path); -} - -void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::string & data) -{ - atomicWrite(binaryCacheDir + "/" + path, data); -} - -std::string LocalBinaryCacheStore::getFile(const std::string & path) -{ - return readFile(binaryCacheDir + "/" + path); -} - -} diff --git a/src/hydra-queue-runner/local-binary-cache-store.hh b/src/hydra-queue-runner/local-binary-cache-store.hh deleted file mode 100644 index 0303ebe7..00000000 --- a/src/hydra-queue-runner/local-binary-cache-store.hh +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include "binary-cache-store.hh" - -namespace nix { - -class LocalBinaryCacheStore : public BinaryCacheStore -{ -private: - - Path binaryCacheDir; - -public: - - LocalBinaryCacheStore(std::shared_ptr localStore, - const Path & secretKeyFile, const Path & publicKeyFile, - const Path & binaryCacheDir); - - void init() override; - -protected: - - bool fileExists(const std::string & path) override; - - void upsertFile(const std::string & path, const std::string & data) override; - - std::string getFile(const std::string & path) override; - -}; - -} diff --git a/src/hydra-queue-runner/lru-cache.hh b/src/hydra-queue-runner/lru-cache.hh deleted file mode 100644 index 113411bd..00000000 --- a/src/hydra-queue-runner/lru-cache.hh +++ /dev/null @@ -1,80 +0,0 @@ -#pragma once - -#include -#include - -/* A simple least-recently used cache. Not thread-safe. */ -template -class LRUCache -{ -private: - - size_t maxSize; - - // Stupid wrapper to get around circular dependency between Data - // and LRU. - struct LRUIterator; - - using Data = std::map>; - using LRU = std::list; - - struct LRUIterator { typename LRU::iterator it; }; - - Data data; - LRU lru; - -public: - - LRUCache(size_t maxSize) : maxSize(maxSize) { } - - /* Insert or upsert an item in the cache. */ - void upsert(const Key & key, const Value & value) - { - erase(key); - - if (data.size() >= maxSize) { - /* Retire the oldest item. */ - auto oldest = lru.begin(); - data.erase(*oldest); - lru.erase(oldest); - } - - auto res = data.emplace(key, std::make_pair(LRUIterator(), value)); - assert(res.second); - auto & i(res.first); - - auto j = lru.insert(lru.end(), i); - - i->second.first.it = j; - } - - bool erase(const Key & key) - { - auto i = data.find(key); - if (i == data.end()) return false; - lru.erase(i->second.first.it); - data.erase(i); - return true; - } - - /* Look up an item in the cache. If it's exists, it becomes the - most recently used item. */ - // FIXME: use boost::optional? - Value * get(const Key & key) - { - auto i = data.find(key); - if (i == data.end()) return 0; - - /* Move this item to the back of the LRU list. */ - lru.erase(i->second.first.it); - auto j = lru.insert(lru.end(), i); - i->second.first.it = j; - - return &i->second.second; - } - - size_t size() - { - return data.size(); - } -};