Make nix copy parallel again
FILLME
This commit is contained in:
parent
1dd7253133
commit
95f47c28fb
|
@ -258,6 +258,86 @@ StorePath Store::addToStore(
|
|||
return addToStoreFromDump(*source, name, method, hashAlgo, repair, references);
|
||||
}
|
||||
|
||||
void Store::addMultipleToStore(
|
||||
std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> & pathsToCopy,
|
||||
Activity & act,
|
||||
RepairFlag repair,
|
||||
CheckSigsFlag checkSigs)
|
||||
{
|
||||
std::atomic<size_t> nrDone{0};
|
||||
std::atomic<size_t> nrFailed{0};
|
||||
std::atomic<uint64_t> bytesExpected{0};
|
||||
std::atomic<uint64_t> nrRunning{0};
|
||||
|
||||
using PathWithInfo = std::pair<ValidPathInfo, std::unique_ptr<Source>>;
|
||||
|
||||
std::map<StorePath, PathWithInfo*> infosMap;
|
||||
StorePathSet storePathsToAdd;
|
||||
for (auto & thingToAdd : pathsToCopy) {
|
||||
infosMap.insert_or_assign(thingToAdd.first.path, &thingToAdd);
|
||||
storePathsToAdd.insert(thingToAdd.first.path);
|
||||
}
|
||||
|
||||
auto showProgress = [&]() {
|
||||
act.progress(nrDone, pathsToCopy.size(), nrRunning, nrFailed);
|
||||
};
|
||||
|
||||
ThreadPool pool;
|
||||
|
||||
processGraph<StorePath>(pool,
|
||||
storePathsToAdd,
|
||||
|
||||
[&](const StorePath & path) {
|
||||
auto & [info, source] = *infosMap.at(path);
|
||||
/* auto storePathForDst = info.storePath; */
|
||||
/* if (info->ca && info->references.empty()) { */
|
||||
/* storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca); */
|
||||
/* if (dstStore.storeDir == srcStore.storeDir) */
|
||||
/* assert(storePathForDst == storePath); */
|
||||
/* if (storePathForDst != storePath) */
|
||||
/* debug("replaced path '%s' to '%s' for substituter '%s'", */
|
||||
/* srcStore.printStorePath(storePath), */
|
||||
/* dstStore.printStorePath(storePathForDst), */
|
||||
/* dstStore.getUri()); */
|
||||
/* } */
|
||||
/* pathsMap.insert_or_assign(storePath, storePathForDst); */
|
||||
|
||||
if (isValidPath(info.path)) {
|
||||
nrDone++;
|
||||
showProgress();
|
||||
return StorePathSet();
|
||||
}
|
||||
|
||||
bytesExpected += info.narSize;
|
||||
act.setExpected(actCopyPath, bytesExpected);
|
||||
|
||||
return info.references;
|
||||
},
|
||||
|
||||
[&](const StorePath & path) {
|
||||
checkInterrupt();
|
||||
|
||||
auto & [info, source] = *infosMap.at(path);
|
||||
|
||||
if (!isValidPath(info.path)) {
|
||||
MaintainCount<decltype(nrRunning)> mc(nrRunning);
|
||||
showProgress();
|
||||
try {
|
||||
addToStore(info, *source, repair, checkSigs);
|
||||
} catch (Error &e) {
|
||||
nrFailed++;
|
||||
if (!settings.keepGoing)
|
||||
throw e;
|
||||
printMsg(lvlError, "could not copy %s: %s", printStorePath(path), e.what());
|
||||
showProgress();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
nrDone++;
|
||||
showProgress();
|
||||
});
|
||||
}
|
||||
|
||||
void Store::addMultipleToStore(
|
||||
Source & source,
|
||||
|
@ -998,106 +1078,39 @@ std::map<StorePath, StorePath> copyPaths(
|
|||
|
||||
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
|
||||
|
||||
auto sorted = srcStore.topoSortPaths(missing);
|
||||
std::reverse(sorted.begin(), sorted.end());
|
||||
/* auto sorted = srcStore.topoSortPaths(missing); */
|
||||
/* std::reverse(sorted.begin(), sorted.end()); */
|
||||
|
||||
auto source = sinkToSource([&](Sink & sink) {
|
||||
sink << sorted.size();
|
||||
for (auto & storePath : sorted) {
|
||||
auto srcUri = srcStore.getUri();
|
||||
auto dstUri = dstStore.getUri();
|
||||
auto storePathS = srcStore.printStorePath(storePath);
|
||||
Activity act(*logger, lvlInfo, actCopyPath,
|
||||
makeCopyPathMessage(srcUri, dstUri, storePathS),
|
||||
{storePathS, srcUri, dstUri});
|
||||
PushActivity pact(act.id);
|
||||
/* auto source = sinkToSource([&](Sink & sink) { */
|
||||
/* sink << sorted.size(); */
|
||||
/* for (auto & storePath : sorted) { */
|
||||
/* auto srcUri = srcStore.getUri(); */
|
||||
/* auto dstUri = dstStore.getUri(); */
|
||||
/* auto storePathS = srcStore.printStorePath(storePath); */
|
||||
/* Activity act(*logger, lvlInfo, actCopyPath, */
|
||||
/* makeCopyPathMessage(srcUri, dstUri, storePathS), */
|
||||
/* {storePathS, srcUri, dstUri}); */
|
||||
/* PushActivity pact(act.id); */
|
||||
|
||||
auto info = srcStore.queryPathInfo(storePath);
|
||||
info->write(sink, srcStore, 16);
|
||||
srcStore.narFromPath(storePath, sink);
|
||||
}
|
||||
});
|
||||
/* auto info = srcStore.queryPathInfo(storePath); */
|
||||
/* info->write(sink, srcStore, 16); */
|
||||
/* srcStore.narFromPath(storePath, sink); */
|
||||
/* } */
|
||||
/* }); */
|
||||
|
||||
dstStore.addMultipleToStore(*source, repair, checkSigs);
|
||||
std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> pathsToCopy;
|
||||
|
||||
for (auto & missingPath : missing) {
|
||||
auto info = srcStore.queryPathInfo(missingPath);
|
||||
auto source = sinkToSource([&](Sink & sink) {
|
||||
srcStore.narFromPath(missingPath, sink);
|
||||
});
|
||||
pathsToCopy.push_back(std::pair{*info, std::move(source)});
|
||||
}
|
||||
|
||||
dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);
|
||||
|
||||
#if 0
|
||||
std::atomic<size_t> nrDone{0};
|
||||
std::atomic<size_t> nrFailed{0};
|
||||
std::atomic<uint64_t> bytesExpected{0};
|
||||
std::atomic<uint64_t> nrRunning{0};
|
||||
|
||||
auto showProgress = [&]() {
|
||||
act.progress(nrDone, missing.size(), nrRunning, nrFailed);
|
||||
};
|
||||
|
||||
ThreadPool pool;
|
||||
|
||||
processGraph<StorePath>(pool,
|
||||
StorePathSet(missing.begin(), missing.end()),
|
||||
|
||||
[&](const StorePath & storePath) {
|
||||
auto info = srcStore.queryPathInfo(storePath);
|
||||
auto storePathForDst = storePath;
|
||||
if (info->ca && info->references.empty()) {
|
||||
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
|
||||
if (dstStore.storeDir == srcStore.storeDir)
|
||||
assert(storePathForDst == storePath);
|
||||
if (storePathForDst != storePath)
|
||||
debug("replaced path '%s' to '%s' for substituter '%s'",
|
||||
srcStore.printStorePath(storePath),
|
||||
dstStore.printStorePath(storePathForDst),
|
||||
dstStore.getUri());
|
||||
}
|
||||
pathsMap.insert_or_assign(storePath, storePathForDst);
|
||||
|
||||
if (dstStore.isValidPath(storePath)) {
|
||||
nrDone++;
|
||||
showProgress();
|
||||
return StorePathSet();
|
||||
}
|
||||
|
||||
bytesExpected += info->narSize;
|
||||
act.setExpected(actCopyPath, bytesExpected);
|
||||
|
||||
return info->references;
|
||||
},
|
||||
|
||||
[&](const StorePath & storePath) {
|
||||
checkInterrupt();
|
||||
|
||||
auto info = srcStore.queryPathInfo(storePath);
|
||||
|
||||
auto storePathForDst = storePath;
|
||||
if (info->ca && info->references.empty()) {
|
||||
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
|
||||
if (dstStore.storeDir == srcStore.storeDir)
|
||||
assert(storePathForDst == storePath);
|
||||
if (storePathForDst != storePath)
|
||||
debug("replaced path '%s' to '%s' for substituter '%s'",
|
||||
srcStore.printStorePath(storePath),
|
||||
dstStore.printStorePath(storePathForDst),
|
||||
dstStore.getUri());
|
||||
}
|
||||
pathsMap.insert_or_assign(storePath, storePathForDst);
|
||||
|
||||
if (!dstStore.isValidPath(storePathForDst)) {
|
||||
MaintainCount<decltype(nrRunning)> mc(nrRunning);
|
||||
showProgress();
|
||||
try {
|
||||
copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
|
||||
} catch (Error &e) {
|
||||
nrFailed++;
|
||||
if (!settings.keepGoing)
|
||||
throw e;
|
||||
printMsg(lvlError, "could not copy %s: %s", dstStore.printStorePath(storePath), e.what());
|
||||
showProgress();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
nrDone++;
|
||||
showProgress();
|
||||
});
|
||||
#endif
|
||||
|
||||
return pathsMap;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#pragma once
|
||||
|
||||
#include "nar-info.hh"
|
||||
#include "realisation.hh"
|
||||
#include "path.hh"
|
||||
#include "derived-path.hh"
|
||||
|
@ -364,6 +365,12 @@ public:
|
|||
Source & source,
|
||||
RepairFlag repair = NoRepair,
|
||||
CheckSigsFlag checkSigs = CheckSigs);
|
||||
virtual void addMultipleToStore(
|
||||
std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> & pathsToCopy,
|
||||
Activity & act,
|
||||
RepairFlag repair = NoRepair,
|
||||
CheckSigsFlag checkSigs = CheckSigs
|
||||
);
|
||||
|
||||
/* Copy the contents of a path to the store and register the
|
||||
validity the resulting path. The resulting path is returned.
|
||||
|
|
Loading…
Reference in a new issue