* Make the import operation through the daemon much more efficient

(way fewer roundtrips) by allowing the client to send data in bigger
  chunks.
* Some refactoring.
This commit is contained in:
Eelco Dolstra 2011-12-16 19:44:13 +00:00
parent 78598d06f0
commit e0bd307802
6 changed files with 68 additions and 44 deletions

View file

@ -1199,10 +1199,11 @@ struct HashAndReadSource : Source
{ {
hashing = true; hashing = true;
} }
virtual void operator () (unsigned char * data, size_t len) size_t read(unsigned char * data, size_t len)
{ {
readSource(data, len); size_t n = readSource.read(data, len);
if (hashing) hashSink(data, len); if (hashing) hashSink(data, n);
return n;
} }
}; };

View file

@ -501,11 +501,11 @@ void RemoteStore::processStderr(Sink * sink, Source * source)
} }
else if (msg == STDERR_READ) { else if (msg == STDERR_READ) {
if (!source) throw Error("no source"); if (!source) throw Error("no source");
unsigned int len = readInt(from); size_t len = readInt(from);
unsigned char * buf = new unsigned char[len]; unsigned char * buf = new unsigned char[len];
AutoDeleteArray<unsigned char> d(buf); AutoDeleteArray<unsigned char> d(buf);
(*source)(buf, len); size_t n = source->read(buf, len);
writeString(string((const char *) buf, len), to); writeString(string((const char *) buf, n), to); // !!! inefficient
to.flush(); to.flush();
} }
else { else {

View file

@ -8,7 +8,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863 #define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f #define WORKER_MAGIC_2 0x6478696f
#define PROTOCOL_VERSION 0x108 #define PROTOCOL_VERSION 0x109
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)

View file

@ -23,8 +23,9 @@ void BufferedSink::operator () (const unsigned char * data, size_t len)
while (len) { while (len) {
/* Optimisation: bypass the buffer if the data exceeds the /* Optimisation: bypass the buffer if the data exceeds the
buffer size and there is no unflushed data. */ buffer size. */
if (bufPos == 0 && len >= bufSize) { if (bufPos + len >= bufSize) {
flush();
write(data, len); write(data, len);
break; break;
} }
@ -59,29 +60,37 @@ void FdSink::write(const unsigned char * data, size_t len)
} }
void Source::operator () (unsigned char * data, size_t len)
{
while (len) {
size_t n = read(data, len);
data += n; len -= n;
}
}
BufferedSource::~BufferedSource() BufferedSource::~BufferedSource()
{ {
if (buffer) delete[] buffer; if (buffer) delete[] buffer;
} }
void BufferedSource::operator () (unsigned char * data, size_t len) size_t BufferedSource::read(unsigned char * data, size_t len)
{ {
if (!buffer) buffer = new unsigned char[bufSize]; if (!buffer) buffer = new unsigned char[bufSize];
while (len) { if (!bufPosIn) bufPosIn = readUnbuffered(buffer, bufSize);
if (!bufPosIn) bufPosIn = read(buffer, bufSize);
/* Copy out the data in the buffer. */ /* Copy out the data in the buffer. */
size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len; size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len;
memcpy(data, buffer + bufPosOut, n); memcpy(data, buffer + bufPosOut, n);
data += n; bufPosOut += n; len -= n; bufPosOut += n;
if (bufPosIn == bufPosOut) bufPosIn = bufPosOut = 0; if (bufPosIn == bufPosOut) bufPosIn = bufPosOut = 0;
} return n;
} }
size_t FdSource::read(unsigned char * data, size_t len) size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
{ {
ssize_t n; ssize_t n;
do { do {
@ -94,6 +103,15 @@ size_t FdSource::read(unsigned char * data, size_t len)
} }
size_t StringSource::read(unsigned char * data, size_t len)
{
if (pos == s.size()) throw EndOfFile("end of string reached");
size_t n = s.copy((char *) data, len, pos);
pos += n;
return n;
}
void writePadding(size_t len, Sink & sink) void writePadding(size_t len, Sink & sink)
{ {
if (len % 8) { if (len % 8) {

View file

@ -24,7 +24,7 @@ struct BufferedSink : Sink
BufferedSink(size_t bufSize = 32 * 1024) BufferedSink(size_t bufSize = 32 * 1024)
: bufSize(bufSize), bufPos(0), buffer(0) { } : bufSize(bufSize), bufPos(0), buffer(0) { }
~BufferedSink(); ~BufferedSink();
void operator () (const unsigned char * data, size_t len); void operator () (const unsigned char * data, size_t len);
void flush(); void flush();
@ -39,9 +39,14 @@ struct Source
virtual ~Source() { } virtual ~Source() { }
/* Store exactly len bytes in the buffer pointed to by data. /* Store exactly len bytes in the buffer pointed to by data.
It blocks if that much data is not yet available, or throws an It blocks until all the requested data is available, or throws
error if it is not going to be available. */ an error if it is not going to be available. */
virtual void operator () (unsigned char * data, size_t len) = 0; void operator () (unsigned char * data, size_t len);
/* Store up to len in the buffer pointed to by data, and
return the number of bytes stored. If blocks until at least
one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
}; };
@ -55,12 +60,10 @@ struct BufferedSource : Source
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { } : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~BufferedSource(); ~BufferedSource();
void operator () (unsigned char * data, size_t len); size_t read(unsigned char * data, size_t len);
/* Store up to len in the buffer pointed to by data, and /* Underlying read call, to be overriden. */
return the number of bytes stored. If should block until at virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
least one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
}; };
@ -83,7 +86,7 @@ struct FdSource : BufferedSource
int fd; int fd;
FdSource() : fd(-1) { } FdSource() : fd(-1) { }
FdSource(int fd) : fd(fd) { } FdSource(int fd) : fd(fd) { }
size_t read(unsigned char * data, size_t len); size_t readUnbuffered(unsigned char * data, size_t len);
}; };
@ -104,13 +107,7 @@ struct StringSource : Source
const string & s; const string & s;
size_t pos; size_t pos;
StringSource(const string & _s) : s(_s), pos(0) { } StringSource(const string & _s) : s(_s), pos(0) { }
virtual void operator () (unsigned char * data, size_t len) size_t read(unsigned char * data, size_t len);
{
s.copy((char *) data, len, pos);
pos += len;
if (pos > s.size())
throw Error("end of string reached");
}
}; };

View file

@ -210,11 +210,11 @@ struct TunnelSink : Sink
}; };
struct TunnelSource : Source struct TunnelSource : BufferedSource
{ {
Source & from; Source & from;
TunnelSource(Source & from) : from(from) { } TunnelSource(Source & from) : from(from) { }
virtual void operator () (unsigned char * data, size_t len) size_t readUnbuffered(unsigned char * data, size_t len)
{ {
/* Careful: we're going to receive data from the client now, /* Careful: we're going to receive data from the client now,
so we have to disable the SIGPOLL handler. */ so we have to disable the SIGPOLL handler. */
@ -224,11 +224,16 @@ struct TunnelSource : Source
writeInt(STDERR_READ, to); writeInt(STDERR_READ, to);
writeInt(len, to); writeInt(len, to);
to.flush(); to.flush();
string s = readString(from); string s = readString(from); // !!! inefficient
if (s.size() != len) throw Error("not enough data");
memcpy(data, (const unsigned char *) s.c_str(), len);
startWork(); startWork();
if (s.empty()) throw EndOfFile("unexpected end-of-file");
if (s.size() > len) throw Error("client sent too much data");
memcpy(data, (const unsigned char *) s.c_str(), s.size());
return s.size();
} }
}; };
@ -265,10 +270,11 @@ struct SavingSourceAdapter : Source
Source & orig; Source & orig;
string s; string s;
SavingSourceAdapter(Source & orig) : orig(orig) { } SavingSourceAdapter(Source & orig) : orig(orig) { }
void operator () (unsigned char * data, size_t len) size_t read(unsigned char * data, size_t len)
{ {
orig(data, len); size_t n = orig.read(data, len);
s.append((const char *) data, len); s.append((const char *) data, n);
return n;
} }
}; };
@ -397,6 +403,8 @@ static void performOp(unsigned int clientVersion,
case wopImportPath: { case wopImportPath: {
startWork(); startWork();
if (GET_PROTOCOL_MINOR(clientVersion) < 9)
throw Error("import not supported; upgrade your client");
TunnelSource source(from); TunnelSource source(from);
Path path = store->importPath(true, source); Path path = store->importPath(true, source);
stopWork(); stopWork();