diff --git a/distributed/build-remote.pl b/distributed/build-remote.pl index 12cb272b9..44482530b 100755 --- a/distributed/build-remote.pl +++ b/distributed/build-remote.pl @@ -80,6 +80,21 @@ if (!defined $machine) { } } +sub writeLoad { + system "echo A >> /tmp/blaaaa"; + open LOAD, "> /home/eelco/nix/distributed/current-load" or die; + system "echo B >> /tmp/blaaaa"; + foreach my $cur (keys %machines) { + system "echo $cur $curJobs{$cur} >> /tmp/blaaaa"; + print LOAD "$cur $curJobs{$cur}\n"; + } + system "echo C >> /tmp/blaaaa"; + close LOAD; +} + +$curJobs{$machine} = $curJobs{$machine} + 1; +writeLoad; + sendReply "accept"; open IN, "<&4" or die; my $x = ; @@ -87,30 +102,27 @@ chomp $x; print "got $x\n"; close IN; -print "BUILDING REMOTE: $storeExpr on $machine\n"; +system "echo $x >> /tmp/blaaaa"; +system "echo $curJobs{$machine} >> /tmp/blaaaa"; -$curJobs{$machine} = $curJobs{$machine} + 1; - -sub writeLoad { - open LOAD, "> /home/eelco/nix/distributed/current-load" or die; - foreach my $cur (keys %machines) { - print LOAD "$cur $curJobs{$cur}\n"; - } - close LOAD; +if ($x ne "okay") { + $curJobs{$machine} = $curJobs{$machine} - 1; + system "echo $curJobs{$machine} >> /tmp/blaaaa"; + writeLoad; + exit 0; } -writeLoad - +print "BUILDING REMOTE: $storeExpr on $machine\n"; my $ssh = "ssh -i $sshKeys{$machine} -x"; -my $inputs = `cat inputs`; +my $inputs = `cat inputs` or die; $inputs =~ s/\n/ /g; -my $outputs = `cat outputs`; +my $outputs = `cat outputs` or die; $outputs =~ s/\n/ /g; -my $successors = `cat successors`; +my $successors = `cat successors` or die; $successors =~ s/\n/ /g; system "rsync -a -e '$ssh' $storeExpr $inputs $machine:/nix/store"; @@ -136,4 +148,4 @@ foreach my $output (split '\n', $outputs) { $curJobs{$machine} = $curJobs{$machine} - 1; -writeLoad +writeLoad; diff --git a/src/libstore/normalise.cc b/src/libstore/normalise.cc index 0673814ba..5d0009539 100644 --- a/src/libstore/normalise.cc +++ b/src/libstore/normalise.cc @@ -91,6 +91,9 @@ struct Goal /* The remainder is state held during the build. */ + /* Whether it's being built by a hook or by ourselves. */ + bool inHook; + /* Locks on the output paths. */ PathLocks outputLocks; @@ -201,6 +204,11 @@ private: of derivation expressions that have yet to be normalised. */ Goals goals; + /* Finished goals are removed in run() at top-level; they are not + deleted as soon as they are finished, since there may be + references hanging about. */ + PathSet deadGoals; + /* The set of `buildable' goals, which are the ones with an empty list of unfinished inputs. */ PathSet buildable; @@ -237,8 +245,11 @@ private: void startBuildChild(Goal & goal); - bool tryBuildHook(Goal & goal); + typedef enum {rpAccept, rpDecline, rpPostpone} HookReply; + HookReply tryBuildHook(Goal & goal); + void terminateBuildHook(Goal & goal); + void openLogFile(Goal & goal); void initChild(Goal & goal); @@ -353,6 +364,12 @@ void Normaliser::run() do { printMsg(lvlVomit, "waiting for children"); } while (!waitForChildren()); + + /* Remove finished goals from the graph. */ + for (PathSet::iterator i = deadGoals.begin(); + i != deadGoals.end(); ++i) + goals.erase(*i); + deadGoals.clear(); } assert(buildable.empty() && building.empty()); @@ -361,7 +378,12 @@ void Normaliser::run() bool Normaliser::canBuildMore() { - return building.size() < maxBuildJobs; + /* !!! O(n) - not that it matters */ + unsigned int localJobs = 0; + for (Building::iterator i = building.begin(); + i != building.end(); ++i) + if (!goals[i->second].inHook) localJobs++; + return localJobs < maxBuildJobs; } @@ -378,12 +400,18 @@ bool Normaliser::startBuild(Path nePath) format("starting normalisation of goal `%1%'") % nePath); /* Is the build hook willing to accept this job? */ - if (tryBuildHook(goal)) return true; + switch (tryBuildHook(goal)) { + case rpAccept: return true; + case rpPostpone: return false; + case rpDecline: ; + } if (!canBuildMore()) { debug("postponing build"); return false; } + + goal.inHook = false; /* Prepare the build, i.e., acquire locks and gather necessary information. */ @@ -616,10 +644,17 @@ string readLine(int fd) } -bool Normaliser::tryBuildHook(Goal & goal) +void writeLine(int fd, string s) +{ + s += '\n'; + writeFull(fd, (const unsigned char *) s.c_str(), s.size()); +} + + +Normaliser::HookReply Normaliser::tryBuildHook(Goal & goal) { Path buildHook = getEnv("NIX_BUILD_HOOK"); - if (buildHook == "") return false; + if (buildHook == "") return rpDecline; buildHook = absPath(buildHook); /* Create a directory where we will store files used for @@ -676,21 +711,17 @@ bool Normaliser::tryBuildHook(Goal & goal) if (reply == "decline" || reply == "postpone") { /* Clean up the child. !!! hacky / should verify */ - /* !!! drain stdout of hook, wait for child process */ - goal.pid = 0; - goal.fromHook.closeReadSide(); - goal.toHook.closeWriteSide(); - close(goal.fdLogFile); - goal.fdLogFile = 0; - goal.logPipe.closeReadSide(); - building.erase(pid); - return reply == "postpone"; + terminateBuildHook(goal); + return reply == "decline" ? rpDecline : rpPostpone; } else if (reply == "accept") { - if (!prepareBuild(goal)) - throw Error("NOT IMPLEMENTED: hook unnecessary"); + if (!prepareBuild(goal)) { + writeLine(goal.toHook.writeSide, "cancel"); + terminateBuildHook(goal); + return rpAccept; + } Path inputListFN = goal.tmpDir + "/inputs"; Path outputListFN = goal.tmpDir + "/outputs"; @@ -717,17 +748,35 @@ bool Normaliser::tryBuildHook(Goal & goal) s += i->first + " " + i->second + "\n"; writeStringToFile(successorsListFN, s); - string okay = "okay\n"; - writeFull(goal.toHook.writeSide, - (const unsigned char *) okay.c_str(), okay.size()); + writeLine(goal.toHook.writeSide, "okay"); - return true; + goal.inHook = true; + + return rpAccept; } else throw Error(format("bad hook reply `%1%'") % reply); } +void Normaliser::terminateBuildHook(Goal & goal) +{ + /* !!! drain stdout of hook, wait for child process */ + debug("terminating build hook"); + pid_t pid = goal.pid; + int status; + if (waitpid(goal.pid, &status, 0) != goal.pid) + printMsg(lvlError, format("process `%1%' missing") % goal.pid); + goal.pid = 0; + goal.fromHook.closeReadSide(); + goal.toHook.closeWriteSide(); + close(goal.fdLogFile); + goal.fdLogFile = 0; + goal.logPipe.closeReadSide(); + building.erase(pid); +} + + void Normaliser::openLogFile(Goal & goal) { /* Create a log file. */ @@ -841,7 +890,7 @@ bool Normaliser::waitForChildren() Goal & goal(goals[i->second]); int fd = goal.logPipe.readSide; if (FD_ISSET(fd, &fds)) { - unsigned char buffer[1024]; + unsigned char buffer[4096]; ssize_t rd = read(fd, buffer, sizeof(buffer)); if (rd == -1) { if (errno != EINTR) @@ -1047,9 +1096,10 @@ void Normaliser::removeGoal(Goal & goal) } } - /* Remove this goal from the graph. Careful: after this `goal' is - probably no longer valid. */ - goals.erase(goal.nePath); + /* Lazily remove the goal from the graph (it will be actually + removed in run(); this is since callers may have references to + `goal'). */ + deadGoals.insert(goal.nePath); }