Compare commits

...

6 commits

Author SHA1 Message Date
jade 6e34a4d30f libstore: fix queryValidPaths concurrency
The lock usage was obviously wrong so it was entirely serialized. This
has the predicted speedups, the only question is whether it is sound
because it's exposing a bunch of new code to actual concurrency.

I did audit all the stores' queryPathInfoUncached implementations and
they all look *intended* to be thread safe, but whether that is actually
sound or not: lol lmao. I am highly confident in the s3 one because it
is calling s3 sdk methods that are thread safe and has no actual state.

Others are using Pool and look to be *supposed* to be thread safe, but
unsure if they actually are.

Change-Id: I0369152a510e878b5ac56c9ac956a98d48cd5fef
2024-06-15 17:48:04 -07:00
jade cf5df9bc4a store-api: fix/clarify capture lifetimes in copyPaths
This seems to fix a use of stack after return.

Change-Id: If690a6defb9a3225684685132cf78b227e271447
2024-06-15 17:48:04 -07:00
jade 14e390e679 libstore: work around aws sdk log spam at debug level
aws-sdk-cpp spams logs about sending TLS data in the otherwise rather
helpful debug logs. I've filed a PR upstream to stop it, but for now we
can just fix their verbosity ourselves.

Upstream-PR: https://github.com/aws/aws-sdk-cpp/pull/3003
Change-Id: I0c41a50d5f5958106836d6345843f4b05b9c8981
2024-06-15 17:48:04 -07:00
jade 25c0f18b9f s3: delete obsolete ifdefs
The versions checked for are so old that we can just drop support.

Change-Id: Ib9cf136d1cb9a4a91a6613102c4fd15e1190363b
2024-06-15 17:47:16 -07:00
jade c40c572e52 tree-wide: make logger a shared_ptr so it actually destructs at the end of the program
It was, of course, just leaked before. Fixing this naturally exposed some bugs.

Change-Id: I013d4631ae424929e28c2705943410592116c965
2024-06-15 17:47:16 -07:00
jade 2c60ac102a libutil: tidy Sync and fix its move constructor
There was a previously-unused move constructor that just called abort,
which makes no sense since it ought to just be `= delete` if you don't
want one (commit history says it was Eelco doing an optimistic
performance optimisation in 2016, so it probably would not pass review
today).

However, a Lock has some great reasons to be moved! You might need to
unlock it early, for instance, or give it to someone else. So we change
the move constructor to instead hollow out the moved-from object and
make it unusable.

Change-Id: Iff2a4c2f7ebd0a558c4866d4dfe526bc8558bed7
2024-06-15 17:47:16 -07:00
16 changed files with 221 additions and 97 deletions

View file

