forked from lix-project/lix
* Handle exceptions and stderr for all protocol functions.
* SIGIO -> SIGPOLL (POSIX calls it that). * Use sigaction instead of signal to register the SIGPOLL handler. Sigaction is better defined, and a handler registered with signal appears not to interrupt fcntl(..., F_SETLKW, ...), which is bad.
This commit is contained in:
parent
0130ef88ea
commit
40c3529909
2 changed files with 64 additions and 19 deletions
|
@ -147,6 +147,7 @@ bool RemoteStore::isValidPath(const Path & path)
|
||||||
{
|
{
|
||||||
writeInt(wopIsValidPath, to);
|
writeInt(wopIsValidPath, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
unsigned int reply = readInt(from);
|
unsigned int reply = readInt(from);
|
||||||
return reply != 0;
|
return reply != 0;
|
||||||
}
|
}
|
||||||
|
@ -162,6 +163,7 @@ bool RemoteStore::hasSubstitutes(const Path & path)
|
||||||
{
|
{
|
||||||
writeInt(wopHasSubstitutes, to);
|
writeInt(wopHasSubstitutes, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
unsigned int reply = readInt(from);
|
unsigned int reply = readInt(from);
|
||||||
return reply != 0;
|
return reply != 0;
|
||||||
}
|
}
|
||||||
|
@ -171,6 +173,7 @@ Hash RemoteStore::queryPathHash(const Path & path)
|
||||||
{
|
{
|
||||||
writeInt(wopQueryPathHash, to);
|
writeInt(wopQueryPathHash, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
string hash = readString(from);
|
string hash = readString(from);
|
||||||
return parseHash(htSHA256, hash);
|
return parseHash(htSHA256, hash);
|
||||||
}
|
}
|
||||||
|
@ -181,6 +184,7 @@ void RemoteStore::queryReferences(const Path & path,
|
||||||
{
|
{
|
||||||
writeInt(wopQueryReferences, to);
|
writeInt(wopQueryReferences, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
PathSet references2 = readStringSet(from);
|
PathSet references2 = readStringSet(from);
|
||||||
references.insert(references2.begin(), references2.end());
|
references.insert(references2.begin(), references2.end());
|
||||||
}
|
}
|
||||||
|
@ -191,6 +195,7 @@ void RemoteStore::queryReferrers(const Path & path,
|
||||||
{
|
{
|
||||||
writeInt(wopQueryReferrers, to);
|
writeInt(wopQueryReferrers, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
PathSet referrers2 = readStringSet(from);
|
PathSet referrers2 = readStringSet(from);
|
||||||
referrers.insert(referrers2.begin(), referrers2.end());
|
referrers.insert(referrers2.begin(), referrers2.end());
|
||||||
}
|
}
|
||||||
|
@ -207,6 +212,7 @@ Path RemoteStore::addToStore(const Path & _srcPath, bool fixed,
|
||||||
writeInt(recursive ? 1 : 0, to);
|
writeInt(recursive ? 1 : 0, to);
|
||||||
writeString(hashAlgo, to);
|
writeString(hashAlgo, to);
|
||||||
dumpPath(srcPath, to);
|
dumpPath(srcPath, to);
|
||||||
|
processStderr();
|
||||||
Path path = readString(from);
|
Path path = readString(from);
|
||||||
return path;
|
return path;
|
||||||
}
|
}
|
||||||
|
@ -220,6 +226,7 @@ Path RemoteStore::addTextToStore(const string & suffix, const string & s,
|
||||||
writeString(s, to);
|
writeString(s, to);
|
||||||
writeStringSet(references, to);
|
writeStringSet(references, to);
|
||||||
|
|
||||||
|
processStderr();
|
||||||
Path path = readString(from);
|
Path path = readString(from);
|
||||||
return path;
|
return path;
|
||||||
}
|
}
|
||||||
|
@ -238,6 +245,7 @@ void RemoteStore::ensurePath(const Path & path)
|
||||||
{
|
{
|
||||||
writeInt(wopEnsurePath, to);
|
writeInt(wopEnsurePath, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
readInt(from);
|
readInt(from);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,6 +254,7 @@ void RemoteStore::addTempRoot(const Path & path)
|
||||||
{
|
{
|
||||||
writeInt(wopAddTempRoot, to);
|
writeInt(wopAddTempRoot, to);
|
||||||
writeString(path, to);
|
writeString(path, to);
|
||||||
|
processStderr();
|
||||||
readInt(from);
|
readInt(from);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,6 +262,7 @@ void RemoteStore::addTempRoot(const Path & path)
|
||||||
void RemoteStore::syncWithGC()
|
void RemoteStore::syncWithGC()
|
||||||
{
|
{
|
||||||
writeInt(wopSyncWithGC, to);
|
writeInt(wopSyncWithGC, to);
|
||||||
|
processStderr();
|
||||||
readInt(from);
|
readInt(from);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,28 +61,39 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* A SIGIO signal is received when data is available on the client
|
/* A SIGPOLL signal is received when data is available on the client
|
||||||
communication scoket, or when the client has closed its side of the
|
communication scoket, or when the client has closed its side of the
|
||||||
socket. This handler is enabled at precisely those moments in the
|
socket. This handler is enabled at precisely those moments in the
|
||||||
protocol when we're doing work and the client is supposed to be
|
protocol when we're doing work and the client is supposed to be
|
||||||
quiet. Thus, if we get a SIGIO signal, it means that the client
|
quiet. Thus, if we get a SIGPOLL signal, it means that the client
|
||||||
has quit. So we should quit as well.
|
has quit. So we should quit as well.
|
||||||
|
|
||||||
Too bad most operating systems don't support the POLL_HUP value for
|
Too bad most operating systems don't support the POLL_HUP value for
|
||||||
si_code in siginfo_t. That would make most of the SIGIO complexity
|
si_code in siginfo_t. That would make most of the SIGPOLL
|
||||||
unnecessary, i.e., we could just enable SIGIO all the time and
|
complexity unnecessary, i.e., we could just enable SIGPOLL all the
|
||||||
wouldn't have to worry about races. */
|
time and wouldn't have to worry about races. */
|
||||||
static void sigioHandler(int sigNo)
|
static void sigioHandler(int sigNo)
|
||||||
{
|
{
|
||||||
if (!blockInt) {
|
if (!blockInt) {
|
||||||
_isInterrupted = 1;
|
_isInterrupted = 1;
|
||||||
blockInt = 1;
|
blockInt = 1;
|
||||||
canSendStderr = false;
|
canSendStderr = false;
|
||||||
write(STDERR_FILENO, "SIGIO\n", 6);
|
write(STDERR_FILENO, "SIGPOLL\n", 8);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void setSigPollAction(bool ignore)
|
||||||
|
{
|
||||||
|
struct sigaction act, oact;
|
||||||
|
act.sa_handler = ignore ? SIG_IGN : sigioHandler;
|
||||||
|
sigfillset(&act.sa_mask);
|
||||||
|
act.sa_flags = 0;
|
||||||
|
if (sigaction(SIGPOLL, &act, &oact))
|
||||||
|
throw SysError("setting handler for SIGPOLL");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* startWork() means that we're starting an operation for which we
|
/* startWork() means that we're starting an operation for which we
|
||||||
want to send out stderr to the client. */
|
want to send out stderr to the client. */
|
||||||
static void startWork()
|
static void startWork()
|
||||||
|
@ -90,13 +101,12 @@ static void startWork()
|
||||||
canSendStderr = true;
|
canSendStderr = true;
|
||||||
|
|
||||||
/* Handle client death asynchronously. */
|
/* Handle client death asynchronously. */
|
||||||
if (signal(SIGIO, sigioHandler) == SIG_ERR)
|
setSigPollAction(false);
|
||||||
throw SysError("setting handler for SIGIO");
|
|
||||||
|
|
||||||
/* Of course, there is a race condition here: the socket could
|
/* Of course, there is a race condition here: the socket could
|
||||||
have closed between when we last read from / wrote to it, and
|
have closed between when we last read from / wrote to it, and
|
||||||
between the time we set the handler for SIGIO. In that case we
|
between the time we set the handler for SIGPOLL. In that case
|
||||||
won't get the signal. So do a non-blocking select() to find
|
we won't get the signal. So do a non-blocking select() to find
|
||||||
out if any input is available on the socket. If there is, it
|
out if any input is available on the socket. If there is, it
|
||||||
has to be the 0-byte read that indicates that the socket has
|
has to be the 0-byte read that indicates that the socket has
|
||||||
closed. */
|
closed. */
|
||||||
|
@ -128,8 +138,7 @@ static void stopWork(bool success = true, const string & msg = "")
|
||||||
/* Stop handling async client death; we're going to a state where
|
/* Stop handling async client death; we're going to a state where
|
||||||
we're either sending or receiving from the client, so we'll be
|
we're either sending or receiving from the client, so we'll be
|
||||||
notified of client death anyway. */
|
notified of client death anyway. */
|
||||||
if (signal(SIGIO, SIG_IGN) == SIG_ERR)
|
setSigPollAction(true);
|
||||||
throw SysError("ignoring SIGIO");
|
|
||||||
|
|
||||||
canSendStderr = false;
|
canSendStderr = false;
|
||||||
|
|
||||||
|
@ -157,30 +166,41 @@ static void performOp(Source & from, Sink & to, unsigned int op)
|
||||||
|
|
||||||
case wopIsValidPath: {
|
case wopIsValidPath: {
|
||||||
Path path = readStorePath(from);
|
Path path = readStorePath(from);
|
||||||
writeInt(store->isValidPath(path), to);
|
startWork();
|
||||||
|
bool result = store->isValidPath(path);
|
||||||
|
stopWork();
|
||||||
|
writeInt(result, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case wopHasSubstitutes: {
|
case wopHasSubstitutes: {
|
||||||
Path path = readStorePath(from);
|
Path path = readStorePath(from);
|
||||||
writeInt(store->hasSubstitutes(path), to);
|
startWork();
|
||||||
|
bool result = store->hasSubstitutes(path);
|
||||||
|
stopWork();
|
||||||
|
writeInt(result, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case wopQueryPathHash: {
|
case wopQueryPathHash: {
|
||||||
Path path = readStorePath(from);
|
Path path = readStorePath(from);
|
||||||
writeString(printHash(store->queryPathHash(path)), to);
|
startWork();
|
||||||
|
Hash hash = store->queryPathHash(path);
|
||||||
|
stopWork();
|
||||||
|
writeString(printHash(hash), to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case wopQueryReferences:
|
case wopQueryReferences:
|
||||||
case wopQueryReferrers: {
|
case wopQueryReferrers: {
|
||||||
Path path = readStorePath(from);
|
Path path = readStorePath(from);
|
||||||
|
startWork();
|
||||||
PathSet paths;
|
PathSet paths;
|
||||||
if (op == wopQueryReferences)
|
if (op == wopQueryReferences)
|
||||||
store->queryReferences(path, paths);
|
store->queryReferences(path, paths);
|
||||||
else
|
else
|
||||||
store->queryReferrers(path, paths);
|
store->queryReferrers(path, paths);
|
||||||
|
stopWork();
|
||||||
writeStringSet(paths, to);
|
writeStringSet(paths, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -196,7 +216,11 @@ static void performOp(Source & from, Sink & to, unsigned int op)
|
||||||
Path tmp2 = tmp + "/" + baseName;
|
Path tmp2 = tmp + "/" + baseName;
|
||||||
restorePath(tmp2, from);
|
restorePath(tmp2, from);
|
||||||
|
|
||||||
writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to);
|
startWork();
|
||||||
|
Path path = store->addToStore(tmp2, fixed, recursive, hashAlgo);
|
||||||
|
stopWork();
|
||||||
|
|
||||||
|
writeString(path, to);
|
||||||
|
|
||||||
deletePath(tmp);
|
deletePath(tmp);
|
||||||
break;
|
break;
|
||||||
|
@ -206,7 +230,10 @@ static void performOp(Source & from, Sink & to, unsigned int op)
|
||||||
string suffix = readString(from);
|
string suffix = readString(from);
|
||||||
string s = readString(from);
|
string s = readString(from);
|
||||||
PathSet refs = readStorePaths(from);
|
PathSet refs = readStorePaths(from);
|
||||||
writeString(store->addTextToStore(suffix, s, refs), to);
|
startWork();
|
||||||
|
Path path = store->addTextToStore(suffix, s, refs);
|
||||||
|
stopWork();
|
||||||
|
writeString(path, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,20 +248,26 @@ static void performOp(Source & from, Sink & to, unsigned int op)
|
||||||
|
|
||||||
case wopEnsurePath: {
|
case wopEnsurePath: {
|
||||||
Path path = readStorePath(from);
|
Path path = readStorePath(from);
|
||||||
|
startWork();
|
||||||
store->ensurePath(path);
|
store->ensurePath(path);
|
||||||
|
stopWork();
|
||||||
writeInt(1, to);
|
writeInt(1, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case wopAddTempRoot: {
|
case wopAddTempRoot: {
|
||||||
Path path = readStorePath(from);
|
Path path = readStorePath(from);
|
||||||
|
startWork();
|
||||||
store->addTempRoot(path);
|
store->addTempRoot(path);
|
||||||
|
stopWork();
|
||||||
writeInt(1, to);
|
writeInt(1, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case wopSyncWithGC: {
|
case wopSyncWithGC: {
|
||||||
|
startWork();
|
||||||
store->syncWithGC();
|
store->syncWithGC();
|
||||||
|
stopWork();
|
||||||
writeInt(1, to);
|
writeInt(1, to);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -250,8 +283,8 @@ static void processConnection()
|
||||||
canSendStderr = false;
|
canSendStderr = false;
|
||||||
writeToStderr = tunnelStderr;
|
writeToStderr = tunnelStderr;
|
||||||
|
|
||||||
/* Allow us to receive SIGIO for events on the client socket. */
|
/* Allow us to receive SIGPOLL for events on the client socket. */
|
||||||
signal(SIGIO, SIG_IGN);
|
setSigPollAction(true);
|
||||||
if (fcntl(from.fd, F_SETOWN, getpid()) == -1)
|
if (fcntl(from.fd, F_SETOWN, getpid()) == -1)
|
||||||
throw SysError("F_SETOWN");
|
throw SysError("F_SETOWN");
|
||||||
if (fcntl(from.fd, F_SETFL, fcntl(from.fd, F_GETFL, 0) | FASYNC) == -1)
|
if (fcntl(from.fd, F_SETFL, fcntl(from.fd, F_GETFL, 0) | FASYNC) == -1)
|
||||||
|
@ -301,6 +334,8 @@ static void processConnection()
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
stopWork(false, e.msg());
|
stopWork(false, e.msg());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(!canSendStderr);
|
||||||
};
|
};
|
||||||
|
|
||||||
printMsg(lvlError, format("%1% worker operations") % opCount);
|
printMsg(lvlError, format("%1% worker operations") % opCount);
|
||||||
|
|
Loading…
Reference in a new issue