Allow only 1 thread to send a closure to a given machine at the same time

This prevents a race where multiple threads see that machine X is
missing path P, and start sending it concurrently. Nix handles this
correctly, but it's still wasteful (especially for the case where P ==
GHC).

A more refined scheme would be to have per machine, per path locks.
This commit is contained in:
Eelco Dolstra 2015-07-07 14:04:36 +02:00
parent 16696a4aee
commit 35b7c4f82b
3 changed files with 7 additions and 21 deletions

View file

@ -70,7 +70,7 @@ static void openConnection(const string & sshName, const string & sshKey,
static void copyClosureTo(std::shared_ptr<StoreAPI> store, static void copyClosureTo(std::shared_ptr<StoreAPI> store,
FdSource & from, FdSink & to, const PathSet & paths, FdSource & from, FdSink & to, const PathSet & paths,
TokenServer & copyClosureTokenServer, counter & bytesSent, counter & bytesSent,
bool useSubstitutes = false) bool useSubstitutes = false)
{ {
PathSet closure; PathSet closure;
@ -99,19 +99,6 @@ static void copyClosureTo(std::shared_ptr<StoreAPI> store,
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
if (present.find(*i) == present.end()) missing.push_back(*i); if (present.find(*i) == present.end()) missing.push_back(*i);
/* Ensure that only a limited number of threads can copy closures
at the same time. However, proceed anyway after a timeout to
prevent starvation by a handful of really huge closures. */
time_t start = time(0);
int timeout = 60 * (10 + rand() % 5);
auto token(copyClosureTokenServer.get(timeout));
time_t stop = time(0);
if (token())
printMsg(lvlDebug, format("got copy closure token after %1%s") % (stop - start));
else
printMsg(lvlDebug, format("did not get copy closure token after %1%s") % (stop - start));
printMsg(lvlDebug, format("sending %1% missing paths") % missing.size()); printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
for (auto & p : missing) for (auto & p : missing)
@ -194,7 +181,8 @@ void State::buildRemote(std::shared_ptr<StoreAPI> store,
if (machine->sshName != "localhost") { if (machine->sshName != "localhost") {
printMsg(lvlDebug, format("sending closure of %1% to %2%") % step->drvPath % machine->sshName); printMsg(lvlDebug, format("sending closure of %1% to %2%") % step->drvPath % machine->sshName);
MaintainCount mc(nrStepsCopyingTo); MaintainCount mc(nrStepsCopyingTo);
copyClosureTo(store, from, to, inputs, copyClosureTokenServer, bytesSent); std::lock_guard<std::mutex> sendLock(machine->state->sendLock);
copyClosureTo(store, from, to, inputs, bytesSent);
} }
autoDelete.cancel(); autoDelete.cancel();

View file

@ -32,7 +32,6 @@ bool has(const C & c, const V & v)
State::State() State::State()
: copyClosureTokenServer{maxParallelCopyClosure}
{ {
hydraData = getEnv("HYDRA_DATA"); hydraData = getEnv("HYDRA_DATA");
if (hydraData == "") throw Error("$HYDRA_DATA must be set"); if (hydraData == "") throw Error("$HYDRA_DATA must be set");

View file

@ -12,7 +12,6 @@
#include "pathlocks.hh" #include "pathlocks.hh"
#include "pool.hh" #include "pool.hh"
#include "sync.hh" #include "sync.hh"
#include "token-server.hh"
#include "store-api.hh" #include "store-api.hh"
#include "derivations.hh" #include "derivations.hh"
@ -136,6 +135,10 @@ struct Machine
counter nrStepsDone{0}; counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps counter totalStepBuildTime{0}; // total build time for steps
/* Mutex to prevent multiple threads from sending data to the
same machine (which would be inefficient). */
std::mutex sendLock;
}; };
State::ptr state; State::ptr state;
@ -191,10 +194,6 @@ private:
nix::Path machinesFile; nix::Path machinesFile;
struct stat machinesFileStat; struct stat machinesFileStat;
/* Token server limiting the number of threads copying closures in
parallel to prevent excessive I/O load. */
TokenServer copyClosureTokenServer;
/* Various stats. */ /* Various stats. */
time_t startedAt; time_t startedAt;
counter nrBuildsRead{0}; counter nrBuildsRead{0};