Make TunnelLogger thread-safe

Now that we use threads in lots of places, it's possible for
TunnelLogger::log() to be called asynchronously from other threads
than the main loop. So we need to ensure that STDERR_NEXT messages
don't clobber other messages.
This commit is contained in:
Eelco Dolstra 2017-08-28 14:17:07 +02:00
parent 94a0548dc4
commit 8fff3e7bb5
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE

View file

@ -8,6 +8,7 @@
#include "globals.hh" #include "globals.hh"
#include "monitor-fd.hh" #include "monitor-fd.hh"
#include "derivations.hh" #include "derivations.hh"
#include "finally.hh"
#include <algorithm> #include <algorithm>
@ -54,49 +55,64 @@ static ssize_t splice(int fd_in, void *off_in, int fd_out, void *off_out, size_t
static FdSource from(STDIN_FILENO); static FdSource from(STDIN_FILENO);
static FdSink to(STDOUT_FILENO); static FdSink to(STDOUT_FILENO);
static bool canSendStderr;
static Logger * defaultLogger;
/* Logger that forwards log messages to the client, *if* we're in a /* Logger that forwards log messages to the client, *if* we're in a
state where the protocol allows it (i.e., when canSendStderr is state where the protocol allows it (i.e., when canSendStderr is
true). */ true). */
class TunnelLogger : public Logger struct TunnelLogger : public Logger
{ {
struct State
{
bool canSendStderr = false;
std::vector<std::string> pendingMsgs;
};
Sync<State> state_;
void log(Verbosity lvl, const FormatOrString & fs) override void log(Verbosity lvl, const FormatOrString & fs) override
{ {
if (lvl > verbosity) return; if (lvl > verbosity) return;
if (canSendStderr) { auto state(state_.lock());
if (state->canSendStderr) {
try { try {
to << STDERR_NEXT << (fs.s + "\n"); to << STDERR_NEXT << (fs.s + "\n");
to.flush(); to.flush();
} catch (...) { } catch (...) {
/* Write failed; that means that the other side is /* Write failed; that means that the other side is
gone. */ gone. */
canSendStderr = false; state->canSendStderr = false;
throw; throw;
} }
} else } else
defaultLogger->log(lvl, fs); state->pendingMsgs.push_back(fs.s);
} }
};
/* startWork() means that we're starting an operation for which we /* startWork() means that we're starting an operation for which we
want to send out stderr to the client. */ want to send out stderr to the client. */
static void startWork() void startWork()
{ {
canSendStderr = true; std::vector<std::string> pendingMsgs;
}
auto state(state_.lock());
state->canSendStderr = true;
for (auto & msg : state->pendingMsgs)
to << STDERR_NEXT << (msg + "\n");
state->pendingMsgs.clear();
to.flush();
}
/* stopWork() means that we're done; stop sending stderr to the /* stopWork() means that we're done; stop sending stderr to the
client. */ client. */
static void stopWork(bool success = true, const string & msg = "", unsigned int status = 0) void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
{ {
canSendStderr = false; auto state(state_.lock());
state->canSendStderr = false;
if (success) if (success)
to << STDERR_LAST; to << STDERR_LAST;
@ -105,6 +121,7 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int
if (status != 0) to << status; if (status != 0) to << status;
} }
} }
};
struct TunnelSink : Sink struct TunnelSink : Sink
@ -160,7 +177,8 @@ struct RetrieveRegularNARSink : ParseSink
}; };
static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVersion, static void performOp(TunnelLogger * logger, ref<LocalStore> store,
bool trusted, unsigned int clientVersion,
Source & from, Sink & to, unsigned int op) Source & from, Sink & to, unsigned int op)
{ {
switch (op) { switch (op) {
@ -172,46 +190,46 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
that the 'Error' exception handler doesn't close the that the 'Error' exception handler doesn't close the
connection. */ connection. */
Path path = readString(from); Path path = readString(from);
startWork(); logger->startWork();
store->assertStorePath(path); store->assertStorePath(path);
bool result = store->isValidPath(path); bool result = store->isValidPath(path);
stopWork(); logger->stopWork();
to << result; to << result;
break; break;
} }
case wopQueryValidPaths: { case wopQueryValidPaths: {
PathSet paths = readStorePaths<PathSet>(*store, from); PathSet paths = readStorePaths<PathSet>(*store, from);
startWork(); logger->startWork();
PathSet res = store->queryValidPaths(paths); PathSet res = store->queryValidPaths(paths);
stopWork(); logger->stopWork();
to << res; to << res;
break; break;
} }
case wopHasSubstitutes: { case wopHasSubstitutes: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
PathSet res = store->querySubstitutablePaths({path}); PathSet res = store->querySubstitutablePaths({path});
stopWork(); logger->stopWork();
to << (res.find(path) != res.end()); to << (res.find(path) != res.end());
break; break;
} }
case wopQuerySubstitutablePaths: { case wopQuerySubstitutablePaths: {
PathSet paths = readStorePaths<PathSet>(*store, from); PathSet paths = readStorePaths<PathSet>(*store, from);
startWork(); logger->startWork();
PathSet res = store->querySubstitutablePaths(paths); PathSet res = store->querySubstitutablePaths(paths);
stopWork(); logger->stopWork();
to << res; to << res;
break; break;
} }
case wopQueryPathHash: { case wopQueryPathHash: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
auto hash = store->queryPathInfo(path)->narHash; auto hash = store->queryPathInfo(path)->narHash;
stopWork(); logger->stopWork();
to << hash.to_string(Base16, false); to << hash.to_string(Base16, false);
break; break;
} }
@ -221,7 +239,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopQueryValidDerivers: case wopQueryValidDerivers:
case wopQueryDerivationOutputs: { case wopQueryDerivationOutputs: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
PathSet paths; PathSet paths;
if (op == wopQueryReferences) if (op == wopQueryReferences)
paths = store->queryPathInfo(path)->references; paths = store->queryPathInfo(path)->references;
@ -230,35 +248,35 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
else if (op == wopQueryValidDerivers) else if (op == wopQueryValidDerivers)
paths = store->queryValidDerivers(path); paths = store->queryValidDerivers(path);
else paths = store->queryDerivationOutputs(path); else paths = store->queryDerivationOutputs(path);
stopWork(); logger->stopWork();
to << paths; to << paths;
break; break;
} }
case wopQueryDerivationOutputNames: { case wopQueryDerivationOutputNames: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
StringSet names; StringSet names;
names = store->queryDerivationOutputNames(path); names = store->queryDerivationOutputNames(path);
stopWork(); logger->stopWork();
to << names; to << names;
break; break;
} }
case wopQueryDeriver: { case wopQueryDeriver: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
auto deriver = store->queryPathInfo(path)->deriver; auto deriver = store->queryPathInfo(path)->deriver;
stopWork(); logger->stopWork();
to << deriver; to << deriver;
break; break;
} }
case wopQueryPathFromHashPart: { case wopQueryPathFromHashPart: {
string hashPart = readString(from); string hashPart = readString(from);
startWork(); logger->startWork();
Path path = store->queryPathFromHashPart(hashPart); Path path = store->queryPathFromHashPart(hashPart);
stopWork(); logger->stopWork();
to << path; to << path;
break; break;
} }
@ -286,10 +304,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
} else } else
parseDump(savedRegular, from); parseDump(savedRegular, from);
startWork(); logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected"); if (!savedRegular.regular) throw Error("regular file expected");
Path path = store->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo); Path path = store->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo);
stopWork(); logger->stopWork();
to << path; to << path;
break; break;
@ -299,9 +317,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
string suffix = readString(from); string suffix = readString(from);
string s = readString(from); string s = readString(from);
PathSet refs = readStorePaths<PathSet>(*store, from); PathSet refs = readStorePaths<PathSet>(*store, from);
startWork(); logger->startWork();
Path path = store->addTextToStore(suffix, s, refs, NoRepair); Path path = store->addTextToStore(suffix, s, refs, NoRepair);
stopWork(); logger->stopWork();
to << path; to << path;
break; break;
} }
@ -309,20 +327,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopExportPath: { case wopExportPath: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
readInt(from); // obsolete readInt(from); // obsolete
startWork(); logger->startWork();
TunnelSink sink(to); TunnelSink sink(to);
store->exportPath(path, sink); store->exportPath(path, sink);
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopImportPaths: { case wopImportPaths: {
startWork(); logger->startWork();
TunnelSource source(from); TunnelSource source(from);
Paths paths = store->importPaths(source, nullptr, Paths paths = store->importPaths(source, nullptr,
trusted ? NoCheckSigs : CheckSigs); trusted ? NoCheckSigs : CheckSigs);
stopWork(); logger->stopWork();
to << paths; to << paths;
break; break;
} }
@ -338,9 +356,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
if (mode == bmRepair && !trusted) if (mode == bmRepair && !trusted)
throw Error("repairing is not supported when building through the Nix daemon"); throw Error("repairing is not supported when building through the Nix daemon");
} }
startWork(); logger->startWork();
store->buildPaths(drvs, mode); store->buildPaths(drvs, mode);
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
@ -350,54 +368,54 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
BasicDerivation drv; BasicDerivation drv;
readDerivation(from, *store, drv); readDerivation(from, *store, drv);
BuildMode buildMode = (BuildMode) readInt(from); BuildMode buildMode = (BuildMode) readInt(from);
startWork(); logger->startWork();
if (!trusted) if (!trusted)
throw Error("you are not privileged to build derivations"); throw Error("you are not privileged to build derivations");
auto res = store->buildDerivation(drvPath, drv, buildMode); auto res = store->buildDerivation(drvPath, drv, buildMode);
stopWork(); logger->stopWork();
to << res.status << res.errorMsg; to << res.status << res.errorMsg;
break; break;
} }
case wopEnsurePath: { case wopEnsurePath: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
store->ensurePath(path); store->ensurePath(path);
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopAddTempRoot: { case wopAddTempRoot: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
startWork(); logger->startWork();
store->addTempRoot(path); store->addTempRoot(path);
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopAddIndirectRoot: { case wopAddIndirectRoot: {
Path path = absPath(readString(from)); Path path = absPath(readString(from));
startWork(); logger->startWork();
store->addIndirectRoot(path); store->addIndirectRoot(path);
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopSyncWithGC: { case wopSyncWithGC: {
startWork(); logger->startWork();
store->syncWithGC(); store->syncWithGC();
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopFindRoots: { case wopFindRoots: {
startWork(); logger->startWork();
Roots roots = store->findRoots(); Roots roots = store->findRoots();
stopWork(); logger->stopWork();
to << roots.size(); to << roots.size();
for (auto & i : roots) for (auto & i : roots)
to << i.first << i.second; to << i.first << i.second;
@ -416,11 +434,11 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
GCResults results; GCResults results;
startWork(); logger->startWork();
if (options.ignoreLiveness) if (options.ignoreLiveness)
throw Error("you are not allowed to ignore liveness"); throw Error("you are not allowed to ignore liveness");
store->collectGarbage(options, results); store->collectGarbage(options, results);
stopWork(); logger->stopWork();
to << results.paths << results.bytesFreed << 0 /* obsolete */; to << results.paths << results.bytesFreed << 0 /* obsolete */;
@ -451,7 +469,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
} }
} }
startWork(); logger->startWork();
for (auto & i : overrides) { for (auto & i : overrides) {
auto & name(i.first); auto & name(i.first);
@ -492,16 +510,16 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
} }
} }
stopWork(); logger->stopWork();
break; break;
} }
case wopQuerySubstitutablePathInfo: { case wopQuerySubstitutablePathInfo: {
Path path = absPath(readString(from)); Path path = absPath(readString(from));
startWork(); logger->startWork();
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
store->querySubstitutablePathInfos({path}, infos); store->querySubstitutablePathInfos({path}, infos);
stopWork(); logger->stopWork();
SubstitutablePathInfos::iterator i = infos.find(path); SubstitutablePathInfos::iterator i = infos.find(path);
if (i == infos.end()) if (i == infos.end())
to << 0; to << 0;
@ -513,10 +531,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopQuerySubstitutablePathInfos: { case wopQuerySubstitutablePathInfos: {
PathSet paths = readStorePaths<PathSet>(*store, from); PathSet paths = readStorePaths<PathSet>(*store, from);
startWork(); logger->startWork();
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
store->querySubstitutablePathInfos(paths, infos); store->querySubstitutablePathInfos(paths, infos);
stopWork(); logger->stopWork();
to << infos.size(); to << infos.size();
for (auto & i : infos) { for (auto & i : infos) {
to << i.first << i.second.deriver << i.second.references to << i.first << i.second.deriver << i.second.references
@ -526,9 +544,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
} }
case wopQueryAllValidPaths: { case wopQueryAllValidPaths: {
startWork(); logger->startWork();
PathSet paths = store->queryAllValidPaths(); PathSet paths = store->queryAllValidPaths();
stopWork(); logger->stopWork();
to << paths; to << paths;
break; break;
} }
@ -536,13 +554,13 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopQueryPathInfo: { case wopQueryPathInfo: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
std::shared_ptr<const ValidPathInfo> info; std::shared_ptr<const ValidPathInfo> info;
startWork(); logger->startWork();
try { try {
info = store->queryPathInfo(path); info = store->queryPathInfo(path);
} catch (InvalidPath &) { } catch (InvalidPath &) {
if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw; if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw;
} }
stopWork(); logger->stopWork();
if (info) { if (info) {
if (GET_PROTOCOL_MINOR(clientVersion) >= 17) if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
to << 1; to << 1;
@ -561,20 +579,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
} }
case wopOptimiseStore: case wopOptimiseStore:
startWork(); logger->startWork();
store->optimiseStore(); store->optimiseStore();
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
case wopVerifyStore: { case wopVerifyStore: {
bool checkContents, repair; bool checkContents, repair;
from >> checkContents >> repair; from >> checkContents >> repair;
startWork(); logger->startWork();
if (repair && !trusted) if (repair && !trusted)
throw Error("you are not privileged to repair paths"); throw Error("you are not privileged to repair paths");
bool errors = store->verifyStore(checkContents, (RepairFlag) repair); bool errors = store->verifyStore(checkContents, (RepairFlag) repair);
stopWork(); logger->stopWork();
to << errors; to << errors;
break; break;
} }
@ -582,19 +600,19 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopAddSignatures: { case wopAddSignatures: {
Path path = readStorePath(*store, from); Path path = readStorePath(*store, from);
StringSet sigs = readStrings<StringSet>(from); StringSet sigs = readStrings<StringSet>(from);
startWork(); logger->startWork();
if (!trusted) if (!trusted)
throw Error("you are not privileged to add signatures"); throw Error("you are not privileged to add signatures");
store->addSignatures(path, sigs); store->addSignatures(path, sigs);
stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopNarFromPath: { case wopNarFromPath: {
auto path = readStorePath(*store, from); auto path = readStorePath(*store, from);
startWork(); logger->startWork();
stopWork(); logger->stopWork();
dumpPath(path, to); dumpPath(path, to);
break; break;
} }
@ -619,20 +637,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
TeeSink tee(from); TeeSink tee(from);
parseDump(tee, tee.source); parseDump(tee, tee.source);
startWork(); logger->startWork();
store->addToStore(info, tee.source.data, (RepairFlag) repair, store->addToStore(info, tee.source.data, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr); dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
stopWork(); logger->stopWork();
break; break;
} }
case wopQueryMissing: { case wopQueryMissing: {
PathSet targets = readStorePaths<PathSet>(*store, from); PathSet targets = readStorePaths<PathSet>(*store, from);
startWork(); logger->startWork();
PathSet willBuild, willSubstitute, unknown; PathSet willBuild, willSubstitute, unknown;
unsigned long long downloadSize, narSize; unsigned long long downloadSize, narSize;
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
stopWork(); logger->stopWork();
to << willBuild << willSubstitute << unknown << downloadSize << narSize; to << willBuild << willSubstitute << unknown << downloadSize << narSize;
break; break;
} }
@ -647,9 +665,17 @@ static void processConnection(bool trusted)
{ {
MonitorFdHup monitor(from.fd); MonitorFdHup monitor(from.fd);
canSendStderr = false; TunnelLogger tunnelLogger;
defaultLogger = logger; auto prevLogger = nix::logger;
logger = new TunnelLogger(); logger = &tunnelLogger;
unsigned int opCount = 0;
Finally finally([&]() {
logger = prevLogger;
_isInterrupted = false;
debug("%d operations", opCount);
});
/* Exchange the greeting. */ /* Exchange the greeting. */
unsigned int magic = readInt(from); unsigned int magic = readInt(from);
@ -667,7 +693,7 @@ static void processConnection(bool trusted)
readInt(from); // obsolete reserveSpace readInt(from); // obsolete reserveSpace
/* Send startup error messages to the client. */ /* Send startup error messages to the client. */
startWork(); tunnelLogger.startWork();
try { try {
@ -687,12 +713,10 @@ static void processConnection(bool trusted)
params["path-info-cache-size"] = "0"; params["path-info-cache-size"] = "0";
auto store = make_ref<LocalStore>(params); auto store = make_ref<LocalStore>(params);
stopWork(); tunnelLogger.stopWork();
to.flush(); to.flush();
/* Process client requests. */ /* Process client requests. */
unsigned int opCount = 0;
while (true) { while (true) {
WorkerOp op; WorkerOp op;
try { try {
@ -706,32 +730,28 @@ static void processConnection(bool trusted)
opCount++; opCount++;
try { try {
performOp(store, trusted, clientVersion, from, to, op); performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op);
} catch (Error & e) { } catch (Error & e) {
/* If we're not in a state where we can send replies, then /* If we're not in a state where we can send replies, then
something went wrong processing the input of the something went wrong processing the input of the
client. This can happen especially if I/O errors occur client. This can happen especially if I/O errors occur
during addTextToStore() / importPath(). If that during addTextToStore() / importPath(). If that
happens, just send the error message and exit. */ happens, just send the error message and exit. */
bool errorAllowed = canSendStderr; bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr;
stopWork(false, e.msg(), e.status); tunnelLogger.stopWork(false, e.msg(), e.status);
if (!errorAllowed) throw; if (!errorAllowed) throw;
} catch (std::bad_alloc & e) { } catch (std::bad_alloc & e) {
stopWork(false, "Nix daemon out of memory", 1); tunnelLogger.stopWork(false, "Nix daemon out of memory", 1);
throw; throw;
} }
to.flush(); to.flush();
assert(!canSendStderr); assert(!tunnelLogger.state_.lock()->canSendStderr);
}; };
canSendStderr = false; } catch (std::exception & e) {
_isInterrupted = false; tunnelLogger.stopWork(false, e.what(), 1);
debug(format("%1% operations") % opCount);
} catch (Error & e) {
stopWork(false, e.msg(), 1);
to.flush(); to.flush();
return; return;
} }