forked from lix-project/lix
nix-daemon: Use a thread instead of SIGPOLL to catch client disconnects
The thread calls poll() to wait until a HUP (or other error event) happens on the client connection. If so, it sends SIGINT to the main thread, which is then cleaned up normally. This is much nicer than messing around with SIGPOLL.
This commit is contained in:
parent
fdee1ced43
commit
49fe9592a4
3 changed files with 51 additions and 146 deletions
1
local.mk
1
local.mk
|
@ -5,6 +5,7 @@ endif
|
||||||
dist-files += configure config.h.in nix.spec
|
dist-files += configure config.h.in nix.spec
|
||||||
|
|
||||||
GLOBAL_CXXFLAGS += -I . -I src -I src/libutil -I src/libstore -I src/libmain -I src/libexpr
|
GLOBAL_CXXFLAGS += -I . -I src -I src/libutil -I src/libstore -I src/libmain -I src/libexpr
|
||||||
|
GLOBAL_LDFLAGS += -pthread
|
||||||
|
|
||||||
$(foreach i, config.h $(call rwildcard, src/lib*, *.hh), $(eval $(call install-file-in, $(i), $(includedir)/nix, 0644)))
|
$(foreach i, config.h $(call rwildcard, src/lib*, *.hh), $(eval $(call install-file-in, $(i), $(includedir)/nix, 0644)))
|
||||||
|
|
||||||
|
|
43
src/libutil/monitor-fd.hh
Normal file
43
src/libutil/monitor-fd.hh
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
#include <poll.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <signal.h>
|
||||||
|
|
||||||
|
namespace nix {
|
||||||
|
|
||||||
|
|
||||||
|
class MonitorFdHup
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
std::thread thread;
|
||||||
|
|
||||||
|
public:
|
||||||
|
MonitorFdHup(int fd)
|
||||||
|
{
|
||||||
|
thread = std::thread([&]() {
|
||||||
|
/* Wait indefinitely until a POLLHUP occurs. */
|
||||||
|
struct pollfd fds[1];
|
||||||
|
fds[0].fd = fd;
|
||||||
|
fds[0].events = 0;
|
||||||
|
if (poll(fds, 1, -1) == -1) {
|
||||||
|
if (errno != EINTR) abort(); // can't happen
|
||||||
|
return; // destructor is asking us to exit
|
||||||
|
}
|
||||||
|
/* We got POLLHUP, so send an INT signal to the main thread. */
|
||||||
|
kill(getpid(), SIGINT);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
~MonitorFdHup()
|
||||||
|
{
|
||||||
|
pthread_kill(thread.native_handle(), SIGINT);
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -6,6 +6,7 @@
|
||||||
#include "archive.hh"
|
#include "archive.hh"
|
||||||
#include "affinity.hh"
|
#include "affinity.hh"
|
||||||
#include "globals.hh"
|
#include "globals.hh"
|
||||||
|
#include "monitor-fd.hh"
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
|
@ -17,7 +18,6 @@
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <fcntl.h>
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <pwd.h>
|
#include <pwd.h>
|
||||||
#include <grp.h>
|
#include <grp.h>
|
||||||
|
@ -25,25 +25,6 @@
|
||||||
using namespace nix;
|
using namespace nix;
|
||||||
|
|
||||||
|
|
||||||
/* On platforms that have O_ASYNC, we can detect when a client
|
|
||||||
disconnects and immediately kill any ongoing builds. On platforms
|
|
||||||
that lack it, we only notice the disconnection the next time we try
|
|
||||||
to write to the client. So if you have a builder that never
|
|
||||||
generates output on stdout/stderr, the daemon will never notice
|
|
||||||
that the client has disconnected until the builder terminates.
|
|
||||||
|
|
||||||
GNU/Hurd does have O_ASYNC, but its Unix-domain socket translator
|
|
||||||
(pflocal) does not implement F_SETOWN. See
|
|
||||||
<http://lists.gnu.org/archive/html/bug-guix/2013-07/msg00021.html> for
|
|
||||||
details.*/
|
|
||||||
#if defined O_ASYNC && !defined __GNU__
|
|
||||||
#define HAVE_HUP_NOTIFICATION
|
|
||||||
#ifndef SIGPOLL
|
|
||||||
#define SIGPOLL SIGIO
|
|
||||||
#endif
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
static FdSource from(STDIN_FILENO);
|
static FdSource from(STDIN_FILENO);
|
||||||
static FdSink to(STDOUT_FILENO);
|
static FdSink to(STDOUT_FILENO);
|
||||||
|
|
||||||
|
@ -51,7 +32,6 @@ bool canSendStderr;
|
||||||
pid_t myPid;
|
pid_t myPid;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* This function is called anytime we want to write something to
|
/* This function is called anytime we want to write something to
|
||||||
stderr. If we're in a state where the protocol allows it (i.e.,
|
stderr. If we're in a state where the protocol allows it (i.e.,
|
||||||
when canSendStderr), send the message to the client over the
|
when canSendStderr), send the message to the client over the
|
||||||
|
@ -78,111 +58,11 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Return true if the remote side has closed its end of the
|
|
||||||
connection, false otherwise. Should not be called on any socket on
|
|
||||||
which we expect input! */
|
|
||||||
static bool isFarSideClosed(int socket)
|
|
||||||
{
|
|
||||||
struct timeval timeout;
|
|
||||||
timeout.tv_sec = timeout.tv_usec = 0;
|
|
||||||
|
|
||||||
fd_set fds;
|
|
||||||
FD_ZERO(&fds);
|
|
||||||
FD_SET(socket, &fds);
|
|
||||||
|
|
||||||
while (select(socket + 1, &fds, 0, 0, &timeout) == -1)
|
|
||||||
if (errno != EINTR) throw SysError("select()");
|
|
||||||
|
|
||||||
if (!FD_ISSET(socket, &fds)) return false;
|
|
||||||
|
|
||||||
/* Destructive read to determine whether the select() marked the
|
|
||||||
socket as readable because there is actual input or because
|
|
||||||
we've reached EOF (i.e., a read of size 0 is available). */
|
|
||||||
char c;
|
|
||||||
int rd;
|
|
||||||
if ((rd = read(socket, &c, 1)) > 0)
|
|
||||||
throw Error("EOF expected (protocol error?)");
|
|
||||||
else if (rd == -1 && errno != ECONNRESET)
|
|
||||||
throw SysError("expected connection reset or EOF");
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* A SIGPOLL signal is received when data is available on the client
|
|
||||||
communication socket, or when the client has closed its side of the
|
|
||||||
socket. This handler is enabled at precisely those moments in the
|
|
||||||
protocol when we're doing work and the client is supposed to be
|
|
||||||
quiet. Thus, if we get a SIGPOLL signal, it means that the client
|
|
||||||
has quit. So we should quit as well.
|
|
||||||
|
|
||||||
Too bad most operating systems don't support the POLL_HUP value for
|
|
||||||
si_code in siginfo_t. That would make most of the SIGPOLL
|
|
||||||
complexity unnecessary, i.e., we could just enable SIGPOLL all the
|
|
||||||
time and wouldn't have to worry about races. */
|
|
||||||
static void sigPollHandler(int sigNo)
|
|
||||||
{
|
|
||||||
using namespace std;
|
|
||||||
try {
|
|
||||||
/* Check that the far side actually closed. We're still
|
|
||||||
getting spurious signals every once in a while. I.e.,
|
|
||||||
there is no input available, but we get a signal with
|
|
||||||
POLL_IN set. Maybe it's delayed or something. */
|
|
||||||
if (isFarSideClosed(from.fd)) {
|
|
||||||
if (!blockInt) {
|
|
||||||
_isInterrupted = 1;
|
|
||||||
blockInt = 1;
|
|
||||||
canSendStderr = false;
|
|
||||||
const char * s = "SIGPOLL\n";
|
|
||||||
write(STDERR_FILENO, s, strlen(s));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
const char * s = "spurious SIGPOLL\n";
|
|
||||||
write(STDERR_FILENO, s, strlen(s));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Error & e) {
|
|
||||||
/* Shouldn't happen. */
|
|
||||||
string s = "impossible: " + e.msg() + '\n';
|
|
||||||
write(STDERR_FILENO, s.data(), s.size());
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void setSigPollAction(bool enable)
|
|
||||||
{
|
|
||||||
#ifdef HAVE_HUP_NOTIFICATION
|
|
||||||
struct sigaction act, oact;
|
|
||||||
act.sa_handler = enable ? sigPollHandler : SIG_IGN;
|
|
||||||
sigfillset(&act.sa_mask);
|
|
||||||
act.sa_flags = 0;
|
|
||||||
if (sigaction(SIGPOLL, &act, &oact))
|
|
||||||
throw SysError("setting handler for SIGPOLL");
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* startWork() means that we're starting an operation for which we
|
/* startWork() means that we're starting an operation for which we
|
||||||
want to send out stderr to the client. */
|
want to send out stderr to the client. */
|
||||||
static void startWork()
|
static void startWork()
|
||||||
{
|
{
|
||||||
canSendStderr = true;
|
canSendStderr = true;
|
||||||
|
|
||||||
/* Handle client death asynchronously. */
|
|
||||||
setSigPollAction(true);
|
|
||||||
|
|
||||||
/* Of course, there is a race condition here: the socket could
|
|
||||||
have closed between when we last read from / wrote to it, and
|
|
||||||
between the time we set the handler for SIGPOLL. In that case
|
|
||||||
we won't get the signal. So do a non-blocking select() to find
|
|
||||||
out if any input is available on the socket. If there is, it
|
|
||||||
has to be the 0-byte read that indicates that the socket has
|
|
||||||
closed. */
|
|
||||||
if (isFarSideClosed(from.fd)) {
|
|
||||||
_isInterrupted = 1;
|
|
||||||
checkInterrupt();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -190,11 +70,6 @@ static void startWork()
|
||||||
client. */
|
client. */
|
||||||
static void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
|
static void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
|
||||||
{
|
{
|
||||||
/* Stop handling async client death; we're going to a state where
|
|
||||||
we're either sending or receiving from the client, so we'll be
|
|
||||||
notified of client death anyway. */
|
|
||||||
setSigPollAction(false);
|
|
||||||
|
|
||||||
canSendStderr = false;
|
canSendStderr = false;
|
||||||
|
|
||||||
if (success)
|
if (success)
|
||||||
|
@ -225,17 +100,10 @@ struct TunnelSource : BufferedSource
|
||||||
TunnelSource(Source & from) : from(from) { }
|
TunnelSource(Source & from) : from(from) { }
|
||||||
size_t readUnbuffered(unsigned char * data, size_t len)
|
size_t readUnbuffered(unsigned char * data, size_t len)
|
||||||
{
|
{
|
||||||
/* Careful: we're going to receive data from the client now,
|
|
||||||
so we have to disable the SIGPOLL handler. */
|
|
||||||
setSigPollAction(false);
|
|
||||||
canSendStderr = false;
|
|
||||||
|
|
||||||
writeInt(STDERR_READ, to);
|
writeInt(STDERR_READ, to);
|
||||||
writeInt(len, to);
|
writeInt(len, to);
|
||||||
to.flush();
|
to.flush();
|
||||||
size_t n = readString(data, len, from);
|
size_t n = readString(data, len, from);
|
||||||
|
|
||||||
startWork();
|
|
||||||
if (n == 0) throw EndOfFile("unexpected end-of-file");
|
if (n == 0) throw EndOfFile("unexpected end-of-file");
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -662,19 +530,12 @@ static void performOp(bool trusted, unsigned int clientVersion,
|
||||||
|
|
||||||
static void processConnection(bool trusted)
|
static void processConnection(bool trusted)
|
||||||
{
|
{
|
||||||
|
MonitorFdHup monitor(from.fd);
|
||||||
|
|
||||||
canSendStderr = false;
|
canSendStderr = false;
|
||||||
myPid = getpid();
|
myPid = getpid();
|
||||||
_writeToStderr = tunnelStderr;
|
_writeToStderr = tunnelStderr;
|
||||||
|
|
||||||
#ifdef HAVE_HUP_NOTIFICATION
|
|
||||||
/* Allow us to receive SIGPOLL for events on the client socket. */
|
|
||||||
setSigPollAction(false);
|
|
||||||
if (fcntl(from.fd, F_SETOWN, getpid()) == -1)
|
|
||||||
throw SysError("F_SETOWN");
|
|
||||||
if (fcntl(from.fd, F_SETFL, fcntl(from.fd, F_GETFL, 0) | O_ASYNC) == -1)
|
|
||||||
throw SysError("F_SETFL");
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* Exchange the greeting. */
|
/* Exchange the greeting. */
|
||||||
unsigned int magic = readInt(from);
|
unsigned int magic = readInt(from);
|
||||||
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
|
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
|
||||||
|
@ -724,6 +585,8 @@ static void processConnection(bool trusted)
|
||||||
WorkerOp op;
|
WorkerOp op;
|
||||||
try {
|
try {
|
||||||
op = (WorkerOp) readInt(from);
|
op = (WorkerOp) readInt(from);
|
||||||
|
} catch (Interrupted & e) {
|
||||||
|
break;
|
||||||
} catch (EndOfFile & e) {
|
} catch (EndOfFile & e) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -871,9 +734,7 @@ static void daemonLoop()
|
||||||
(struct sockaddr *) &remoteAddr, &remoteAddrLen);
|
(struct sockaddr *) &remoteAddr, &remoteAddrLen);
|
||||||
checkInterrupt();
|
checkInterrupt();
|
||||||
if (remote == -1) {
|
if (remote == -1) {
|
||||||
if (errno == EINTR)
|
if (errno == EINTR) continue;
|
||||||
continue;
|
|
||||||
else
|
|
||||||
throw SysError("accepting connection");
|
throw SysError("accepting connection");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue