diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index e9f2cee80..f0e3502bf 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -629,17 +629,37 @@ RemoteStore::Connection::~Connection() } +static Logger::Fields readFields(Source & from) +{ + Logger::Fields fields; + size_t size = readInt(from); + for (size_t n = 0; n < size; n++) { + auto type = (decltype(Logger::Field::type)) readInt(from); + if (type == Logger::Field::tInt) + fields.push_back(readNum(from)); + else if (type == Logger::Field::tString) + fields.push_back(readString(from)); + else + throw Error("got unsupported field type %x from Nix daemon", (int) type); + } + return fields; +} + + void RemoteStore::Connection::processStderr(Sink * sink, Source * source) { to.flush(); - unsigned int msg; - while ((msg = readInt(from)) == STDERR_NEXT - || msg == STDERR_READ || msg == STDERR_WRITE) { + + while (true) { + + auto msg = readNum(from); + if (msg == STDERR_WRITE) { string s = readString(from); if (!sink) throw Error("no sink"); (*sink)(s); } + else if (msg == STDERR_READ) { if (!source) throw Error("no source"); size_t len = readNum(from); @@ -647,16 +667,43 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source) writeString(buf.get(), source->read(buf.get(), len), to); to.flush(); } - else + + else if (msg == STDERR_ERROR) { + string error = readString(from); + unsigned int status = readInt(from); + throw Error(status, error); + } + + else if (msg == STDERR_NEXT) printError(chomp(readString(from))); + + else if (msg == STDERR_START_ACTIVITY) { + auto act = readNum(from); + auto type = (ActivityType) readInt(from); + auto s = readString(from); + auto fields = readFields(from); + auto parent = readNum(from); + logger->startActivity(act, type, s, fields, parent); + } + + else if (msg == STDERR_STOP_ACTIVITY) { + auto act = readNum(from); + logger->stopActivity(act); + } + + else if (msg == STDERR_RESULT) { + auto act = readNum(from); + auto type = (ResultType) readInt(from); + auto fields = readFields(from); + logger->result(act, type, fields); + } + + else if (msg == STDERR_LAST) + break; + + else + throw Error("got unknown message type %x from Nix daemon", msg); } - if (msg == STDERR_ERROR) { - string error = readString(from); - unsigned int status = readInt(from); - throw Error(status, error); - } - else if (msg != STDERR_LAST) - throw Error("protocol error processing standard error"); } diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 6c6766b36..9daeb46ad 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -57,6 +57,9 @@ typedef enum { #define STDERR_WRITE 0x64617416 // data for sink #define STDERR_LAST 0x616c7473 #define STDERR_ERROR 0x63787470 +#define STDERR_START_ACTIVITY 0x53545254 +#define STDERR_STOP_ACTIVITY 0x53544f50 +#define STDERR_RESULT 0x52534c54 Path readStorePath(Store & store, Source & from); diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh index 2ec15cb68..e3e7c8e6f 100644 --- a/src/libutil/logging.hh +++ b/src/libutil/logging.hh @@ -47,7 +47,7 @@ public: struct Field { // FIXME: use std::variant. - enum { tInt, tString } type; + enum { tInt = 0, tString = 1 } type; uint64_t i = 0; std::string s; Field(const std::string & s) : type(tString), s(s) { } diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc index 3237333f4..65c88562c 100644 --- a/src/nix-daemon/nix-daemon.cc +++ b/src/nix-daemon/nix-daemon.cc @@ -56,6 +56,21 @@ static FdSource from(STDIN_FILENO); static FdSink to(STDOUT_FILENO); +Sink & operator << (Sink & sink, const Logger::Fields & fields) +{ + sink << fields.size(); + for (auto & f : fields) { + sink << f.type; + if (f.type == Logger::Field::tInt) + sink << f.i; + else if (f.type == Logger::Field::tString) + sink << f.s; + else abort(); + } + return sink; +} + + /* Logger that forwards log messages to the client, *if* we're in a state where the protocol allows it (i.e., when canSendStderr is true). */ @@ -69,15 +84,14 @@ struct TunnelLogger : public Logger Sync state_; - void log(Verbosity lvl, const FormatOrString & fs) override + void enqueueMsg(const std::string & s) { - if (lvl > verbosity) return; - auto state(state_.lock()); if (state->canSendStderr) { + assert(state->pendingMsgs.empty()); try { - to << STDERR_NEXT << (fs.s + "\n"); + to(s); to.flush(); } catch (...) { /* Write failed; that means that the other side is @@ -86,7 +100,16 @@ struct TunnelLogger : public Logger throw; } } else - state->pendingMsgs.push_back(fs.s); + state->pendingMsgs.push_back(s); + } + + void log(Verbosity lvl, const FormatOrString & fs) override + { + if (lvl > verbosity) return; + + StringSink buf; + buf << STDERR_NEXT << (fs.s + "\n"); + enqueueMsg(*buf.s); } /* startWork() means that we're starting an operation for which we @@ -99,7 +122,7 @@ struct TunnelLogger : public Logger state->canSendStderr = true; for (auto & msg : state->pendingMsgs) - to << STDERR_NEXT << (msg + "\n"); + to(msg); state->pendingMsgs.clear(); @@ -121,6 +144,28 @@ struct TunnelLogger : public Logger if (status != 0) to << status; } } + + void startActivity(ActivityId act, ActivityType type, + const std::string & s, const Fields & fields, ActivityId parent) override + { + StringSink buf; + buf << STDERR_START_ACTIVITY << act << type << s << fields << parent; + enqueueMsg(*buf.s); + } + + void stopActivity(ActivityId act) override + { + StringSink buf; + buf << STDERR_STOP_ACTIVITY << act; + enqueueMsg(*buf.s); + } + + void result(ActivityId act, ResultType type, const Fields & fields) override + { + StringSink buf; + buf << STDERR_RESULT << act << type << fields; + enqueueMsg(*buf.s); + } }; @@ -665,16 +710,15 @@ static void processConnection(bool trusted) { MonitorFdHup monitor(from.fd); - TunnelLogger tunnelLogger; + auto tunnelLogger = new TunnelLogger(); auto prevLogger = nix::logger; - logger = &tunnelLogger; + logger = tunnelLogger; unsigned int opCount = 0; Finally finally([&]() { - logger = prevLogger; _isInterrupted = false; - debug("%d operations", opCount); + prevLogger->log(lvlDebug, fmt("%d operations", opCount)); }); /* Exchange the greeting. */ @@ -693,7 +737,7 @@ static void processConnection(bool trusted) readInt(from); // obsolete reserveSpace /* Send startup error messages to the client. */ - tunnelLogger.startWork(); + tunnelLogger->startWork(); try { @@ -713,7 +757,7 @@ static void processConnection(bool trusted) params["path-info-cache-size"] = "0"; auto store = make_ref(params); - tunnelLogger.stopWork(); + tunnelLogger->stopWork(); to.flush(); /* Process client requests. */ @@ -730,28 +774,28 @@ static void processConnection(bool trusted) opCount++; try { - performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op); + performOp(tunnelLogger, store, trusted, clientVersion, from, to, op); } catch (Error & e) { /* If we're not in a state where we can send replies, then something went wrong processing the input of the client. This can happen especially if I/O errors occur during addTextToStore() / importPath(). If that happens, just send the error message and exit. */ - bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr; - tunnelLogger.stopWork(false, e.msg(), e.status); + bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr; + tunnelLogger->stopWork(false, e.msg(), e.status); if (!errorAllowed) throw; } catch (std::bad_alloc & e) { - tunnelLogger.stopWork(false, "Nix daemon out of memory", 1); + tunnelLogger->stopWork(false, "Nix daemon out of memory", 1); throw; } to.flush(); - assert(!tunnelLogger.state_.lock()->canSendStderr); + assert(!tunnelLogger->state_.lock()->canSendStderr); }; } catch (std::exception & e) { - tunnelLogger.stopWork(false, e.what(), 1); + tunnelLogger->stopWork(false, e.what(), 1); to.flush(); return; }