@ -222,7 +222,8 @@ brotli = [
openssl = dependency('libcrypto', 'openssl', required : true)
aws_sdk = dependency('aws-cpp-sdk-core', required : false)
# FIXME: confirm we actually support such old versions of aws-sdk-cpp
aws_sdk = dependency('aws-cpp-sdk-core', required : false, version : '>=1.8')
aws_sdk_transfer = dependency('aws-cpp-sdk-transfer', required : aws_sdk.found(), fallback : ['aws_sdk', 'aws_cpp_sdk_transfer_dep'])
if aws_sdk.found()
# The AWS pkg-config adds -std=c++11.
@ -234,12 +235,6 @@ if aws_sdk.found()
links : true,
sources : true,
)
s = aws_sdk.version().split('.')
configdata += {
'AWS_VERSION_MAJOR': s[0].to_int(),
'AWS_VERSION_MINOR': s[1].to_int(),
'AWS_VERSION_PATCH': s[2].to_int(),
}
aws_sdk_transfer = aws_sdk_transfer.partial_dependency(
compile_args : false,
includes : true,

View file

@ -60,7 +60,7 @@ static bool allSupportedLocally(Store & store, const std::set<std::string>& requ
static int main_build_remote(int argc, char * * argv)
{
{
logger = makeJSONLogger(*logger);
logger.replace(makeJSONLogger(*logger));
/* Ensure we don't get any SSH passphrase or host key popups. */
unsetenv("DISPLAY");

View file

@ -20,14 +20,14 @@ LogFormat parseLogFormat(const std::string & logFormatStr) {
throw Error("option 'log-format' has an invalid value '%s'", logFormatStr);
}
Logger * makeDefaultLogger() {
std::shared_ptr<Logger> makeDefaultLogger() {
switch (defaultLogFormat) {
case LogFormat::raw:
return makeSimpleLogger(false);
case LogFormat::rawWithLogs:
return makeSimpleLogger(true);
case LogFormat::internalJSON:
return makeJSONLogger(*makeSimpleLogger(true));
return makeJSONLogger(makeSimpleLogger(true));
case LogFormat::bar:
return makeProgressBar();
case LogFormat::barWithLogs: {
@ -50,7 +50,7 @@ void setLogFormat(const LogFormat & logFormat) {
}
void createDefaultLogger() {
logger = makeDefaultLogger();
logger.replace(makeDefaultLogger());
}
}

View file

@ -1,10 +1,10 @@
#include "progress-bar.hh"
#include "file-system.hh"
#include "strings.hh"
#include "sync.hh"
#include "store-api.hh"
#include "names.hh"
#include "terminal.hh"
#include <atomic>
#include <map>
#include <thread>
#include <sstream>
@ -119,7 +119,18 @@ public:
{
{
auto state(state_.lock());
if (!state->active) return;
if (!state->active) {
// Unlock immediately so we don't deadlock.
{
auto _ = std::move(state);
}
// Even if the thread is inactive, the handle needs to be
// explicitly joined to not call terminate if it is destructed.
if (updateThread.joinable()) {
updateThread.join();
}
return;
}
state->active = false;
writeToStderr("\r\e[K");
updateCV.notify_one();
@ -531,19 +542,19 @@ public:
}
};
Logger * makeProgressBar()
std::shared_ptr<Logger> makeProgressBar()
{
return new ProgressBar(shouldANSI());
return std::make_shared<ProgressBar>(shouldANSI());
}
void startProgressBar()
{
logger = makeProgressBar();
logger.replace(makeProgressBar());
}
void stopProgressBar()
{
auto progressBar = dynamic_cast<ProgressBar *>(logger);
auto progressBar = dynamic_cast<ProgressBar *>(&**logger);
if (progressBar) progressBar->stop();
}

View file

@ -5,7 +5,7 @@
namespace nix {
Logger * makeProgressBar();
std::shared_ptr<Logger> makeProgressBar();
void startProgressBar();

View file

@ -5,7 +5,7 @@ namespace nix {
void commonChildInit()
{
logger = makeSimpleLogger();
logger.replace(makeSimpleLogger());
const static std::string pathNullDevice = "/dev/null";
restoreProcessContext(false);

View file

@ -873,9 +873,9 @@ void DerivationGoal::cleanupPostOutputsRegisteredModeNonCheck()
{
}
void runPostBuildHook(
static void runPostBuildHook(
Store & store,
Logger & logger,
std::shared_ptr<Logger> logger,
const StorePath & drvPath,
const StorePathSet & outputPaths)
{

View file

@ -2156,7 +2156,7 @@ void LocalDerivationGoal::runChild()
/* Execute the program. This should not return. */
if (drv->isBuiltin()) {
try {
logger = makeJSONLogger(*logger);
logger.replace(makeJSONLogger(*logger));
BasicDerivation & drv2(*drv);
for (auto & e : drv2.env)

View file

@ -263,7 +263,7 @@ struct ClientSettings
}
};
static void performOp(TunnelLogger * logger, ref<Store> store,
static void performOp(std::shared_ptr<TunnelLogger> logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, WorkerProto::Version clientVersion,
Source & from, BufferedSink & to, WorkerProto::Op op)
{
@ -1014,11 +1014,11 @@ void processConnection(
if (clientVersion < 0x10a)
throw Error("the Nix client version is too old");
auto tunnelLogger = new TunnelLogger(to, clientVersion);
auto prevLogger = nix::logger;
auto tunnelLogger = std::make_shared<TunnelLogger>(to, clientVersion);
auto prevLogger = *nix::logger;
// FIXME
if (!recursive)
logger = tunnelLogger;
prevLogger = nix::logger.replace(tunnelLogger);
unsigned int opCount = 0;

View file

@ -55,12 +55,14 @@ class AwsLogger : public Aws::Utils::Logging::FormattedLogSystem
void ProcessFormattedStatement(Aws::String && statement) override
{
// FIXME: workaround for truly excessive log spam in debug level: https://github.com/aws/aws-sdk-cpp/pull/3003
if ((statement.find("(SSLDataIn)") != std::string::npos || statement.find("(SSLDataOut)") != std::string::npos) && verbosity <= lvlDebug) {
return;
}
debug("AWS: %s", chomp(statement));
}
#if !(AWS_VERSION_MAJOR <= 1 && AWS_VERSION_MINOR <= 7 && AWS_VERSION_PATCH <= 115)
void Flush() override {}
#endif
};
static void initAWS()
@ -100,12 +102,7 @@ S3Helper::S3Helper(
: std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(profile.c_str())),
*config,
// FIXME: https://github.com/aws/aws-sdk-cpp/issues/759
#if AWS_VERSION_MAJOR == 1 && AWS_VERSION_MINOR < 3
false,
#else
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
#endif
endpoint.empty()))
{
}

View file

@ -792,17 +792,31 @@ StorePathSet Store::queryValidPaths(const StorePathSet & paths, SubstituteFlag m
auto doQuery = [&](const StorePath & path) {
checkInterrupt();
auto state(state_.lock());
bool exists = false;
std::optional<std::exception_ptr> newExc;
try {
auto info = queryPathInfo(path);
state->valid.insert(path);
queryPathInfo(path);
exists = true;
} catch (InvalidPath &) {
} catch (...) {
state->exc = std::current_exception();
newExc = std::current_exception();
}
{
auto state(state_.lock());
if (exists) {
state->valid.insert(path);
}
if (newExc.has_value()) {
state->exc = *newExc;
}
assert(state->left);
if (!--state->left)
wakeup.notify_one();
}
assert(state->left);
if (!--state->left)
wakeup.notify_one();
};
for (auto & path : paths)
@ -1163,25 +1177,30 @@ std::map<StorePath, StorePath> copyPaths(
ValidPathInfo infoForDst = *info;
infoForDst.path = storePathForDst;
auto source = sinkToSource([&](Sink & sink) {
// We can reasonably assume that the copy will happen whenever we
// read the path, so log something about that at that point
auto srcUri = srcStore.getUri();
auto dstUri = dstStore.getUri();
auto storePathS = srcStore.printStorePath(missingPath);
Activity act(*logger, lvlInfo, actCopyPath,
makeCopyPathMessage(srcUri, dstUri, storePathS),
{storePathS, srcUri, dstUri});
PushActivity pact(act.id);
auto source =
sinkToSource([&srcStore, &dstStore, missingPath = missingPath, info = std::move(info)](Sink & sink) {
// We can reasonably assume that the copy will happen whenever we
// read the path, so log something about that at that point
auto srcUri = srcStore.getUri();
auto dstUri = dstStore.getUri();
auto storePathS = srcStore.printStorePath(missingPath);
Activity act(
*logger,
lvlInfo,
actCopyPath,
makeCopyPathMessage(srcUri, dstUri, storePathS),
{storePathS, srcUri, dstUri}
);
PushActivity pact(act.id);
LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable {
total += data.size();
act.progress(total, info->narSize);
LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable {
total += data.size();
act.progress(total, info->narSize);
});
TeeSink tee{sink, progressSink};
srcStore.narFromPath(missingPath, tee);
});
TeeSink tee { sink, progressSink };
srcStore.narFromPath(missingPath, tee);
});
pathsToCopy.push_back(std::pair{infoForDst, std::move(source)});
}

View file

@ -26,8 +26,6 @@ void setCurActivity(const ActivityId activityId)
curActivity = activityId;
}
Logger * logger = makeSimpleLogger(true);
void Logger::warn(const std::string & msg)
{
log(lvlWarn, ANSI_WARNING "warning:" ANSI_NORMAL " " + msg);
@ -108,6 +106,38 @@ public:
}
};
// XXX: We have to leak this so that we can not destroy the damned thing on
// shutdown because of destructor ordering. Bad evil horrible.
LoggerWrapper &logger = *(new LoggerWrapper(makeSimpleLogger(true)));
class NullLogger : public Logger
{
void log(Verbosity lvl, std::string_view s) override {}
void logEI(const ErrorInfo & ei) override {}
};
Logger * globalNullLogger = new NullLogger();
Logger * globalSimpleLogger = new SimpleLogger(false);
[[gnu::destructor(65535)]]
void switchToShutdownLogger()
{
static auto dontDelete = [](auto d) {};
// XXX: Replace the logger with a global logger that is leaked at the end of the process
// This is so that custom loggers with meaningful destructors are guaranteed to actually be destroyed.
// Genuinely I am so sorry for this code.
//
// If there's a tty on the other end, we assume it's probably ok if the log
// format regresses to basic, and we consider it more important that the
// logs get out at all.
//
// FIXME: Replace this madness with explicitly passing around a shared
// pointer to const LoggerWrapper, because holy cow globals are broken.
logger.replace(std::shared_ptr<Logger>(isatty(STDERR_FILENO) ? globalSimpleLogger : globalNullLogger, dontDelete));
}
Verbosity verbosity = lvlInfo;
void writeToStderr(std::string_view s)
@ -122,18 +152,18 @@ void writeToStderr(std::string_view s)
}
}
Logger * makeSimpleLogger(bool printBuildLogs)
std::shared_ptr<Logger> makeSimpleLogger(bool printBuildLogs)
{
return new SimpleLogger(printBuildLogs);
return std::make_shared<SimpleLogger>(printBuildLogs);
}
std::atomic<uint64_t> nextId{0};
Activity::Activity(Logger & logger, Verbosity lvl, ActivityType type,
Activity::Activity(std::shared_ptr<Logger> logger, Verbosity lvl, ActivityType type,
const std::string & s, const Logger::Fields & fields, ActivityId parent)
: logger(logger), id(nextId++ + (((uint64_t) getpid()) << 32))
{
logger.startActivity(id, lvl, type, s, fields, parent);
logger->startActivity(id, lvl, type, s, fields, parent);
}
void to_json(nlohmann::json & json, std::shared_ptr<Pos> pos)
@ -152,9 +182,9 @@ void to_json(nlohmann::json & json, std::shared_ptr<Pos> pos)
}
struct JSONLogger : Logger {
Logger & prevLogger;
std::shared_ptr<Logger> prevLogger;
JSONLogger(Logger & prevLogger) : prevLogger(prevLogger) { }
JSONLogger(std::shared_ptr<Logger> prevLogger) : prevLogger(prevLogger) { }
bool isVerbose() override {
return true;
@ -175,7 +205,7 @@ struct JSONLogger : Logger {
void write(const nlohmann::json & json)
{
prevLogger.log(lvlError, "@nix " + json.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace));
prevLogger->log(lvlError, "@nix " + json.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace));
}
void log(Verbosity lvl, std::string_view s) override
@ -247,9 +277,9 @@ struct JSONLogger : Logger {
}
};
Logger * makeJSONLogger(Logger & prevLogger)
std::shared_ptr<Logger> makeJSONLogger(std::shared_ptr<Logger> prevLogger)
{
return new JSONLogger(prevLogger);
return std::make_shared<JSONLogger>(std::move(prevLogger));
}
static Logger::Fields getFields(nlohmann::json & json)
@ -325,7 +355,7 @@ bool handleJSONLogMessage(const std::string & msg,
Activity::~Activity()
{
try {
logger.stopActivity(id);
logger->stopActivity(id);
} catch (...) {
ignoreException();
}

View file

@ -1,7 +1,6 @@
#pragma once
///@file
#include "types.hh"
#include "error.hh"
#include "config.hh"
@ -132,14 +131,14 @@ void setCurActivity(const ActivityId activityId);
struct Activity
{
Logger & logger;
std::shared_ptr<Logger> logger;
const ActivityId id;
Activity(Logger & logger, Verbosity lvl, ActivityType type, const std::string & s = "",
Activity(std::shared_ptr<Logger> logger, Verbosity lvl, ActivityType type, const std::string & s = "",
const Logger::Fields & fields = {}, ActivityId parent = getCurActivity());
Activity(Logger & logger, ActivityType type,
Activity(std::shared_ptr<Logger> logger, ActivityType type,
const Logger::Fields & fields = {}, ActivityId parent = getCurActivity())
: Activity(logger, lvlError, type, "", fields, parent) { };
@ -163,7 +162,7 @@ struct Activity
void result(ResultType type, const Logger::Fields & fields) const
{
logger.result(id, type, fields);
logger->result(id, type, fields);
}
friend class Logger;
@ -176,11 +175,39 @@ struct PushActivity
~PushActivity() { setCurActivity(prevAct); }
};
extern Logger * logger;
/** Wrapper to make replacing the global logger object thread-safe, since we do
* that in various places.
*
* The actual usage of shared_ptr across threads is ok, only replacing it is
* not thread safe.
*/
struct LoggerWrapper
{
std::atomic<std::shared_ptr<Logger>> inner;
Logger * makeSimpleLogger(bool printBuildLogs = true);
explicit LoggerWrapper(std::shared_ptr<Logger> inner) : inner(inner) {}
Logger * makeJSONLogger(Logger & prevLogger);
const std::shared_ptr<Logger> operator->() const {
return inner.load(std::memory_order_relaxed);
}
const std::shared_ptr<Logger> operator*() const {
return inner.load(std::memory_order_relaxed);
}
/**
* Replaces the logger with a new one atomically.
*/
const std::shared_ptr<Logger> replace(std::shared_ptr<Logger> newInner) {
return inner.exchange(std::move(newInner), std::memory_order_relaxed);
}
};
extern LoggerWrapper& logger;
std::shared_ptr<Logger> makeSimpleLogger(bool printBuildLogs = true);
std::shared_ptr<Logger> makeJSONLogger(std::shared_ptr<Logger> prevLogger);
/**
* suppress msgs > this

View file

@ -190,7 +190,7 @@ static int childEntry(void * arg)
pid_t startProcess(std::function<void()> fun, const ProcessOptions & options)
{
std::function<void()> wrapper = [&]() {
logger = makeSimpleLogger();
logger.replace(makeSimpleLogger());
try {
#if __linux__
if (options.dieWithParent && prctl(PR_SET_PDEATHSIG, SIGKILL) == -1)

View file

@ -40,50 +40,96 @@ public:
class Lock
{
private:
// Non-owning pointer. This would be an
// optional<reference_wrapper<Sync>> if it didn't break gdb accessing
// Lock values (as of 2024-06-15, gdb 14.2)
Sync * s;
std::unique_lock<M> lk;
friend Sync;
Lock(Sync * s) : s(s), lk(s->mutex) { }
public:
Lock(Lock && l) : s(l.s) { abort(); }
Lock(const Lock & l) = delete;
~Lock() { }
T * operator -> () { return &s->data; }
T & operator * () { return s->data; }
Lock(Sync &s) : s(&s), lk(s.mutex) { }
void wait(std::condition_variable & cv)
inline void checkLockingInvariants()
{
assert(s);
assert(lk.owns_lock());
}
public:
Lock(Lock && l) : s(l.s), lk(std::move(l.lk))
{
l.s = nullptr;
}
Lock(const Lock & l) = delete;
~Lock() { }
T * operator -> ()
{
checkLockingInvariants();
return &s->data;
}
T & operator * ()
{
checkLockingInvariants();
return s->data;
}
/**
* Wait for the given condition variable with no timeout.
*
* May spuriously wake up.
*/
void wait(std::condition_variable & cv)
{
checkLockingInvariants();
cv.wait(lk);
}
/**
* Wait for the given condition variable for a maximum elapsed time of \p duration.
*
* May spuriously wake up.
*/
template<class Rep, class Period>
std::cv_status wait_for(std::condition_variable & cv,
const std::chrono::duration<Rep, Period> & duration)
{
assert(s);
checkLockingInvariants();
return cv.wait_for(lk, duration);
}
/**
* Wait for the given condition variable for a maximum elapsed time of \p duration.
* Calls \p pred to check if the wakeup should be heeded: \p pred
* returning false will ignore the wakeup.
*/
template<class Rep, class Period, class Predicate>
bool wait_for(std::condition_variable & cv,
const std::chrono::duration<Rep, Period> & duration,
Predicate pred)
{
assert(s);
checkLockingInvariants();
return cv.wait_for(lk, duration, pred);
}
/**
* Wait for the given condition variable or until the time point \p duration.
*/
template<class Clock, class Duration>
std::cv_status wait_until(std::condition_variable & cv,
const std::chrono::time_point<Clock, Duration> & duration)
{
assert(s);
checkLockingInvariants();
return cv.wait_until(lk, duration);
}
};
Lock lock() { return Lock(this); }
/**
* Lock this Sync and return a RAII guard object.
*/
Lock lock() { return Lock(*this); }
};
}

View file

@ -27,16 +27,15 @@ namespace nix {
};
class CaptureLogging {
Logger * oldLogger;
std::unique_ptr<CaptureLogger> tempLogger;
std::shared_ptr<Logger> oldLogger;
std::shared_ptr<CaptureLogger> tempLogger;
public:
CaptureLogging() : tempLogger(std::make_unique<CaptureLogger>()) {
oldLogger = logger;
logger = tempLogger.get();
CaptureLogging() : tempLogger(std::make_shared<CaptureLogger>()) {
oldLogger = logger.replace(tempLogger);
}
~CaptureLogging() {
logger = oldLogger;
logger.replace(oldLogger);
}
std::string get() const {