forked from lix-project/lix
Merge pull request #6612 from NixOS/parallel-nix-copy
Make nix copy parallel again
This commit is contained in:
commit
04e74f7c8b
5 changed files with 156 additions and 93 deletions
|
@ -1,2 +1,5 @@
|
||||||
# Release X.Y (202?-??-??)
|
# Release X.Y (202?-??-??)
|
||||||
|
|
||||||
|
* `nix copy` now copies the store paths in parallel as much as possible (again).
|
||||||
|
This doesn't apply for the `daemon` and `ssh-ng` stores which copy everything
|
||||||
|
in one batch to avoid latencies issues.
|
|
@ -671,6 +671,23 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void RemoteStore::addMultipleToStore(
|
||||||
|
PathsSource & pathsToCopy,
|
||||||
|
Activity & act,
|
||||||
|
RepairFlag repair,
|
||||||
|
CheckSigsFlag checkSigs)
|
||||||
|
{
|
||||||
|
auto source = sinkToSource([&](Sink & sink) {
|
||||||
|
sink << pathsToCopy.size();
|
||||||
|
for (auto & [pathInfo, pathSource] : pathsToCopy) {
|
||||||
|
pathInfo.write(sink, *this, 16);
|
||||||
|
pathSource->drainInto(sink);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
addMultipleToStore(*source, repair, checkSigs);
|
||||||
|
}
|
||||||
|
|
||||||
void RemoteStore::addMultipleToStore(
|
void RemoteStore::addMultipleToStore(
|
||||||
Source & source,
|
Source & source,
|
||||||
RepairFlag repair,
|
RepairFlag repair,
|
||||||
|
|
|
@ -88,6 +88,12 @@ public:
|
||||||
RepairFlag repair,
|
RepairFlag repair,
|
||||||
CheckSigsFlag checkSigs) override;
|
CheckSigsFlag checkSigs) override;
|
||||||
|
|
||||||
|
void addMultipleToStore(
|
||||||
|
PathsSource & pathsToCopy,
|
||||||
|
Activity & act,
|
||||||
|
RepairFlag repair,
|
||||||
|
CheckSigsFlag checkSigs) override;
|
||||||
|
|
||||||
StorePath addTextToStore(
|
StorePath addTextToStore(
|
||||||
std::string_view name,
|
std::string_view name,
|
||||||
std::string_view s,
|
std::string_view s,
|
||||||
|
|
|
@ -258,6 +258,84 @@ StorePath Store::addToStore(
|
||||||
return addToStoreFromDump(*source, name, method, hashAlgo, repair, references);
|
return addToStoreFromDump(*source, name, method, hashAlgo, repair, references);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Store::addMultipleToStore(
|
||||||
|
PathsSource & pathsToCopy,
|
||||||
|
Activity & act,
|
||||||
|
RepairFlag repair,
|
||||||
|
CheckSigsFlag checkSigs)
|
||||||
|
{
|
||||||
|
std::atomic<size_t> nrDone{0};
|
||||||
|
std::atomic<size_t> nrFailed{0};
|
||||||
|
std::atomic<uint64_t> bytesExpected{0};
|
||||||
|
std::atomic<uint64_t> nrRunning{0};
|
||||||
|
|
||||||
|
using PathWithInfo = std::pair<ValidPathInfo, std::unique_ptr<Source>>;
|
||||||
|
|
||||||
|
std::map<StorePath, PathWithInfo *> infosMap;
|
||||||
|
StorePathSet storePathsToAdd;
|
||||||
|
for (auto & thingToAdd : pathsToCopy) {
|
||||||
|
infosMap.insert_or_assign(thingToAdd.first.path, &thingToAdd);
|
||||||
|
storePathsToAdd.insert(thingToAdd.first.path);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto showProgress = [&]() {
|
||||||
|
act.progress(nrDone, pathsToCopy.size(), nrRunning, nrFailed);
|
||||||
|
};
|
||||||
|
|
||||||
|
ThreadPool pool;
|
||||||
|
|
||||||
|
processGraph<StorePath>(pool,
|
||||||
|
storePathsToAdd,
|
||||||
|
|
||||||
|
[&](const StorePath & path) {
|
||||||
|
|
||||||
|
auto & [info, _] = *infosMap.at(path);
|
||||||
|
|
||||||
|
if (isValidPath(info.path)) {
|
||||||
|
nrDone++;
|
||||||
|
showProgress();
|
||||||
|
return StorePathSet();
|
||||||
|
}
|
||||||
|
|
||||||
|
bytesExpected += info.narSize;
|
||||||
|
act.setExpected(actCopyPath, bytesExpected);
|
||||||
|
|
||||||
|
return info.references;
|
||||||
|
},
|
||||||
|
|
||||||
|
[&](const StorePath & path) {
|
||||||
|
checkInterrupt();
|
||||||
|
|
||||||
|
auto & [info_, source_] = *infosMap.at(path);
|
||||||
|
auto info = info_;
|
||||||
|
info.ultimate = false;
|
||||||
|
|
||||||
|
/* Make sure that the Source object is destroyed when
|
||||||
|
we're done. In particular, a SinkToSource object must
|
||||||
|
be destroyed to ensure that the destructors on its
|
||||||
|
stack frame are run; this includes
|
||||||
|
LegacySSHStore::narFromPath()'s connection lock. */
|
||||||
|
auto source = std::move(source_);
|
||||||
|
|
||||||
|
if (!isValidPath(info.path)) {
|
||||||
|
MaintainCount<decltype(nrRunning)> mc(nrRunning);
|
||||||
|
showProgress();
|
||||||
|
try {
|
||||||
|
addToStore(info, *source, repair, checkSigs);
|
||||||
|
} catch (Error & e) {
|
||||||
|
nrFailed++;
|
||||||
|
if (!settings.keepGoing)
|
||||||
|
throw e;
|
||||||
|
printMsg(lvlError, "could not copy %s: %s", printStorePath(path), e.what());
|
||||||
|
showProgress();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nrDone++;
|
||||||
|
showProgress();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void Store::addMultipleToStore(
|
void Store::addMultipleToStore(
|
||||||
Source & source,
|
Source & source,
|
||||||
|
@ -992,113 +1070,61 @@ std::map<StorePath, StorePath> copyPaths(
|
||||||
for (auto & path : storePaths)
|
for (auto & path : storePaths)
|
||||||
if (!valid.count(path)) missing.insert(path);
|
if (!valid.count(path)) missing.insert(path);
|
||||||
|
|
||||||
|
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
|
||||||
|
|
||||||
|
// In the general case, `addMultipleToStore` requires a sorted list of
|
||||||
|
// store paths to add, so sort them right now
|
||||||
|
auto sortedMissing = srcStore.topoSortPaths(missing);
|
||||||
|
std::reverse(sortedMissing.begin(), sortedMissing.end());
|
||||||
|
|
||||||
std::map<StorePath, StorePath> pathsMap;
|
std::map<StorePath, StorePath> pathsMap;
|
||||||
for (auto & path : storePaths)
|
for (auto & path : storePaths)
|
||||||
pathsMap.insert_or_assign(path, path);
|
pathsMap.insert_or_assign(path, path);
|
||||||
|
|
||||||
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
|
Store::PathsSource pathsToCopy;
|
||||||
|
|
||||||
auto sorted = srcStore.topoSortPaths(missing);
|
auto computeStorePathForDst = [&](const ValidPathInfo & currentPathInfo) -> StorePath {
|
||||||
std::reverse(sorted.begin(), sorted.end());
|
auto storePathForSrc = currentPathInfo.path;
|
||||||
|
auto storePathForDst = storePathForSrc;
|
||||||
|
if (currentPathInfo.ca && currentPathInfo.references.empty()) {
|
||||||
|
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePathForSrc.name(), *currentPathInfo.ca);
|
||||||
|
if (dstStore.storeDir == srcStore.storeDir)
|
||||||
|
assert(storePathForDst == storePathForSrc);
|
||||||
|
if (storePathForDst != storePathForSrc)
|
||||||
|
debug("replaced path '%s' to '%s' for substituter '%s'",
|
||||||
|
srcStore.printStorePath(storePathForSrc),
|
||||||
|
dstStore.printStorePath(storePathForDst),
|
||||||
|
dstStore.getUri());
|
||||||
|
}
|
||||||
|
return storePathForDst;
|
||||||
|
};
|
||||||
|
|
||||||
auto source = sinkToSource([&](Sink & sink) {
|
for (auto & missingPath : sortedMissing) {
|
||||||
sink << sorted.size();
|
auto info = srcStore.queryPathInfo(missingPath);
|
||||||
for (auto & storePath : sorted) {
|
|
||||||
|
auto storePathForDst = computeStorePathForDst(*info);
|
||||||
|
pathsMap.insert_or_assign(missingPath, storePathForDst);
|
||||||
|
|
||||||
|
ValidPathInfo infoForDst = *info;
|
||||||
|
infoForDst.path = storePathForDst;
|
||||||
|
|
||||||
|
auto source = sinkToSource([&](Sink & sink) {
|
||||||
|
// We can reasonably assume that the copy will happen whenever we
|
||||||
|
// read the path, so log something about that at that point
|
||||||
auto srcUri = srcStore.getUri();
|
auto srcUri = srcStore.getUri();
|
||||||
auto dstUri = dstStore.getUri();
|
auto dstUri = dstStore.getUri();
|
||||||
auto storePathS = srcStore.printStorePath(storePath);
|
auto storePathS = srcStore.printStorePath(missingPath);
|
||||||
Activity act(*logger, lvlInfo, actCopyPath,
|
Activity act(*logger, lvlInfo, actCopyPath,
|
||||||
makeCopyPathMessage(srcUri, dstUri, storePathS),
|
makeCopyPathMessage(srcUri, dstUri, storePathS),
|
||||||
{storePathS, srcUri, dstUri});
|
{storePathS, srcUri, dstUri});
|
||||||
PushActivity pact(act.id);
|
PushActivity pact(act.id);
|
||||||
|
|
||||||
auto info = srcStore.queryPathInfo(storePath);
|
srcStore.narFromPath(missingPath, sink);
|
||||||
info->write(sink, srcStore, 16);
|
|
||||||
srcStore.narFromPath(storePath, sink);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
dstStore.addMultipleToStore(*source, repair, checkSigs);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
std::atomic<size_t> nrDone{0};
|
|
||||||
std::atomic<size_t> nrFailed{0};
|
|
||||||
std::atomic<uint64_t> bytesExpected{0};
|
|
||||||
std::atomic<uint64_t> nrRunning{0};
|
|
||||||
|
|
||||||
auto showProgress = [&]() {
|
|
||||||
act.progress(nrDone, missing.size(), nrRunning, nrFailed);
|
|
||||||
};
|
|
||||||
|
|
||||||
ThreadPool pool;
|
|
||||||
|
|
||||||
processGraph<StorePath>(pool,
|
|
||||||
StorePathSet(missing.begin(), missing.end()),
|
|
||||||
|
|
||||||
[&](const StorePath & storePath) {
|
|
||||||
auto info = srcStore.queryPathInfo(storePath);
|
|
||||||
auto storePathForDst = storePath;
|
|
||||||
if (info->ca && info->references.empty()) {
|
|
||||||
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
|
|
||||||
if (dstStore.storeDir == srcStore.storeDir)
|
|
||||||
assert(storePathForDst == storePath);
|
|
||||||
if (storePathForDst != storePath)
|
|
||||||
debug("replaced path '%s' to '%s' for substituter '%s'",
|
|
||||||
srcStore.printStorePath(storePath),
|
|
||||||
dstStore.printStorePath(storePathForDst),
|
|
||||||
dstStore.getUri());
|
|
||||||
}
|
|
||||||
pathsMap.insert_or_assign(storePath, storePathForDst);
|
|
||||||
|
|
||||||
if (dstStore.isValidPath(storePath)) {
|
|
||||||
nrDone++;
|
|
||||||
showProgress();
|
|
||||||
return StorePathSet();
|
|
||||||
}
|
|
||||||
|
|
||||||
bytesExpected += info->narSize;
|
|
||||||
act.setExpected(actCopyPath, bytesExpected);
|
|
||||||
|
|
||||||
return info->references;
|
|
||||||
},
|
|
||||||
|
|
||||||
[&](const StorePath & storePath) {
|
|
||||||
checkInterrupt();
|
|
||||||
|
|
||||||
auto info = srcStore.queryPathInfo(storePath);
|
|
||||||
|
|
||||||
auto storePathForDst = storePath;
|
|
||||||
if (info->ca && info->references.empty()) {
|
|
||||||
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
|
|
||||||
if (dstStore.storeDir == srcStore.storeDir)
|
|
||||||
assert(storePathForDst == storePath);
|
|
||||||
if (storePathForDst != storePath)
|
|
||||||
debug("replaced path '%s' to '%s' for substituter '%s'",
|
|
||||||
srcStore.printStorePath(storePath),
|
|
||||||
dstStore.printStorePath(storePathForDst),
|
|
||||||
dstStore.getUri());
|
|
||||||
}
|
|
||||||
pathsMap.insert_or_assign(storePath, storePathForDst);
|
|
||||||
|
|
||||||
if (!dstStore.isValidPath(storePathForDst)) {
|
|
||||||
MaintainCount<decltype(nrRunning)> mc(nrRunning);
|
|
||||||
showProgress();
|
|
||||||
try {
|
|
||||||
copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
|
|
||||||
} catch (Error &e) {
|
|
||||||
nrFailed++;
|
|
||||||
if (!settings.keepGoing)
|
|
||||||
throw e;
|
|
||||||
printMsg(lvlError, "could not copy %s: %s", dstStore.printStorePath(storePath), e.what());
|
|
||||||
showProgress();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nrDone++;
|
|
||||||
showProgress();
|
|
||||||
});
|
});
|
||||||
#endif
|
pathsToCopy.push_back(std::pair{infoForDst, std::move(source)});
|
||||||
|
}
|
||||||
|
|
||||||
|
dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);
|
||||||
|
|
||||||
return pathsMap;
|
return pathsMap;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "nar-info.hh"
|
||||||
#include "realisation.hh"
|
#include "realisation.hh"
|
||||||
#include "path.hh"
|
#include "path.hh"
|
||||||
#include "derived-path.hh"
|
#include "derived-path.hh"
|
||||||
|
@ -359,12 +360,22 @@ public:
|
||||||
virtual void addToStore(const ValidPathInfo & info, Source & narSource,
|
virtual void addToStore(const ValidPathInfo & info, Source & narSource,
|
||||||
RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs) = 0;
|
RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs) = 0;
|
||||||
|
|
||||||
|
// A list of paths infos along with a source providing the content of the
|
||||||
|
// associated store path
|
||||||
|
using PathsSource = std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>>;
|
||||||
|
|
||||||
/* Import multiple paths into the store. */
|
/* Import multiple paths into the store. */
|
||||||
virtual void addMultipleToStore(
|
virtual void addMultipleToStore(
|
||||||
Source & source,
|
Source & source,
|
||||||
RepairFlag repair = NoRepair,
|
RepairFlag repair = NoRepair,
|
||||||
CheckSigsFlag checkSigs = CheckSigs);
|
CheckSigsFlag checkSigs = CheckSigs);
|
||||||
|
|
||||||
|
virtual void addMultipleToStore(
|
||||||
|
PathsSource & pathsToCopy,
|
||||||
|
Activity & act,
|
||||||
|
RepairFlag repair = NoRepair,
|
||||||
|
CheckSigsFlag checkSigs = CheckSigs);
|
||||||
|
|
||||||
/* Copy the contents of a path to the store and register the
|
/* Copy the contents of a path to the store and register the
|
||||||
validity the resulting path. The resulting path is returned.
|
validity the resulting path. The resulting path is returned.
|
||||||
The function object `filter' can be used to exclude files (see
|
The function object `filter' can be used to exclude files (see
|
||||||
|
|
Loading…
Reference in a new issue