From ca9256a789b413b71f424405c8a0d7d37ca36696 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Tue, 24 Sep 2024 00:21:16 +0200 Subject: [PATCH] libutil: add an async semaphore implementation like a normal semaphore, but with awaitable acquire actions. this is primarily intended as an intermediate concurrency limiting device in the Worker code, but it may find other uses over time. we do not use std::counting_semaphore as a base because the counter of that is not inspectable as will be needed for Worker. we also do not need atomic operations for cross-thread consistency since we don't have multiple threads (thanks to kj event loops being confined to a single thread) Change-Id: Ie2bcb107f3a2c0185138330f7cbba4cec6cbdd95 --- src/libutil/async-semaphore.hh | 122 ++++++++++++++++++++++++++ src/libutil/meson.build | 1 + tests/unit/libutil/async-semaphore.cc | 74 ++++++++++++++++ tests/unit/meson.build | 2 + 4 files changed, 199 insertions(+) create mode 100644 src/libutil/async-semaphore.hh create mode 100644 tests/unit/libutil/async-semaphore.cc diff --git a/src/libutil/async-semaphore.hh b/src/libutil/async-semaphore.hh new file mode 100644 index 000000000..f8db31a68 --- /dev/null +++ b/src/libutil/async-semaphore.hh @@ -0,0 +1,122 @@ +#pragma once +/// @file +/// @brief A semaphore implementation usable from within a KJ event loop. + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace nix { + +class AsyncSemaphore +{ +public: + class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token + { + struct Release + { + void operator()(AsyncSemaphore * sem) const + { + sem->unsafeRelease(); + } + }; + + std::unique_ptr parent; + + public: + Token() = default; + Token(AsyncSemaphore & parent, kj::Badge) : parent(&parent) {} + + bool valid() const + { + return parent != nullptr; + } + }; + +private: + struct Waiter + { + kj::PromiseFulfiller & fulfiller; + kj::ListLink link; + kj::List & list; + + Waiter(kj::PromiseFulfiller & fulfiller, kj::List & list) + : fulfiller(fulfiller) + , list(list) + { + list.add(*this); + } + + ~Waiter() + { + if (link.isLinked()) { + list.remove(*this); + } + } + }; + + const unsigned capacity_; + unsigned used_ = 0; + kj::List waiters; + + void unsafeRelease() + { + used_ -= 1; + while (used_ < capacity_ && !waiters.empty()) { + used_ += 1; + auto & w = waiters.front(); + w.fulfiller.fulfill(Token{*this, {}}); + waiters.remove(w); + } + } + +public: + explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {} + + KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore); + + ~AsyncSemaphore() + { + assert(waiters.empty() && "destroyed a semaphore with active waiters"); + } + + std::optional tryAcquire() + { + if (used_ < capacity_) { + used_ += 1; + return Token{*this, {}}; + } else { + return {}; + } + } + + kj::Promise acquire() + { + if (auto t = tryAcquire()) { + return std::move(*t); + } else { + return kj::newAdaptedPromise(waiters); + } + } + + unsigned capacity() const + { + return capacity_; + } + + unsigned used() const + { + return used_; + } + + unsigned available() const + { + return capacity_ - used_; + } +}; +} diff --git a/src/libutil/meson.build b/src/libutil/meson.build index a3f21de59..89eeed133 100644 --- a/src/libutil/meson.build +++ b/src/libutil/meson.build @@ -53,6 +53,7 @@ libutil_headers = files( 'archive.hh', 'args/root.hh', 'args.hh', + 'async-semaphore.hh', 'backed-string-view.hh', 'box_ptr.hh', 'canon-path.hh', diff --git a/tests/unit/libutil/async-semaphore.cc b/tests/unit/libutil/async-semaphore.cc new file mode 100644 index 000000000..12b52885d --- /dev/null +++ b/tests/unit/libutil/async-semaphore.cc @@ -0,0 +1,74 @@ +#include "async-semaphore.hh" + +#include +#include + +namespace nix { + +TEST(AsyncSemaphore, counting) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + AsyncSemaphore sem(2); + + ASSERT_EQ(sem.available(), 2); + ASSERT_EQ(sem.used(), 0); + + auto a = kj::evalNow([&] { return sem.acquire(); }); + ASSERT_EQ(sem.available(), 1); + ASSERT_EQ(sem.used(), 1); + auto b = kj::evalNow([&] { return sem.acquire(); }); + ASSERT_EQ(sem.available(), 0); + ASSERT_EQ(sem.used(), 2); + + auto c = kj::evalNow([&] { return sem.acquire(); }); + auto d = kj::evalNow([&] { return sem.acquire(); }); + + ASSERT_TRUE(a.poll(waitScope)); + ASSERT_TRUE(b.poll(waitScope)); + ASSERT_FALSE(c.poll(waitScope)); + ASSERT_FALSE(d.poll(waitScope)); + + a = nullptr; + ASSERT_TRUE(c.poll(waitScope)); + ASSERT_FALSE(d.poll(waitScope)); + + { + auto lock = b.wait(waitScope); + ASSERT_FALSE(d.poll(waitScope)); + } + + ASSERT_TRUE(d.poll(waitScope)); + + ASSERT_EQ(sem.available(), 0); + ASSERT_EQ(sem.used(), 2); + c = nullptr; + ASSERT_EQ(sem.available(), 1); + ASSERT_EQ(sem.used(), 1); + d = nullptr; + ASSERT_EQ(sem.available(), 2); + ASSERT_EQ(sem.used(), 0); +} + +TEST(AsyncSemaphore, cancelledWaiter) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + AsyncSemaphore sem(1); + + auto a = kj::evalNow([&] { return sem.acquire(); }); + auto b = kj::evalNow([&] { return sem.acquire(); }); + auto c = kj::evalNow([&] { return sem.acquire(); }); + + ASSERT_TRUE(a.poll(waitScope)); + ASSERT_FALSE(b.poll(waitScope)); + + b = nullptr; + a = nullptr; + + ASSERT_TRUE(c.poll(waitScope)); +} + +} diff --git a/tests/unit/meson.build b/tests/unit/meson.build index 8ff0b5ec5..5baf10de2 100644 --- a/tests/unit/meson.build +++ b/tests/unit/meson.build @@ -39,6 +39,7 @@ liblixutil_test_support = declare_dependency( ) libutil_tests_sources = files( + 'libutil/async-semaphore.cc', 'libutil/canon-path.cc', 'libutil/checked-arithmetic.cc', 'libutil/chunked-vector.cc', @@ -76,6 +77,7 @@ libutil_tester = executable( liblixexpr_mstatic, liblixutil_test_support, nlohmann_json, + kj, ], cpp_pch : cpp_pch, )