forked from lix-project/hydra
buildRemote(): Copy paths to the destination store in O(1) memory
This commit is contained in:
parent
cbcf6359b4
commit
7622cbfe37
1 changed files with 56 additions and 9 deletions
|
@ -124,6 +124,38 @@ static void copyClosureTo(std::timed_mutex & sendMutex, ref<Store> destStore,
|
|||
}
|
||||
|
||||
|
||||
// FIXME: use Store::topoSortPaths().
|
||||
StorePaths topoSortPaths(const std::map<StorePath, ValidPathInfo> & paths)
|
||||
{
|
||||
StorePaths sorted;
|
||||
StorePathSet visited;
|
||||
|
||||
std::function<void(const StorePath & path)> dfsVisit;
|
||||
|
||||
dfsVisit = [&](const StorePath & path) {
|
||||
if (!visited.insert(path).second) return;
|
||||
|
||||
auto info = paths.find(path);
|
||||
auto references = info == paths.end() ? StorePathSet() : info->second.references;
|
||||
|
||||
for (auto & i : references)
|
||||
/* Don't traverse into paths that don't exist. That can
|
||||
happen due to substitutes for non-existent paths. */
|
||||
if (i != path && paths.count(i))
|
||||
dfsVisit(i);
|
||||
|
||||
sorted.push_back(path);
|
||||
};
|
||||
|
||||
for (auto & i : paths)
|
||||
dfsVisit(i.first);
|
||||
|
||||
std::reverse(sorted.begin(), sorted.end());
|
||||
|
||||
return sorted;
|
||||
}
|
||||
|
||||
|
||||
void State::buildRemote(ref<Store> destStore,
|
||||
Machine::ptr machine, Step::ptr step,
|
||||
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats,
|
||||
|
@ -148,6 +180,7 @@ void State::buildRemote(ref<Store> destStore,
|
|||
|
||||
updateStep(ssConnecting);
|
||||
|
||||
// FIXME: rewrite to use Store.
|
||||
Child child;
|
||||
openConnection(machine, tmpDir, logFD.get(), child);
|
||||
|
||||
|
@ -182,7 +215,7 @@ void State::buildRemote(ref<Store> destStore,
|
|||
unsigned int remoteVersion;
|
||||
|
||||
try {
|
||||
to << SERVE_MAGIC_1 << 0x203;
|
||||
to << SERVE_MAGIC_1 << 0x204;
|
||||
to.flush();
|
||||
|
||||
unsigned int magic = readInt(from);
|
||||
|
@ -405,17 +438,26 @@ void State::buildRemote(ref<Store> destStore,
|
|||
|
||||
auto outputs = step->drv->outputPaths();
|
||||
|
||||
/* Query the size of the output paths. */
|
||||
/* Get info about each output path. */
|
||||
std::map<StorePath, ValidPathInfo> infos;
|
||||
size_t totalNarSize = 0;
|
||||
to << cmdQueryPathInfos;
|
||||
writeStorePaths(*localStore, to, outputs);
|
||||
to.flush();
|
||||
while (true) {
|
||||
if (readString(from) == "") break;
|
||||
auto storePathS = readString(from);
|
||||
if (storePathS == "") break;
|
||||
ValidPathInfo info(localStore->parseStorePath(storePathS));
|
||||
assert(outputs.count(info.path));
|
||||
readString(from); // deriver
|
||||
readStrings<PathSet>(from); // references
|
||||
info.references = readStorePaths<StorePathSet>(*localStore, from);
|
||||
readLongLong(from); // download size
|
||||
totalNarSize += readLongLong(from);
|
||||
info.narSize = readLongLong(from);
|
||||
totalNarSize += info.narSize;
|
||||
info.narHash = Hash(readString(from), htSHA256);
|
||||
info.ca = parseContentAddressOpt(readString(from));
|
||||
readStrings<StringSet>(from); // sigs
|
||||
infos.insert_or_assign(info.path, info);
|
||||
}
|
||||
|
||||
if (totalNarSize > maxOutputSize) {
|
||||
|
@ -423,13 +465,18 @@ void State::buildRemote(ref<Store> destStore,
|
|||
return;
|
||||
}
|
||||
|
||||
/* Copy each path. */
|
||||
printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)",
|
||||
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
|
||||
|
||||
to << cmdExportPaths << 0;
|
||||
writeStorePaths(*localStore, to, outputs);
|
||||
auto pathsSorted = topoSortPaths(infos);
|
||||
|
||||
for (auto & path : pathsSorted) {
|
||||
auto & info = infos.find(path)->second;
|
||||
to << cmdDumpStorePath << localStore->printStorePath(path);
|
||||
to.flush();
|
||||
destStore->importPaths(from, /* result.accessor, */ NoCheckSigs);
|
||||
destStore->addToStore(info, from);
|
||||
}
|
||||
|
||||
auto now2 = std::chrono::steady_clock::now();
|
||||
|
||||
|
|
Loading…
Reference in a new issue