* Daemon mode (`nix-worker --daemon'). Clients connect to the server

via the Unix domain socket in /nix/var/nix/daemon.socket.  The
  server forks a worker process per connection.
* readString(): use the heap, not the stack.
* Some protocol fixes.
This commit is contained in:
Eelco Dolstra 2006-12-04 17:17:13 +00:00
parent 4740baf3a6
commit 0130ef88ea
7 changed files with 182 additions and 53 deletions

View file

@ -3,6 +3,8 @@
#include "types.hh" #include "types.hh"
#include <signal.h>
/* These are not implemented here, but must be implemented by a /* These are not implemented here, but must be implemented by a
program linking against libmain. */ program linking against libmain. */
@ -27,6 +29,10 @@ void printGCWarning();
/* Whether we're running setuid. */ /* Whether we're running setuid. */
extern bool setuidMode; extern bool setuidMode;
extern volatile ::sig_atomic_t blockInt;
MakeError(UsageError, nix::Error)
} }

View file

@ -39,10 +39,11 @@ RemoteStore::RemoteStore()
/* Send the magic greeting, check for the reply. */ /* Send the magic greeting, check for the reply. */
try { try {
processStderr();
writeInt(WORKER_MAGIC_1, to); writeInt(WORKER_MAGIC_1, to);
writeInt(verbosity, to);
unsigned int magic = readInt(from); unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
processStderr();
} catch (Error & e) { } catch (Error & e) {
throw Error(format("cannot start worker (%1%)") throw Error(format("cannot start worker (%1%)")
% e.msg()); % e.msg());

View file

@ -85,10 +85,11 @@ unsigned int readInt(Source & source)
string readString(Source & source) string readString(Source & source)
{ {
unsigned int len = readInt(source); unsigned int len = readInt(source);
char buf[len]; unsigned char * buf = new unsigned char[len];
source((unsigned char *) buf, len); AutoDeleteArray<unsigned char> d(buf);
source(buf, len);
readPadding(len, source); readPadding(len, source);
return string(buf, len); return string((char *) buf, len);
} }

View file

@ -44,8 +44,6 @@ public:
newClass(const format & f) : superClass(f) { }; \ newClass(const format & f) : superClass(f) { }; \
}; };
MakeError(UsageError, Error)
typedef list<string> Strings; typedef list<string> Strings;
typedef set<string> StringSet; typedef set<string> StringSet;

View file

@ -191,18 +191,6 @@ Strings readDirectory(const Path & path)
} }
template <class T>
struct AutoDeleteArray
{
T * p;
AutoDeleteArray(T * p) : p(p) { }
~AutoDeleteArray()
{
delete [] p;
}
};
string readFile(int fd) string readFile(int fd)
{ {
struct stat st; struct stat st;
@ -468,7 +456,7 @@ void readFull(int fd, unsigned char * buf, size_t count)
if (errno == EINTR) continue; if (errno == EINTR) continue;
throw SysError("reading from file"); throw SysError("reading from file");
} }
if (res == 0) throw Error("unexpected end-of-file"); if (res == 0) throw EndOfFile("unexpected end-of-file");
count -= res; count -= res;
buf += res; buf += res;
} }
@ -707,6 +695,7 @@ int Pid::wait(bool block)
if (res == 0 && !block) return -1; if (res == 0 && !block) return -1;
if (errno != EINTR) if (errno != EINTR)
throw SysError("cannot get child exit status"); throw SysError("cannot get child exit status");
checkInterrupt();
} }
} }
@ -793,7 +782,7 @@ void _interrupted()
kills the program! */ kills the program! */
if (!std::uncaught_exception()) { if (!std::uncaught_exception()) {
_isInterrupted = 0; _isInterrupted = 0;
throw Error("interrupted by the user"); throw Interrupted("interrupted by the user");
} }
} }

View file

@ -139,6 +139,8 @@ extern void (*writeToStderr) (const unsigned char * buf, size_t count);
void readFull(int fd, unsigned char * buf, size_t count); void readFull(int fd, unsigned char * buf, size_t count);
void writeFull(int fd, const unsigned char * buf, size_t count); void writeFull(int fd, const unsigned char * buf, size_t count);
MakeError(EndOfFile, Error)
/* Read a file descriptor until EOF occurs. */ /* Read a file descriptor until EOF occurs. */
string drainFD(int fd); string drainFD(int fd);
@ -147,6 +149,19 @@ string drainFD(int fd);
/* Automatic cleanup of resources. */ /* Automatic cleanup of resources. */
template <class T>
struct AutoDeleteArray
{
T * p;
AutoDeleteArray(T * p) : p(p) { }
~AutoDeleteArray()
{
delete [] p;
}
};
class AutoDelete class AutoDelete
{ {
string path; string path;
@ -229,6 +244,8 @@ void inline checkInterrupt()
if (_isInterrupted) _interrupted(); if (_isInterrupted) _interrupted();
} }
MakeError(Interrupted, Error)
/* String packing / unpacking. */ /* String packing / unpacking. */
string packStrings(const Strings & strings); string packStrings(const Strings & strings);

View file

@ -9,6 +9,10 @@
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <fcntl.h> #include <fcntl.h>
using namespace nix; using namespace nix;
@ -43,9 +47,6 @@ bool canSendStderr;
socket. */ socket. */
static void tunnelStderr(const unsigned char * buf, size_t count) static void tunnelStderr(const unsigned char * buf, size_t count)
{ {
if (canSendStderr)
writeFull(STDERR_FILENO, (unsigned char *) "L: ", 3);
writeFull(STDERR_FILENO, buf, count);
if (canSendStderr) { if (canSendStderr) {
try { try {
writeInt(STDERR_NEXT, to); writeInt(STDERR_NEXT, to);
@ -65,13 +66,21 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
socket. This handler is enabled at precisely those moments in the socket. This handler is enabled at precisely those moments in the
protocol when we're doing work and the client is supposed to be protocol when we're doing work and the client is supposed to be
quiet. Thus, if we get a SIGIO signal, it means that the client quiet. Thus, if we get a SIGIO signal, it means that the client
has quit. So we should quit as well. */ has quit. So we should quit as well.
Too bad most operating systems don't support the POLL_HUP value for
si_code in siginfo_t. That would make most of the SIGIO complexity
unnecessary, i.e., we could just enable SIGIO all the time and
wouldn't have to worry about races. */
static void sigioHandler(int sigNo) static void sigioHandler(int sigNo)
{ {
if (!blockInt) {
_isInterrupted = 1; _isInterrupted = 1;
blockInt = 1;
canSendStderr = false; canSendStderr = false;
write(STDERR_FILENO, "SIGIO\n", 6); write(STDERR_FILENO, "SIGIO\n", 6);
} }
}
/* startWork() means that we're starting an operation for which we /* startWork() means that we're starting an operation for which we
@ -97,14 +106,14 @@ static void startWork()
fd_set fds; fd_set fds;
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(STDIN_FILENO, &fds); FD_SET(from.fd, &fds);
if (select(STDIN_FILENO + 1, &fds, 0, 0, &timeout) == -1) if (select(from.fd + 1, &fds, 0, 0, &timeout) == -1)
throw SysError("select()"); throw SysError("select()");
if (FD_ISSET(STDIN_FILENO, &fds)) { if (FD_ISSET(from.fd, &fds)) {
char c; char c;
if (read(STDIN_FILENO, &c, 1) != 0) if (read(from.fd, &c, 1) != 0)
throw Error("EOF expected (protocol error?)"); throw Error("EOF expected (protocol error?)");
_isInterrupted = 1; _isInterrupted = 1;
checkInterrupt(); checkInterrupt();
@ -114,7 +123,7 @@ static void startWork()
/* stopWork() means that we're done; stop sending stderr to the /* stopWork() means that we're done; stop sending stderr to the
client. */ client. */
static void stopWork() static void stopWork(bool success = true, const string & msg = "")
{ {
/* Stop handling async client death; we're going to a state where /* Stop handling async client death; we're going to a state where
we're either sending or receiving from the client, so we'll be we're either sending or receiving from the client, so we'll be
@ -123,7 +132,13 @@ static void stopWork()
throw SysError("ignoring SIGIO"); throw SysError("ignoring SIGIO");
canSendStderr = false; canSendStderr = false;
if (success)
writeInt(STDERR_LAST, to); writeInt(STDERR_LAST, to);
else {
writeInt(STDERR_ERROR, to);
writeString(msg, to);
}
} }
@ -237,11 +252,17 @@ static void processConnection()
/* Allow us to receive SIGIO for events on the client socket. */ /* Allow us to receive SIGIO for events on the client socket. */
signal(SIGIO, SIG_IGN); signal(SIGIO, SIG_IGN);
if (fcntl(STDIN_FILENO, F_SETOWN, getpid()) == -1) if (fcntl(from.fd, F_SETOWN, getpid()) == -1)
throw SysError("F_SETOWN"); throw SysError("F_SETOWN");
if (fcntl(STDIN_FILENO, F_SETFL, fcntl(STDIN_FILENO, F_GETFL, 0) | FASYNC) == -1) if (fcntl(from.fd, F_SETFL, fcntl(from.fd, F_GETFL, 0) | FASYNC) == -1)
throw SysError("F_SETFL"); throw SysError("F_SETFL");
/* Exchange the greeting. */
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
verbosity = (Verbosity) readInt(from);
writeInt(WORKER_MAGIC_2, to);
/* Send startup error messages to the client. */ /* Send startup error messages to the client. */
startWork(); startWork();
@ -258,40 +279,137 @@ static void processConnection()
stopWork(); stopWork();
} catch (Error & e) { } catch (Error & e) {
writeInt(STDERR_ERROR, to); stopWork(false, e.msg());
writeString(e.msg(), to);
return; return;
} }
/* Exchange the greeting. */
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
writeInt(WORKER_MAGIC_2, to);
debug("greeting exchanged");
/* Process client requests. */ /* Process client requests. */
bool quit = false;
unsigned int opCount = 0; unsigned int opCount = 0;
do { while (true) {
WorkerOp op = (WorkerOp) readInt(from); WorkerOp op;
try {
op = (WorkerOp) readInt(from);
} catch (EndOfFile & e) {
break;
}
opCount++; opCount++;
try { try {
performOp(from, to, op); performOp(from, to, op);
} catch (Error & e) { } catch (Error & e) {
writeInt(STDERR_ERROR, to); stopWork(false, e.msg());
writeString(e.msg(), to);
} }
};
} while (!quit);
printMsg(lvlError, format("%1% worker operations") % opCount); printMsg(lvlError, format("%1% worker operations") % opCount);
} }
static void setSigChldAction(bool ignore)
{
struct sigaction act, oact;
act.sa_handler = ignore ? SIG_IGN : SIG_DFL;
sigfillset(&act.sa_mask);
act.sa_flags = 0;
if (sigaction(SIGCHLD, &act, &oact))
throw SysError("setting SIGCHLD handler");
}
static void daemonLoop()
{
/* Get rid of children automatically; don't let them become
zombies. */
setSigChldAction(true);
/* Create and bind to a Unix domain socket. */
AutoCloseFD fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
if (fdSocket == -1)
throw SysError("cannot create Unix domain socket");
string socketPath = nixStateDir + DEFAULT_SOCKET_PATH;
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
if (socketPath.size() >= sizeof(addr.sun_path))
throw Error(format("socket path `%1%' is too long") % socketPath);
strcpy(addr.sun_path, socketPath.c_str());
unlink(socketPath.c_str());
/* Make sure that the socket is created with 0666 permission
(everybody can connect). */
mode_t oldMode = umask(0111);
int res = bind(fdSocket, (struct sockaddr *) &addr, sizeof(addr));
umask(oldMode);
if (res == -1)
throw SysError(format("cannot bind to socket `%1%'") % socketPath);
if (listen(fdSocket, 5) == -1)
throw SysError(format("cannot listen on socket `%1%'") % socketPath);
/* Loop accepting connections. */
while (1) {
try {
/* Important: the server process *cannot* open the
Berkeley DB environment, because it doesn't like forks
very much. */
assert(!store);
/* Accept a connection. */
struct sockaddr_un remoteAddr;
socklen_t remoteAddrLen = sizeof(remoteAddr);
AutoCloseFD remote = accept(fdSocket,
(struct sockaddr *) &remoteAddr, &remoteAddrLen);
checkInterrupt();
if (remote == -1)
throw SysError("accepting connection");
printMsg(lvlInfo, format("accepted connection %1%") % remote);
/* Fork a child to handle the connection. */
pid_t child;
child = fork();
switch (child) {
case -1:
throw SysError("unable to fork");
case 0:
try { /* child */
/* Background the worker. */
if (setsid() == -1)
throw SysError(format("creating a new session"));
/* Restore normal handling of SIGCHLD. */
setSigChldAction(false);
/* Handle the connection. */
from.fd = remote;
to.fd = remote;
processConnection();
} catch (std::exception & e) {
std::cerr << format("child error: %1%\n") % e.what();
}
exit(0);
}
} catch (Interrupted & e) {
throw;
} catch (Error & e) {
printMsg(lvlError, format("error processing connection: %1%") % e.msg());
}
}
}
void run(Strings args) void run(Strings args)
{ {
bool slave = false; bool slave = false;
@ -315,8 +433,7 @@ void run(Strings args)
else if (daemon) { else if (daemon) {
if (setuidMode) if (setuidMode)
throw Error("daemon cannot be started in setuid mode"); throw Error("daemon cannot be started in setuid mode");
daemonLoop();
throw Error("daemon mode not implemented");
} }
else else