Overhaul wopAddToStore

This commit is contained in:
Robert Hensing 2020-09-17 19:27:11 +02:00
parent 14b30b3f3d
commit e34fe47d0c
4 changed files with 133 additions and 84 deletions

View file

@ -349,47 +349,74 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case wopAddToStore: { case wopAddToStore: {
HashType hashAlgo; if (GET_PROTOCOL_MINOR(clientVersion) >= 25) {
std::string baseName; auto name = readString(from);
FileIngestionMethod method; auto camStr = readString(from);
{ auto refs = readStorePaths<StorePathSet>(*store, from);
bool fixed;
uint8_t recursive; logger->startWork();
std::string hashAlgoRaw; StorePath path = [&]() -> StorePath {
from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw; // NB: FramedSource must be out of scope before logger->stopWork();
if (recursive > (uint8_t) FileIngestionMethod::Recursive) ContentAddressMethod contentAddressMethod = parseContentAddressMethod(camStr);
throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive); FramedSource source(from);
method = FileIngestionMethod { recursive }; return std::visit(overloaded {
/* Compatibility hack. */ [&](TextHashMethod &_) -> StorePath {
if (!fixed) { // We could stream this by changing Store
hashAlgoRaw = "sha256"; std::string contents = source.drain();
method = FileIngestionMethod::Recursive; return store->addTextToStore(name, contents, refs);
},
[&](FixedOutputHashMethod &fohm) -> StorePath {
return store->addToStoreFromDump(source, name, fohm.fileIngestionMethod, fohm.hashType);
},
}, contentAddressMethod);
}();
logger->stopWork();
to << store->printStorePath(path);
} else {
HashType hashAlgo;
std::string baseName;
FileIngestionMethod method;
{
bool fixed;
uint8_t recursive;
std::string hashAlgoRaw;
from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
if (recursive > (uint8_t) FileIngestionMethod::Recursive)
throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
method = FileIngestionMethod { recursive };
/* Compatibility hack. */
if (!fixed) {
hashAlgoRaw = "sha256";
method = FileIngestionMethod::Recursive;
}
hashAlgo = parseHashType(hashAlgoRaw);
} }
hashAlgo = parseHashType(hashAlgoRaw);
StringSink saved;
TeeSource savedNARSource(from, saved);
RetrieveRegularNARSink savedRegular { saved };
if (method == FileIngestionMethod::Recursive) {
/* Get the entire NAR dump from the client and save it to
a string so that we can pass it to
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNARSource);
} else
parseDump(savedRegular, from);
logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
// FIXME: try to stream directly from `from`.
StringSource dumpSource { *saved.s };
auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
logger->stopWork();
to << store->printStorePath(path);
} }
StringSink saved;
TeeSource savedNARSource(from, saved);
RetrieveRegularNARSink savedRegular { saved };
if (method == FileIngestionMethod::Recursive) {
/* Get the entire NAR dump from the client and save it to
a string so that we can pass it to
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNARSource);
} else
parseDump(savedRegular, from);
logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
// FIXME: try to stream directly from `from`.
StringSource dumpSource { *saved.s };
auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
logger->stopWork();
to << store->printStorePath(path);
break; break;
} }

View file

@ -526,6 +526,69 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
} }
StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method, HashType hashType, RepairFlag repair)
{
if (repair) throw Error("repairing is not supported when adding to store through the Nix daemon");
StorePathSet references;
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
conn->to
<< wopAddToStore
<< name
<< renderContentAddressMethod(FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType });
writeStorePaths(*this, conn->to, references);
conn.withFramedSink([&](Sink & sink) {
dump.drainInto(sink);
});
return parseStorePath(readString(conn->from));
}
else {
if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
conn->to
<< wopAddToStore
<< name
<< ((hashType == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
<< (method == FileIngestionMethod::Recursive ? 1 : 0)
<< printHashType(hashType);
try {
conn->to.written = 0;
conn->to.warn = true;
connections->incCapacity();
{
Finally cleanup([&]() { connections->decCapacity(); });
if (method == FileIngestionMethod::Recursive) {
dump.drainInto(conn->to);
} else {
std::string contents = dump.drain();
dumpString(contents, conn->to);
}
}
conn->to.warn = false;
conn.processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
conn.processStderr();
} catch (EndOfFile & e) { }
throw;
}
return parseStorePath(readString(conn->from));
}
}
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs) RepairFlag repair, CheckSigsFlag checkSigs)
{ {
@ -579,46 +642,6 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
} }
StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(getConnection());
Path srcPath(absPath(_srcPath));
conn->to
<< wopAddToStore
<< name
<< ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
<< (method == FileIngestionMethod::Recursive ? 1 : 0)
<< printHashType(hashAlgo);
try {
conn->to.written = 0;
conn->to.warn = true;
connections->incCapacity();
{
Finally cleanup([&]() { connections->decCapacity(); });
dumpPath(srcPath, conn->to, filter);
}
conn->to.warn = false;
conn.processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
conn.processStderr();
} catch (EndOfFile & e) { }
throw;
}
return parseStorePath(readString(conn->from));
}
StorePath RemoteStore::addTextToStore(const string & name, const string & s, StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair) const StorePathSet & references, RepairFlag repair)
{ {

View file

@ -63,13 +63,12 @@ public:
void querySubstitutablePathInfos(const StorePathCAMap & paths, void querySubstitutablePathInfos(const StorePathCAMap & paths,
SubstitutablePathInfos & infos) override; SubstitutablePathInfos & infos) override;
StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override;
void addToStore(const ValidPathInfo & info, Source & nar, void addToStore(const ValidPathInfo & info, Source & nar,
RepairFlag repair, CheckSigsFlag checkSigs) override; RepairFlag repair, CheckSigsFlag checkSigs) override;
StorePath addToStore(const string & name, const Path & srcPath,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256,
PathFilter & filter = defaultPathFilter, RepairFlag repair = NoRepair) override;
StorePath addTextToStore(const string & name, const string & s, StorePath addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair) override; const StorePathSet & references, RepairFlag repair) override;

View file

@ -6,7 +6,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863 #define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f #define WORKER_MAGIC_2 0x6478696f
#define PROTOCOL_VERSION 0x118 #define PROTOCOL_VERSION 0x119
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)