forked from lix-project/lix
Merge pull request #3876 from NixOS/nix-copy-latency
Fix RemoteStore::addToStore() latency
This commit is contained in:
commit
428e716193
3 changed files with 154 additions and 19 deletions
|
@ -703,6 +703,64 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
if (!trusted)
|
if (!trusted)
|
||||||
info.ultimate = false;
|
info.ultimate = false;
|
||||||
|
|
||||||
|
if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {
|
||||||
|
|
||||||
|
struct FramedSource : Source
|
||||||
|
{
|
||||||
|
Source & from;
|
||||||
|
bool eof = false;
|
||||||
|
std::vector<unsigned char> pending;
|
||||||
|
size_t pos = 0;
|
||||||
|
|
||||||
|
FramedSource(Source & from) : from(from)
|
||||||
|
{ }
|
||||||
|
|
||||||
|
~FramedSource()
|
||||||
|
{
|
||||||
|
if (!eof) {
|
||||||
|
while (true) {
|
||||||
|
auto n = readInt(from);
|
||||||
|
if (!n) break;
|
||||||
|
std::vector<unsigned char> data(n);
|
||||||
|
from(data.data(), n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t read(unsigned char * data, size_t len) override
|
||||||
|
{
|
||||||
|
if (eof) throw EndOfFile("reached end of FramedSource");
|
||||||
|
|
||||||
|
if (pos >= pending.size()) {
|
||||||
|
size_t len = readInt(from);
|
||||||
|
if (!len) {
|
||||||
|
eof = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
pending = std::vector<unsigned char>(len);
|
||||||
|
pos = 0;
|
||||||
|
from(pending.data(), len);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto n = std::min(len, pending.size() - pos);
|
||||||
|
memcpy(data, pending.data() + pos, n);
|
||||||
|
pos += n;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
logger->startWork();
|
||||||
|
|
||||||
|
{
|
||||||
|
FramedSource source(from);
|
||||||
|
store->addToStore(info, source, (RepairFlag) repair,
|
||||||
|
dontCheckSigs ? NoCheckSigs : CheckSigs);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger->stopWork();
|
||||||
|
}
|
||||||
|
|
||||||
|
else {
|
||||||
std::unique_ptr<Source> source;
|
std::unique_ptr<Source> source;
|
||||||
if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
|
if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
|
||||||
source = std::make_unique<TunnelSource>(from, to);
|
source = std::make_unique<TunnelSource>(from, to);
|
||||||
|
@ -721,6 +779,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
dontCheckSigs ? NoCheckSigs : CheckSigs);
|
dontCheckSigs ? NoCheckSigs : CheckSigs);
|
||||||
|
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -503,9 +503,84 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
|
||||||
conn->to << info.registrationTime << info.narSize
|
conn->to << info.registrationTime << info.narSize
|
||||||
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
|
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
|
||||||
<< repair << !checkSigs;
|
<< repair << !checkSigs;
|
||||||
bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
|
|
||||||
if (!tunnel) copyNAR(source, conn->to);
|
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
|
||||||
conn.processStderr(0, tunnel ? &source : nullptr);
|
|
||||||
|
std::exception_ptr ex;
|
||||||
|
|
||||||
|
struct FramedSink : BufferedSink
|
||||||
|
{
|
||||||
|
ConnectionHandle & conn;
|
||||||
|
std::exception_ptr & ex;
|
||||||
|
|
||||||
|
FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
|
||||||
|
{ }
|
||||||
|
|
||||||
|
~FramedSink()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
conn->to << 0;
|
||||||
|
conn->to.flush();
|
||||||
|
} catch (...) {
|
||||||
|
ignoreException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void write(const unsigned char * data, size_t len) override
|
||||||
|
{
|
||||||
|
/* Don't send more data if the remote has
|
||||||
|
encountered an error. */
|
||||||
|
if (ex) {
|
||||||
|
auto ex2 = ex;
|
||||||
|
ex = nullptr;
|
||||||
|
std::rethrow_exception(ex2);
|
||||||
|
}
|
||||||
|
conn->to << len;
|
||||||
|
conn->to(data, len);
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Handle log messages / exceptions from the remote on a
|
||||||
|
separate thread. */
|
||||||
|
std::thread stderrThread([&]()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
conn.processStderr(0, nullptr);
|
||||||
|
} catch (...) {
|
||||||
|
ex = std::current_exception();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Finally joinStderrThread([&]()
|
||||||
|
{
|
||||||
|
if (stderrThread.joinable()) {
|
||||||
|
stderrThread.join();
|
||||||
|
if (ex) {
|
||||||
|
try {
|
||||||
|
std::rethrow_exception(ex);
|
||||||
|
} catch (...) {
|
||||||
|
ignoreException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
{
|
||||||
|
FramedSink sink(conn, ex);
|
||||||
|
copyNAR(source, sink);
|
||||||
|
sink.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
stderrThread.join();
|
||||||
|
if (ex)
|
||||||
|
std::rethrow_exception(ex);
|
||||||
|
|
||||||
|
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
|
||||||
|
conn.processStderr(0, &source);
|
||||||
|
} else {
|
||||||
|
copyNAR(source, conn->to);
|
||||||
|
conn.processStderr(0, nullptr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ namespace nix {
|
||||||
#define WORKER_MAGIC_1 0x6e697863
|
#define WORKER_MAGIC_1 0x6e697863
|
||||||
#define WORKER_MAGIC_2 0x6478696f
|
#define WORKER_MAGIC_2 0x6478696f
|
||||||
|
|
||||||
#define PROTOCOL_VERSION 0x116
|
#define PROTOCOL_VERSION 0x117
|
||||||
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
|
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
|
||||||
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
|
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue