Tunnel progress messages from the daemon to the client
This makes the progress bar work for non-root users.
This commit is contained in:
parent
e681b1f064
commit
fe34b91289
4 changed files with 124 additions and 30 deletions
src
|
@ -629,17 +629,37 @@ RemoteStore::Connection::~Connection()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static Logger::Fields readFields(Source & from)
|
||||||
|
{
|
||||||
|
Logger::Fields fields;
|
||||||
|
size_t size = readInt(from);
|
||||||
|
for (size_t n = 0; n < size; n++) {
|
||||||
|
auto type = (decltype(Logger::Field::type)) readInt(from);
|
||||||
|
if (type == Logger::Field::tInt)
|
||||||
|
fields.push_back(readNum<uint64_t>(from));
|
||||||
|
else if (type == Logger::Field::tString)
|
||||||
|
fields.push_back(readString(from));
|
||||||
|
else
|
||||||
|
throw Error("got unsupported field type %x from Nix daemon", (int) type);
|
||||||
|
}
|
||||||
|
return fields;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
|
void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
|
||||||
{
|
{
|
||||||
to.flush();
|
to.flush();
|
||||||
unsigned int msg;
|
|
||||||
while ((msg = readInt(from)) == STDERR_NEXT
|
while (true) {
|
||||||
|| msg == STDERR_READ || msg == STDERR_WRITE) {
|
|
||||||
|
auto msg = readNum<uint64_t>(from);
|
||||||
|
|
||||||
if (msg == STDERR_WRITE) {
|
if (msg == STDERR_WRITE) {
|
||||||
string s = readString(from);
|
string s = readString(from);
|
||||||
if (!sink) throw Error("no sink");
|
if (!sink) throw Error("no sink");
|
||||||
(*sink)(s);
|
(*sink)(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
else if (msg == STDERR_READ) {
|
else if (msg == STDERR_READ) {
|
||||||
if (!source) throw Error("no source");
|
if (!source) throw Error("no source");
|
||||||
size_t len = readNum<size_t>(from);
|
size_t len = readNum<size_t>(from);
|
||||||
|
@ -647,16 +667,43 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
|
||||||
writeString(buf.get(), source->read(buf.get(), len), to);
|
writeString(buf.get(), source->read(buf.get(), len), to);
|
||||||
to.flush();
|
to.flush();
|
||||||
}
|
}
|
||||||
else
|
|
||||||
|
else if (msg == STDERR_ERROR) {
|
||||||
|
string error = readString(from);
|
||||||
|
unsigned int status = readInt(from);
|
||||||
|
throw Error(status, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
else if (msg == STDERR_NEXT)
|
||||||
printError(chomp(readString(from)));
|
printError(chomp(readString(from)));
|
||||||
|
|
||||||
|
else if (msg == STDERR_START_ACTIVITY) {
|
||||||
|
auto act = readNum<ActivityId>(from);
|
||||||
|
auto type = (ActivityType) readInt(from);
|
||||||
|
auto s = readString(from);
|
||||||
|
auto fields = readFields(from);
|
||||||
|
auto parent = readNum<ActivityId>(from);
|
||||||
|
logger->startActivity(act, type, s, fields, parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
else if (msg == STDERR_STOP_ACTIVITY) {
|
||||||
|
auto act = readNum<ActivityId>(from);
|
||||||
|
logger->stopActivity(act);
|
||||||
|
}
|
||||||
|
|
||||||
|
else if (msg == STDERR_RESULT) {
|
||||||
|
auto act = readNum<ActivityId>(from);
|
||||||
|
auto type = (ResultType) readInt(from);
|
||||||
|
auto fields = readFields(from);
|
||||||
|
logger->result(act, type, fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
else if (msg == STDERR_LAST)
|
||||||
|
break;
|
||||||
|
|
||||||
|
else
|
||||||
|
throw Error("got unknown message type %x from Nix daemon", msg);
|
||||||
}
|
}
|
||||||
if (msg == STDERR_ERROR) {
|
|
||||||
string error = readString(from);
|
|
||||||
unsigned int status = readInt(from);
|
|
||||||
throw Error(status, error);
|
|
||||||
}
|
|
||||||
else if (msg != STDERR_LAST)
|
|
||||||
throw Error("protocol error processing standard error");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,9 @@ typedef enum {
|
||||||
#define STDERR_WRITE 0x64617416 // data for sink
|
#define STDERR_WRITE 0x64617416 // data for sink
|
||||||
#define STDERR_LAST 0x616c7473
|
#define STDERR_LAST 0x616c7473
|
||||||
#define STDERR_ERROR 0x63787470
|
#define STDERR_ERROR 0x63787470
|
||||||
|
#define STDERR_START_ACTIVITY 0x53545254
|
||||||
|
#define STDERR_STOP_ACTIVITY 0x53544f50
|
||||||
|
#define STDERR_RESULT 0x52534c54
|
||||||
|
|
||||||
|
|
||||||
Path readStorePath(Store & store, Source & from);
|
Path readStorePath(Store & store, Source & from);
|
||||||
|
|
|
@ -47,7 +47,7 @@ public:
|
||||||
struct Field
|
struct Field
|
||||||
{
|
{
|
||||||
// FIXME: use std::variant.
|
// FIXME: use std::variant.
|
||||||
enum { tInt, tString } type;
|
enum { tInt = 0, tString = 1 } type;
|
||||||
uint64_t i = 0;
|
uint64_t i = 0;
|
||||||
std::string s;
|
std::string s;
|
||||||
Field(const std::string & s) : type(tString), s(s) { }
|
Field(const std::string & s) : type(tString), s(s) { }
|
||||||
|
|
|
@ -56,6 +56,21 @@ static FdSource from(STDIN_FILENO);
|
||||||
static FdSink to(STDOUT_FILENO);
|
static FdSink to(STDOUT_FILENO);
|
||||||
|
|
||||||
|
|
||||||
|
Sink & operator << (Sink & sink, const Logger::Fields & fields)
|
||||||
|
{
|
||||||
|
sink << fields.size();
|
||||||
|
for (auto & f : fields) {
|
||||||
|
sink << f.type;
|
||||||
|
if (f.type == Logger::Field::tInt)
|
||||||
|
sink << f.i;
|
||||||
|
else if (f.type == Logger::Field::tString)
|
||||||
|
sink << f.s;
|
||||||
|
else abort();
|
||||||
|
}
|
||||||
|
return sink;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Logger that forwards log messages to the client, *if* we're in a
|
/* Logger that forwards log messages to the client, *if* we're in a
|
||||||
state where the protocol allows it (i.e., when canSendStderr is
|
state where the protocol allows it (i.e., when canSendStderr is
|
||||||
true). */
|
true). */
|
||||||
|
@ -69,15 +84,14 @@ struct TunnelLogger : public Logger
|
||||||
|
|
||||||
Sync<State> state_;
|
Sync<State> state_;
|
||||||
|
|
||||||
void log(Verbosity lvl, const FormatOrString & fs) override
|
void enqueueMsg(const std::string & s)
|
||||||
{
|
{
|
||||||
if (lvl > verbosity) return;
|
|
||||||
|
|
||||||
auto state(state_.lock());
|
auto state(state_.lock());
|
||||||
|
|
||||||
if (state->canSendStderr) {
|
if (state->canSendStderr) {
|
||||||
|
assert(state->pendingMsgs.empty());
|
||||||
try {
|
try {
|
||||||
to << STDERR_NEXT << (fs.s + "\n");
|
to(s);
|
||||||
to.flush();
|
to.flush();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
/* Write failed; that means that the other side is
|
/* Write failed; that means that the other side is
|
||||||
|
@ -86,7 +100,16 @@ struct TunnelLogger : public Logger
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
} else
|
} else
|
||||||
state->pendingMsgs.push_back(fs.s);
|
state->pendingMsgs.push_back(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
void log(Verbosity lvl, const FormatOrString & fs) override
|
||||||
|
{
|
||||||
|
if (lvl > verbosity) return;
|
||||||
|
|
||||||
|
StringSink buf;
|
||||||
|
buf << STDERR_NEXT << (fs.s + "\n");
|
||||||
|
enqueueMsg(*buf.s);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* startWork() means that we're starting an operation for which we
|
/* startWork() means that we're starting an operation for which we
|
||||||
|
@ -99,7 +122,7 @@ struct TunnelLogger : public Logger
|
||||||
state->canSendStderr = true;
|
state->canSendStderr = true;
|
||||||
|
|
||||||
for (auto & msg : state->pendingMsgs)
|
for (auto & msg : state->pendingMsgs)
|
||||||
to << STDERR_NEXT << (msg + "\n");
|
to(msg);
|
||||||
|
|
||||||
state->pendingMsgs.clear();
|
state->pendingMsgs.clear();
|
||||||
|
|
||||||
|
@ -121,6 +144,28 @@ struct TunnelLogger : public Logger
|
||||||
if (status != 0) to << status;
|
if (status != 0) to << status;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void startActivity(ActivityId act, ActivityType type,
|
||||||
|
const std::string & s, const Fields & fields, ActivityId parent) override
|
||||||
|
{
|
||||||
|
StringSink buf;
|
||||||
|
buf << STDERR_START_ACTIVITY << act << type << s << fields << parent;
|
||||||
|
enqueueMsg(*buf.s);
|
||||||
|
}
|
||||||
|
|
||||||
|
void stopActivity(ActivityId act) override
|
||||||
|
{
|
||||||
|
StringSink buf;
|
||||||
|
buf << STDERR_STOP_ACTIVITY << act;
|
||||||
|
enqueueMsg(*buf.s);
|
||||||
|
}
|
||||||
|
|
||||||
|
void result(ActivityId act, ResultType type, const Fields & fields) override
|
||||||
|
{
|
||||||
|
StringSink buf;
|
||||||
|
buf << STDERR_RESULT << act << type << fields;
|
||||||
|
enqueueMsg(*buf.s);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -665,16 +710,15 @@ static void processConnection(bool trusted)
|
||||||
{
|
{
|
||||||
MonitorFdHup monitor(from.fd);
|
MonitorFdHup monitor(from.fd);
|
||||||
|
|
||||||
TunnelLogger tunnelLogger;
|
auto tunnelLogger = new TunnelLogger();
|
||||||
auto prevLogger = nix::logger;
|
auto prevLogger = nix::logger;
|
||||||
logger = &tunnelLogger;
|
logger = tunnelLogger;
|
||||||
|
|
||||||
unsigned int opCount = 0;
|
unsigned int opCount = 0;
|
||||||
|
|
||||||
Finally finally([&]() {
|
Finally finally([&]() {
|
||||||
logger = prevLogger;
|
|
||||||
_isInterrupted = false;
|
_isInterrupted = false;
|
||||||
debug("%d operations", opCount);
|
prevLogger->log(lvlDebug, fmt("%d operations", opCount));
|
||||||
});
|
});
|
||||||
|
|
||||||
/* Exchange the greeting. */
|
/* Exchange the greeting. */
|
||||||
|
@ -693,7 +737,7 @@ static void processConnection(bool trusted)
|
||||||
readInt(from); // obsolete reserveSpace
|
readInt(from); // obsolete reserveSpace
|
||||||
|
|
||||||
/* Send startup error messages to the client. */
|
/* Send startup error messages to the client. */
|
||||||
tunnelLogger.startWork();
|
tunnelLogger->startWork();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -713,7 +757,7 @@ static void processConnection(bool trusted)
|
||||||
params["path-info-cache-size"] = "0";
|
params["path-info-cache-size"] = "0";
|
||||||
auto store = make_ref<LocalStore>(params);
|
auto store = make_ref<LocalStore>(params);
|
||||||
|
|
||||||
tunnelLogger.stopWork();
|
tunnelLogger->stopWork();
|
||||||
to.flush();
|
to.flush();
|
||||||
|
|
||||||
/* Process client requests. */
|
/* Process client requests. */
|
||||||
|
@ -730,28 +774,28 @@ static void processConnection(bool trusted)
|
||||||
opCount++;
|
opCount++;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op);
|
performOp(tunnelLogger, store, trusted, clientVersion, from, to, op);
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
/* If we're not in a state where we can send replies, then
|
/* If we're not in a state where we can send replies, then
|
||||||
something went wrong processing the input of the
|
something went wrong processing the input of the
|
||||||
client. This can happen especially if I/O errors occur
|
client. This can happen especially if I/O errors occur
|
||||||
during addTextToStore() / importPath(). If that
|
during addTextToStore() / importPath(). If that
|
||||||
happens, just send the error message and exit. */
|
happens, just send the error message and exit. */
|
||||||
bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr;
|
bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr;
|
||||||
tunnelLogger.stopWork(false, e.msg(), e.status);
|
tunnelLogger->stopWork(false, e.msg(), e.status);
|
||||||
if (!errorAllowed) throw;
|
if (!errorAllowed) throw;
|
||||||
} catch (std::bad_alloc & e) {
|
} catch (std::bad_alloc & e) {
|
||||||
tunnelLogger.stopWork(false, "Nix daemon out of memory", 1);
|
tunnelLogger->stopWork(false, "Nix daemon out of memory", 1);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
to.flush();
|
to.flush();
|
||||||
|
|
||||||
assert(!tunnelLogger.state_.lock()->canSendStderr);
|
assert(!tunnelLogger->state_.lock()->canSendStderr);
|
||||||
};
|
};
|
||||||
|
|
||||||
} catch (std::exception & e) {
|
} catch (std::exception & e) {
|
||||||
tunnelLogger.stopWork(false, e.what(), 1);
|
tunnelLogger->stopWork(false, e.what(), 1);
|
||||||
to.flush();
|
to.flush();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue