From 7951c3c5460324c652d42f5f92bcae44e0a0b9c7 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Sun, 3 Dec 2006 02:08:13 +0000 Subject: [PATCH] * Some hackery to propagate the worker's stderr and exceptions to the client. --- src/libstore/build.cc | 6 +- src/libstore/remote-store.cc | 39 ++++- src/libstore/remote-store.hh | 2 + src/libstore/worker-protocol.hh | 5 + src/libutil/util.cc | 11 +- src/libutil/util.hh | 2 + src/nix-worker/main.cc | 251 +++++++++++++++++++------------- 7 files changed, 209 insertions(+), 107 deletions(-) diff --git a/src/libstore/build.cc b/src/libstore/build.cc index 71560b2d0..d8b90252b 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -872,7 +872,7 @@ static void drain(int fd) if (errno != EINTR) throw SysError("draining"); } else if (rd == 0) break; - else writeFull(STDERR_FILENO, buffer, rd); + else writeToStderr(buffer, rd); } } @@ -1610,7 +1610,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data) { if (fd == logPipe.readSide) { if (verbosity >= buildVerbosity) - writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size()); + writeToStderr((unsigned char *) data.c_str(), data.size()); writeFull(fdLogFile, (unsigned char *) data.c_str(), data.size()); } @@ -1923,7 +1923,7 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data) { assert(fd == logPipe.readSide); if (verbosity >= buildVerbosity) - writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size()); + writeToStderr((unsigned char *) data.c_str(), data.size()); /* Don't write substitution output to a log file for now. We probably should, though. */ } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 9b9d74f7e..87547ce91 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -4,6 +4,10 @@ #include "worker-protocol.hh" #include "archive.hh" +#include +#include +#include + #include #include @@ -38,9 +42,15 @@ RemoteStore::RemoteStore() if (dup2(toChild.readSide, STDIN_FILENO) == -1) throw SysError("dupping read side"); - execlp(worker.c_str(), worker.c_str(), - "--slave", NULL); + int fdDebug = open("/tmp/worker-log", O_WRONLY | O_CREAT | O_TRUNC, 0644); + assert(fdDebug != -1); + if (dup2(fdDebug, STDERR_FILENO) == -1) + throw SysError("dupping stderr"); + close(fdDebug); + execlp(worker.c_str(), worker.c_str(), + "-vvv", "--slave", NULL); + throw SysError(format("executing `%1%'") % worker); } catch (std::exception & e) { @@ -66,9 +76,13 @@ RemoteStore::RemoteStore() RemoteStore::~RemoteStore() { - writeInt(wopQuit, to); - readInt(from); - child.wait(true); + try { + fromChild.readSide.close(); + toChild.writeSide.close(); + child.wait(true); + } catch (Error & e) { + printMsg(lvlError, format("error (ignored): %1%") % e.msg()); + } } @@ -158,6 +172,7 @@ void RemoteStore::buildDerivations(const PathSet & drvPaths) { writeInt(wopBuildDerivations, to); writeStringSet(drvPaths, to); + processStderr(); readInt(from); } @@ -185,4 +200,18 @@ void RemoteStore::syncWithGC() } +void RemoteStore::processStderr() +{ + unsigned int msg; + while ((msg = readInt(from)) == STDERR_NEXT) { + string s = readString(from); + writeToStderr((unsigned char *) s.c_str(), s.size()); + } + if (msg == STDERR_ERROR) + throw Error(readString(from)); + else if (msg != STDERR_LAST) + throw Error("protocol error processing standard error"); +} + + } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index b11191c09..05d2a21ec 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -57,6 +57,8 @@ private: FdSink to; FdSource from; Pid child; + + void processStderr(); }; diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 2700b6719..284477483 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -23,4 +23,9 @@ typedef enum { } WorkerOp; +#define STDERR_NEXT 0x6f6c6d67 +#define STDERR_LAST 0x616c7473 +#define STDERR_ERROR 0x63787470 + + #endif /* !__WORKER_PROTOCOL_H */ diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 6d96310da..4460d95b8 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -437,7 +437,7 @@ void printMsg_(Verbosity level, const format & f) else if (logType == ltEscapes && level != lvlInfo) prefix = "\033[" + escVerbosity(level) + "s"; string s = (format("%1%%2%\n") % prefix % f.str()).str(); - writeFull(STDERR_FILENO, (const unsigned char *) s.c_str(), s.size()); + writeToStderr((const unsigned char *) s.c_str(), s.size()); } @@ -450,6 +450,15 @@ void warnOnce(bool & haveWarned, const format & f) } +static void defaultWriteToStderr(const unsigned char * buf, size_t count) +{ + writeFull(STDERR_FILENO, buf, count); +} + + +void (*writeToStderr) (const unsigned char * buf, size_t count) = defaultWriteToStderr; + + void readFull(int fd, unsigned char * buf, size_t count) { while (count) { diff --git a/src/libutil/util.hh b/src/libutil/util.hh index d49067dfe..0d39ffee9 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -131,6 +131,8 @@ void printMsg_(Verbosity level, const format & f); void warnOnce(bool & haveWarned, const format & f); +extern void (*writeToStderr) (const unsigned char * buf, size_t count); + /* Wrappers arount read()/write() that read/write exactly the requested number of bytes. */ diff --git a/src/nix-worker/main.cc b/src/nix-worker/main.cc index cf550895e..17e892c64 100644 --- a/src/nix-worker/main.cc +++ b/src/nix-worker/main.cc @@ -10,7 +10,7 @@ using namespace nix; -Path readStorePath(Source & from) +static Path readStorePath(Source & from) { Path path = readString(from); assertStorePath(path); @@ -18,7 +18,7 @@ Path readStorePath(Source & from) } -PathSet readStorePaths(Source & from) +static PathSet readStorePaths(Source & from) { PathSet paths = readStringSet(from); for (PathSet::iterator i = paths.begin(); i != paths.end(); ++i) @@ -27,7 +27,148 @@ PathSet readStorePaths(Source & from) } -void processConnection(Source & from, Sink & to) +static Sink * _to; /* !!! should make writeToStderr an object */ +bool canSendStderr; + + +static void tunnelStderr(const unsigned char * buf, size_t count) +{ + writeFull(STDERR_FILENO, buf, count); + if (canSendStderr) { + try { + writeInt(STDERR_NEXT, *_to); + writeString(string((char *) buf, count), *_to); + } catch (...) { + /* Write failed; that means that the other side is + gone. */ + canSendStderr = false; + throw; + } + } +} + + +/* startWork() means that we're starting an operation for which we + want to send out stderr to the client. */ +static void startWork() +{ + canSendStderr = true; +} + + +/* stopWork() means that we're done; stop sending stderr to the + client. */ +static void stopWork() +{ + canSendStderr = false; + writeInt(STDERR_LAST, *_to); +} + + +static void performOp(Source & from, Sink & to, unsigned int op) +{ + switch (op) { + +#if 0 + case wopQuit: { + /* Close the database. */ + store.reset((StoreAPI *) 0); + writeInt(1, to); + break; + } +#endif + + case wopIsValidPath: { + Path path = readStorePath(from); + writeInt(store->isValidPath(path), to); + break; + } + + case wopHasSubstitutes: { + Path path = readStorePath(from); + writeInt(store->hasSubstitutes(path), to); + break; + } + + case wopQueryPathHash: { + Path path = readStorePath(from); + writeString(printHash(store->queryPathHash(path)), to); + break; + } + + case wopQueryReferences: + case wopQueryReferrers: { + Path path = readStorePath(from); + PathSet paths; + if (op == wopQueryReferences) + store->queryReferences(path, paths); + else + store->queryReferrers(path, paths); + writeStringSet(paths, to); + break; + } + + case wopAddToStore: { + /* !!! uberquick hack */ + string baseName = readString(from); + bool fixed = readInt(from) == 1; + bool recursive = readInt(from) == 1; + string hashAlgo = readString(from); + + Path tmp = createTempDir(); + Path tmp2 = tmp + "/" + baseName; + restorePath(tmp2, from); + + writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to); + + deletePath(tmp); + break; + } + + case wopAddTextToStore: { + string suffix = readString(from); + string s = readString(from); + PathSet refs = readStorePaths(from); + writeString(store->addTextToStore(suffix, s, refs), to); + break; + } + + case wopBuildDerivations: { + PathSet drvs = readStorePaths(from); + startWork(); + store->buildDerivations(drvs); + stopWork(); + writeInt(1, to); + break; + } + + case wopEnsurePath: { + Path path = readStorePath(from); + store->ensurePath(path); + writeInt(1, to); + break; + } + + case wopAddTempRoot: { + Path path = readStorePath(from); + store->addTempRoot(path); + writeInt(1, to); + break; + } + + case wopSyncWithGC: { + store->syncWithGC(); + writeInt(1, to); + break; + } + + default: + throw Error(format("invalid operation %1%") % op); + } +} + + +static void processConnection(Source & from, Sink & to) { store = boost::shared_ptr(new LocalStore(true)); @@ -38,112 +179,26 @@ void processConnection(Source & from, Sink & to) debug("greeting exchanged"); + _to = &to; + canSendStderr = false; + writeToStderr = tunnelStderr; + bool quit = false; unsigned int opCount = 0; do { - WorkerOp op = (WorkerOp) readInt(from); opCount++; - switch (op) { - - case wopQuit: { - /* Close the database. */ - store.reset((StoreAPI *) 0); - writeInt(1, to); - quit = true; - break; + try { + performOp(from, to, op); + } catch (Error & e) { + writeInt(STDERR_ERROR, *_to); + writeString(e.msg(), to); } - case wopIsValidPath: { - Path path = readStorePath(from); - writeInt(store->isValidPath(path), to); - break; - } - - case wopHasSubstitutes: { - Path path = readStorePath(from); - writeInt(store->hasSubstitutes(path), to); - break; - } - - case wopQueryPathHash: { - Path path = readStorePath(from); - writeString(printHash(store->queryPathHash(path)), to); - break; - } - - case wopQueryReferences: - case wopQueryReferrers: { - Path path = readStorePath(from); - PathSet paths; - if (op == wopQueryReferences) - store->queryReferences(path, paths); - else - store->queryReferrers(path, paths); - writeStringSet(paths, to); - break; - } - - case wopAddToStore: { - /* !!! uberquick hack */ - string baseName = readString(from); - bool fixed = readInt(from) == 1; - bool recursive = readInt(from) == 1; - string hashAlgo = readString(from); - - Path tmp = createTempDir(); - Path tmp2 = tmp + "/" + baseName; - restorePath(tmp2, from); - - writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to); - - deletePath(tmp); - break; - } - - case wopAddTextToStore: { - string suffix = readString(from); - string s = readString(from); - PathSet refs = readStorePaths(from); - writeString(store->addTextToStore(suffix, s, refs), to); - break; - } - - case wopBuildDerivations: { - PathSet drvs = readStorePaths(from); - store->buildDerivations(drvs); - writeInt(1, to); - break; - } - - case wopEnsurePath: { - Path path = readStorePath(from); - store->ensurePath(path); - writeInt(1, to); - break; - } - - case wopAddTempRoot: { - Path path = readStorePath(from); - store->addTempRoot(path); - writeInt(1, to); - break; - } - - case wopSyncWithGC: { - store->syncWithGC(); - writeInt(1, to); - break; - } - - default: - throw Error(format("invalid operation %1%") % op); - } - } while (!quit); printMsg(lvlError, format("%1% worker operations") % opCount);