forked from lix-project/lix
* No longer block while waiting for a lock on a store path. Instead
poll for it (i.e. if we can't acquire the lock, then let the main select() loop wait for at most a few seconds and then try again). This improves parallelism: if two nix-store processes are both trying to build a path at the same time, the second one shouldn't block; it should first see if it can build other goals. Also, it prevents the deadlocks that have been occuring in Hydra lately, where a process waits for a lock held by another process that's waiting for a lock held by the first. The downside is that polling isn't really elegant, but POSIX doesn't provide a way to wait for locks in a select() loop. The only solution would be to spawn a thread for each lock to do a blocking fcntl() and then signal the main thread, but that would require pthreads.
This commit is contained in:
parent
58969fa2bf
commit
cacff1be88
|
@ -8,6 +8,7 @@
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <algorithm>
|
||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
#include <boost/weak_ptr.hpp>
|
#include <boost/weak_ptr.hpp>
|
||||||
#include <boost/enable_shared_from_this.hpp>
|
#include <boost/enable_shared_from_this.hpp>
|
||||||
|
@ -200,6 +201,12 @@ private:
|
||||||
/* Goals waiting for busy paths to be unlocked. */
|
/* Goals waiting for busy paths to be unlocked. */
|
||||||
WeakGoals waitingForAnyGoal;
|
WeakGoals waitingForAnyGoal;
|
||||||
|
|
||||||
|
/* Goals sleeping for a few seconds (polling a lock). */
|
||||||
|
WeakGoals waitingForAWhile;
|
||||||
|
|
||||||
|
/* Last time the goals in `waitingForAWhile' where woken up. */
|
||||||
|
time_t lastWokenUp;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
LocalStore & store;
|
LocalStore & store;
|
||||||
|
@ -246,6 +253,12 @@ public:
|
||||||
wait for some resource that some other goal is holding. */
|
wait for some resource that some other goal is holding. */
|
||||||
void waitForAnyGoal(GoalPtr goal);
|
void waitForAnyGoal(GoalPtr goal);
|
||||||
|
|
||||||
|
/* Wait for a few seconds and then retry this goal. Used when
|
||||||
|
waiting for a lock held by another process. This kind of
|
||||||
|
polling is inefficient, but POSIX doesn't really provide a way
|
||||||
|
to wait for multiple locks in the main select() loop. */
|
||||||
|
void waitForAWhile(GoalPtr goal);
|
||||||
|
|
||||||
/* Loop until the specified top-level goals have finished. */
|
/* Loop until the specified top-level goals have finished. */
|
||||||
void run(const Goals & topGoals);
|
void run(const Goals & topGoals);
|
||||||
|
|
||||||
|
@ -952,10 +965,14 @@ void DerivationGoal::tryToBuild()
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Obtain locks on all output paths. The locks are automatically
|
/* Obtain locks on all output paths. The locks are automatically
|
||||||
released when we exit this function or Nix crashes. */
|
released when we exit this function or Nix crashes. If we
|
||||||
/* !!! nonblock */
|
can't acquire the lock, then continue; hopefully some other
|
||||||
outputLocks.lockPaths(outputPaths(drv.outputs),
|
goal can start a build, and if not, the main loop will sleep a
|
||||||
(format("waiting for lock on %1%") % showPaths(outputPaths(drv.outputs))).str());
|
few seconds and then retry this goal. */
|
||||||
|
if (!outputLocks.lockPaths(outputPaths(drv.outputs), "", false)) {
|
||||||
|
worker.waitForAWhile(shared_from_this());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* Now check again whether the outputs are valid. This is because
|
/* Now check again whether the outputs are valid. This is because
|
||||||
another process may have started building in parallel. After
|
another process may have started building in parallel. After
|
||||||
|
@ -2205,8 +2222,10 @@ void SubstitutionGoal::tryToRun()
|
||||||
|
|
||||||
/* Acquire a lock on the output path. */
|
/* Acquire a lock on the output path. */
|
||||||
outputLock = boost::shared_ptr<PathLocks>(new PathLocks);
|
outputLock = boost::shared_ptr<PathLocks>(new PathLocks);
|
||||||
outputLock->lockPaths(singleton<PathSet>(storePath),
|
if (!outputLock->lockPaths(singleton<PathSet>(storePath), "", false)) {
|
||||||
(format("waiting for lock on `%1%'") % storePath).str());
|
worker.waitForAWhile(shared_from_this());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* Check again whether the path is invalid. */
|
/* Check again whether the path is invalid. */
|
||||||
if (worker.store.isValidPath(storePath)) {
|
if (worker.store.isValidPath(storePath)) {
|
||||||
|
@ -2372,6 +2391,7 @@ Worker::Worker(LocalStore & store)
|
||||||
if (working) abort();
|
if (working) abort();
|
||||||
working = true;
|
working = true;
|
||||||
nrChildren = 0;
|
nrChildren = 0;
|
||||||
|
lastWokenUp = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2440,9 +2460,7 @@ void Worker::removeGoal(GoalPtr goal)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wake up goals waiting for any goal to finish. */
|
/* Wake up goals waiting for any goal to finish. */
|
||||||
for (WeakGoals::iterator i = waitingForAnyGoal.begin();
|
foreach (WeakGoals::iterator, i, waitingForAnyGoal) {
|
||||||
i != waitingForAnyGoal.end(); ++i)
|
|
||||||
{
|
|
||||||
GoalPtr goal = i->lock();
|
GoalPtr goal = i->lock();
|
||||||
if (goal) wakeUp(goal);
|
if (goal) wakeUp(goal);
|
||||||
}
|
}
|
||||||
|
@ -2539,6 +2557,13 @@ void Worker::waitForAnyGoal(GoalPtr goal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Worker::waitForAWhile(GoalPtr goal)
|
||||||
|
{
|
||||||
|
debug("wait for a while");
|
||||||
|
waitingForAWhile.insert(goal);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void Worker::run(const Goals & _topGoals)
|
void Worker::run(const Goals & _topGoals)
|
||||||
{
|
{
|
||||||
for (Goals::iterator i = _topGoals.begin();
|
for (Goals::iterator i = _topGoals.begin();
|
||||||
|
@ -2566,10 +2591,9 @@ void Worker::run(const Goals & _topGoals)
|
||||||
if (topGoals.empty()) break;
|
if (topGoals.empty()) break;
|
||||||
|
|
||||||
/* Wait for input. */
|
/* Wait for input. */
|
||||||
if (!children.empty())
|
if (!children.empty() || !waitingForAWhile.empty())
|
||||||
waitForInput();
|
waitForInput();
|
||||||
else
|
else
|
||||||
/* !!! not when we're polling */
|
|
||||||
assert(!awake.empty());
|
assert(!awake.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2592,22 +2616,36 @@ void Worker::waitForInput()
|
||||||
the logger pipe of a build, we assume that the builder has
|
the logger pipe of a build, we assume that the builder has
|
||||||
terminated. */
|
terminated. */
|
||||||
|
|
||||||
|
bool useTimeout = false;
|
||||||
|
struct timeval timeout;
|
||||||
|
timeout.tv_usec = 0;
|
||||||
|
time_t before = time(0);
|
||||||
|
|
||||||
/* If we're monitoring for silence on stdout/stderr, sleep until
|
/* If we're monitoring for silence on stdout/stderr, sleep until
|
||||||
the first deadline for any child. */
|
the first deadline for any child. */
|
||||||
struct timeval timeout;
|
|
||||||
if (maxSilentTime != 0) {
|
if (maxSilentTime != 0) {
|
||||||
time_t oldest = 0;
|
time_t oldest = 0;
|
||||||
foreach (Children::iterator, i, children) {
|
foreach (Children::iterator, i, children) {
|
||||||
oldest = oldest == 0 || i->second.lastOutput < oldest
|
oldest = oldest == 0 || i->second.lastOutput < oldest
|
||||||
? i->second.lastOutput : oldest;
|
? i->second.lastOutput : oldest;
|
||||||
}
|
}
|
||||||
time_t now = time(0);
|
useTimeout = true;
|
||||||
timeout.tv_sec = (time_t) (oldest + maxSilentTime) <= now ? 0 :
|
timeout.tv_sec = std::max((time_t) 0, oldest + maxSilentTime - before);
|
||||||
oldest + maxSilentTime - now;
|
|
||||||
timeout.tv_usec = 0;
|
|
||||||
printMsg(lvlVomit, format("sleeping %1% seconds") % timeout.tv_sec);
|
printMsg(lvlVomit, format("sleeping %1% seconds") % timeout.tv_sec);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If we are polling goals that are waiting for a lock, then wake
|
||||||
|
up after a few seconds at most. */
|
||||||
|
int wakeUpInterval = 3;
|
||||||
|
|
||||||
|
if (!waitingForAWhile.empty()) {
|
||||||
|
useTimeout = true;
|
||||||
|
if (lastWokenUp == 0 && children.empty())
|
||||||
|
printMsg(lvlError, "waiting for locks...");
|
||||||
|
if (lastWokenUp == 0 || lastWokenUp > before) lastWokenUp = before;
|
||||||
|
timeout.tv_sec = std::max((time_t) 0, lastWokenUp + wakeUpInterval - before);
|
||||||
|
} else lastWokenUp = 0;
|
||||||
|
|
||||||
/* Use select() to wait for the input side of any logger pipe to
|
/* Use select() to wait for the input side of any logger pipe to
|
||||||
become `available'. Note that `available' (i.e., non-blocking)
|
become `available'. Note that `available' (i.e., non-blocking)
|
||||||
includes EOF. */
|
includes EOF. */
|
||||||
|
@ -2621,12 +2659,12 @@ void Worker::waitForInput()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (select(fdMax, &fds, 0, 0, maxSilentTime != 0 ? &timeout : 0) == -1) {
|
if (select(fdMax, &fds, 0, 0, useTimeout ? &timeout : 0) == -1) {
|
||||||
if (errno == EINTR) return;
|
if (errno == EINTR) return;
|
||||||
throw SysError("waiting for input");
|
throw SysError("waiting for input");
|
||||||
}
|
}
|
||||||
|
|
||||||
time_t now = time(0);
|
time_t after = time(0);
|
||||||
|
|
||||||
/* Process all available file descriptors. */
|
/* Process all available file descriptors. */
|
||||||
|
|
||||||
|
@ -2662,13 +2700,13 @@ void Worker::waitForInput()
|
||||||
% goal->getName() % rd);
|
% goal->getName() % rd);
|
||||||
string data((char *) buffer, rd);
|
string data((char *) buffer, rd);
|
||||||
goal->handleChildOutput(*k, data);
|
goal->handleChildOutput(*k, data);
|
||||||
j->second.lastOutput = now;
|
j->second.lastOutput = after;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (maxSilentTime != 0 &&
|
if (maxSilentTime != 0 &&
|
||||||
now - j->second.lastOutput >= (time_t) maxSilentTime)
|
after - j->second.lastOutput >= (time_t) maxSilentTime)
|
||||||
{
|
{
|
||||||
printMsg(lvlError,
|
printMsg(lvlError,
|
||||||
format("%1% timed out after %2% seconds of silence")
|
format("%1% timed out after %2% seconds of silence")
|
||||||
|
@ -2676,6 +2714,15 @@ void Worker::waitForInput()
|
||||||
goal->cancel();
|
goal->cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!waitingForAWhile.empty() && lastWokenUp + wakeUpInterval >= after) {
|
||||||
|
lastWokenUp = after;
|
||||||
|
foreach (WeakGoals::iterator, i, waitingForAWhile) {
|
||||||
|
GoalPtr goal = i->lock();
|
||||||
|
if (goal) wakeUp(goal);
|
||||||
|
}
|
||||||
|
waitingForAWhile.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ Verbosity buildVerbosity = lvlInfo;
|
||||||
unsigned int maxBuildJobs = 1;
|
unsigned int maxBuildJobs = 1;
|
||||||
bool readOnlyMode = false;
|
bool readOnlyMode = false;
|
||||||
string thisSystem = "unset";
|
string thisSystem = "unset";
|
||||||
unsigned int maxSilentTime = 0;
|
time_t maxSilentTime = 0;
|
||||||
Paths substituters;
|
Paths substituters;
|
||||||
bool useBuildHook = true;
|
bool useBuildHook = true;
|
||||||
bool printBuildTrace = false;
|
bool printBuildTrace = false;
|
||||||
|
|
|
@ -65,7 +65,7 @@ extern string thisSystem;
|
||||||
/* The maximum time in seconds that a builer can go without producing
|
/* The maximum time in seconds that a builer can go without producing
|
||||||
any output on stdout/stderr before it is killed. 0 means
|
any output on stdout/stderr before it is killed. 0 means
|
||||||
infinity. */
|
infinity. */
|
||||||
extern unsigned int maxSilentTime;
|
extern time_t maxSilentTime;
|
||||||
|
|
||||||
/* The substituters. There are programs that can somehow realise a
|
/* The substituters. There are programs that can somehow realise a
|
||||||
store path without building, e.g., by downloading it or copying it
|
store path without building, e.g., by downloading it or copying it
|
||||||
|
|
|
@ -141,9 +141,9 @@ PathLocks::PathLocks(const PathSet & paths, const string & waitMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void PathLocks::lockPaths(const PathSet & _paths, const string & waitMsg)
|
bool PathLocks::lockPaths(const PathSet & _paths,
|
||||||
|
const string & waitMsg, bool wait)
|
||||||
{
|
{
|
||||||
/* May be called only once! */
|
|
||||||
assert(fds.empty());
|
assert(fds.empty());
|
||||||
|
|
||||||
/* Note that `fds' is built incrementally so that the destructor
|
/* Note that `fds' is built incrementally so that the destructor
|
||||||
|
@ -174,8 +174,15 @@ void PathLocks::lockPaths(const PathSet & _paths, const string & waitMsg)
|
||||||
|
|
||||||
/* Acquire an exclusive lock. */
|
/* Acquire an exclusive lock. */
|
||||||
if (!lockFile(fd, ltWrite, false)) {
|
if (!lockFile(fd, ltWrite, false)) {
|
||||||
|
if (wait) {
|
||||||
if (waitMsg != "") printMsg(lvlError, waitMsg);
|
if (waitMsg != "") printMsg(lvlError, waitMsg);
|
||||||
lockFile(fd, ltWrite, true);
|
lockFile(fd, ltWrite, true);
|
||||||
|
} else {
|
||||||
|
/* Failed to lock this path; release all other
|
||||||
|
locks. */
|
||||||
|
unlock();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug(format("lock acquired on `%1%'") % lockPath);
|
debug(format("lock acquired on `%1%'") % lockPath);
|
||||||
|
@ -199,6 +206,8 @@ void PathLocks::lockPaths(const PathSet & _paths, const string & waitMsg)
|
||||||
fds.push_back(FDPair(fd.borrow(), lockPath));
|
fds.push_back(FDPair(fd.borrow(), lockPath));
|
||||||
lockedPaths.insert(lockPath);
|
lockedPaths.insert(lockPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -33,8 +33,9 @@ public:
|
||||||
PathLocks();
|
PathLocks();
|
||||||
PathLocks(const PathSet & paths,
|
PathLocks(const PathSet & paths,
|
||||||
const string & waitMsg = "");
|
const string & waitMsg = "");
|
||||||
void lockPaths(const PathSet & _paths,
|
bool lockPaths(const PathSet & _paths,
|
||||||
const string & waitMsg = "");
|
const string & waitMsg = "",
|
||||||
|
bool wait = true);
|
||||||
~PathLocks();
|
~PathLocks();
|
||||||
void unlock();
|
void unlock();
|
||||||
void setDeletion(bool deletePaths);
|
void setDeletion(bool deletePaths);
|
||||||
|
|
Loading…
Reference in a new issue