* Buffer writes in FdSink. This significantly reduces the number of

system calls / context switches when dumping a NAR and in the worker
  protocol.
This commit is contained in:
Eelco Dolstra 2011-12-14 23:30:06 +00:00
parent 893cac1402
commit 3a48282b06
4 changed files with 51 additions and 15 deletions

View file

@ -65,6 +65,7 @@ void RemoteStore::openConnection()
/* Send the magic greeting, check for the reply. */ /* Send the magic greeting, check for the reply. */
try { try {
writeInt(WORKER_MAGIC_1, to); writeInt(WORKER_MAGIC_1, to);
to.flush();
unsigned int magic = readInt(from); unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
@ -166,6 +167,7 @@ void RemoteStore::connectToDaemon()
RemoteStore::~RemoteStore() RemoteStore::~RemoteStore()
{ {
try { try {
to.flush();
fdSocket.close(); fdSocket.close();
if (child != -1) if (child != -1)
child.wait(true); child.wait(true);
@ -488,6 +490,7 @@ void RemoteStore::clearFailedPaths(const PathSet & paths)
void RemoteStore::processStderr(Sink * sink, Source * source) void RemoteStore::processStderr(Sink * sink, Source * source)
{ {
to.flush();
unsigned int msg; unsigned int msg;
while ((msg = readInt(from)) == STDERR_NEXT while ((msg = readInt(from)) == STDERR_NEXT
|| msg == STDERR_READ || msg == STDERR_WRITE) { || msg == STDERR_READ || msg == STDERR_WRITE) {
@ -503,6 +506,7 @@ void RemoteStore::processStderr(Sink * sink, Source * source)
AutoDeleteArray<unsigned char> d(buf); AutoDeleteArray<unsigned char> d(buf);
(*source)(buf, len); (*source)(buf, len);
writeString(string((const char *) buf, len), to); writeString(string((const char *) buf, len), to);
to.flush();
} }
else { else {
string s = readString(from); string s = readString(from);

View file

@ -9,7 +9,30 @@ namespace nix {
void FdSink::operator () (const unsigned char * data, unsigned int len) void FdSink::operator () (const unsigned char * data, unsigned int len)
{ {
if (!buffer) buffer = new unsigned char[bufSize];
while (len) {
/* Optimisation: bypass the buffer if the data exceeds the
buffer size and there is no unflushed data. */
if (bufPos == 0 && len >= bufSize) {
writeFull(fd, data, len); writeFull(fd, data, len);
break;
}
/* Otherwise, copy the bytes to the buffer. Flush the buffer
when it's full. */
size_t n = bufPos + len > bufSize ? bufSize - bufPos : len;
memcpy(buffer + bufPos, data, n);
data += n; bufPos += n; len -= n;
if (bufPos == bufSize) flush();
}
}
void FdSink::flush()
{
if (fd == -1 || bufPos == 0) return;
writeFull(fd, buffer, bufPos);
bufPos = 0;
} }

View file

@ -28,22 +28,29 @@ struct Source
}; };
/* A sink that writes data to a file descriptor. */ /* A sink that writes data to a file descriptor (using a buffer). */
struct FdSink : Sink struct FdSink : Sink
{ {
int fd; int fd;
unsigned int bufSize, bufPos;
unsigned char * buffer;
FdSink() FdSink() : fd(-1), bufSize(32 * 1024), bufPos(0), buffer(0) { }
FdSink(int fd, unsigned int bufSize = 32 * 1024)
: fd(fd), bufSize(bufSize), bufPos(0), buffer(0)
{ {
fd = -1;
} }
FdSink(int fd) ~FdSink()
{ {
this->fd = fd; flush();
if (buffer) delete[] buffer;
} }
void operator () (const unsigned char * data, unsigned int len); void operator () (const unsigned char * data, unsigned int len);
void flush();
}; };

View file

@ -57,6 +57,7 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
try { try {
writeInt(STDERR_NEXT, to); writeInt(STDERR_NEXT, to);
writeString(string((char *) buf, count), to); writeString(string((char *) buf, count), to);
to.flush();
} catch (...) { } catch (...) {
/* Write failed; that means that the other side is /* Write failed; that means that the other side is
gone. */ gone. */
@ -200,9 +201,7 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int
struct TunnelSink : Sink struct TunnelSink : Sink
{ {
Sink & to; Sink & to;
TunnelSink(Sink & to) : to(to) TunnelSink(Sink & to) : to(to) { }
{
}
virtual void operator () virtual void operator ()
(const unsigned char * data, unsigned int len) (const unsigned char * data, unsigned int len)
{ {
@ -215,9 +214,7 @@ struct TunnelSink : Sink
struct TunnelSource : Source struct TunnelSource : Source
{ {
Source & from; Source & from;
TunnelSource(Source & from) : from(from) TunnelSource(Source & from) : from(from) { }
{
}
virtual void operator () virtual void operator ()
(unsigned char * data, unsigned int len) (unsigned char * data, unsigned int len)
{ {
@ -228,6 +225,7 @@ struct TunnelSource : Source
writeInt(STDERR_READ, to); writeInt(STDERR_READ, to);
writeInt(len, to); writeInt(len, to);
to.flush();
string s = readString(from); string s = readString(from);
if (s.size() != len) throw Error("not enough data"); if (s.size() != len) throw Error("not enough data");
memcpy(data, (const unsigned char *) s.c_str(), len); memcpy(data, (const unsigned char *) s.c_str(), len);
@ -596,8 +594,8 @@ static void processConnection()
unsigned int magic = readInt(from); unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
writeInt(WORKER_MAGIC_2, to); writeInt(WORKER_MAGIC_2, to);
writeInt(PROTOCOL_VERSION, to); writeInt(PROTOCOL_VERSION, to);
to.flush();
unsigned int clientVersion = readInt(from); unsigned int clientVersion = readInt(from);
/* Send startup error messages to the client. */ /* Send startup error messages to the client. */
@ -619,9 +617,11 @@ static void processConnection()
store = boost::shared_ptr<StoreAPI>(new LocalStore()); store = boost::shared_ptr<StoreAPI>(new LocalStore());
stopWork(); stopWork();
to.flush();
} catch (Error & e) { } catch (Error & e) {
stopWork(false, e.msg()); stopWork(false, e.msg());
to.flush();
return; return;
} }
@ -652,6 +652,8 @@ static void processConnection()
if (!errorAllowed) break; if (!errorAllowed) break;
} }
to.flush();
assert(!canSendStderr); assert(!canSendStderr);
}; };