diff --git a/src/libutil/async-semaphore.hh b/src/libutil/async-semaphore.hh new file mode 100644 index 000000000..f8db31a68 --- /dev/null +++ b/src/libutil/async-semaphore.hh @@ -0,0 +1,122 @@ +#pragma once +/// @file +/// @brief A semaphore implementation usable from within a KJ event loop. + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace nix { + +class AsyncSemaphore +{ +public: + class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token + { + struct Release + { + void operator()(AsyncSemaphore * sem) const + { + sem->unsafeRelease(); + } + }; + + std::unique_ptr parent; + + public: + Token() = default; + Token(AsyncSemaphore & parent, kj::Badge) : parent(&parent) {} + + bool valid() const + { + return parent != nullptr; + } + }; + +private: + struct Waiter + { + kj::PromiseFulfiller & fulfiller; + kj::ListLink link; + kj::List & list; + + Waiter(kj::PromiseFulfiller & fulfiller, kj::List & list) + : fulfiller(fulfiller) + , list(list) + { + list.add(*this); + } + + ~Waiter() + { + if (link.isLinked()) { + list.remove(*this); + } + } + }; + + const unsigned capacity_; + unsigned used_ = 0; + kj::List waiters; + + void unsafeRelease() + { + used_ -= 1; + while (used_ < capacity_ && !waiters.empty()) { + used_ += 1; + auto & w = waiters.front(); + w.fulfiller.fulfill(Token{*this, {}}); + waiters.remove(w); + } + } + +public: + explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {} + + KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore); + + ~AsyncSemaphore() + { + assert(waiters.empty() && "destroyed a semaphore with active waiters"); + } + + std::optional tryAcquire() + { + if (used_ < capacity_) { + used_ += 1; + return Token{*this, {}}; + } else { + return {}; + } + } + + kj::Promise acquire() + { + if (auto t = tryAcquire()) { + return std::move(*t); + } else { + return kj::newAdaptedPromise(waiters); + } + } + + unsigned capacity() const + { + return capacity_; + } + + unsigned used() const + { + return used_; + } + + unsigned available() const + { + return capacity_ - used_; + } +}; +} diff --git a/src/libutil/meson.build b/src/libutil/meson.build index a3f21de59..89eeed133 100644 --- a/src/libutil/meson.build +++ b/src/libutil/meson.build @@ -53,6 +53,7 @@ libutil_headers = files( 'archive.hh', 'args/root.hh', 'args.hh', + 'async-semaphore.hh', 'backed-string-view.hh', 'box_ptr.hh', 'canon-path.hh', diff --git a/tests/unit/libutil/async-semaphore.cc b/tests/unit/libutil/async-semaphore.cc new file mode 100644 index 000000000..12b52885d --- /dev/null +++ b/tests/unit/libutil/async-semaphore.cc @@ -0,0 +1,74 @@ +#include "async-semaphore.hh" + +#include +#include + +namespace nix { + +TEST(AsyncSemaphore, counting) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + AsyncSemaphore sem(2); + + ASSERT_EQ(sem.available(), 2); + ASSERT_EQ(sem.used(), 0); + + auto a = kj::evalNow([&] { return sem.acquire(); }); + ASSERT_EQ(sem.available(), 1); + ASSERT_EQ(sem.used(), 1); + auto b = kj::evalNow([&] { return sem.acquire(); }); + ASSERT_EQ(sem.available(), 0); + ASSERT_EQ(sem.used(), 2); + + auto c = kj::evalNow([&] { return sem.acquire(); }); + auto d = kj::evalNow([&] { return sem.acquire(); }); + + ASSERT_TRUE(a.poll(waitScope)); + ASSERT_TRUE(b.poll(waitScope)); + ASSERT_FALSE(c.poll(waitScope)); + ASSERT_FALSE(d.poll(waitScope)); + + a = nullptr; + ASSERT_TRUE(c.poll(waitScope)); + ASSERT_FALSE(d.poll(waitScope)); + + { + auto lock = b.wait(waitScope); + ASSERT_FALSE(d.poll(waitScope)); + } + + ASSERT_TRUE(d.poll(waitScope)); + + ASSERT_EQ(sem.available(), 0); + ASSERT_EQ(sem.used(), 2); + c = nullptr; + ASSERT_EQ(sem.available(), 1); + ASSERT_EQ(sem.used(), 1); + d = nullptr; + ASSERT_EQ(sem.available(), 2); + ASSERT_EQ(sem.used(), 0); +} + +TEST(AsyncSemaphore, cancelledWaiter) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + AsyncSemaphore sem(1); + + auto a = kj::evalNow([&] { return sem.acquire(); }); + auto b = kj::evalNow([&] { return sem.acquire(); }); + auto c = kj::evalNow([&] { return sem.acquire(); }); + + ASSERT_TRUE(a.poll(waitScope)); + ASSERT_FALSE(b.poll(waitScope)); + + b = nullptr; + a = nullptr; + + ASSERT_TRUE(c.poll(waitScope)); +} + +} diff --git a/tests/unit/meson.build b/tests/unit/meson.build index 8ff0b5ec5..5baf10de2 100644 --- a/tests/unit/meson.build +++ b/tests/unit/meson.build @@ -39,6 +39,7 @@ liblixutil_test_support = declare_dependency( ) libutil_tests_sources = files( + 'libutil/async-semaphore.cc', 'libutil/canon-path.cc', 'libutil/checked-arithmetic.cc', 'libutil/chunked-vector.cc', @@ -76,6 +77,7 @@ libutil_tester = executable( liblixexpr_mstatic, liblixutil_test_support, nlohmann_json, + kj, ], cpp_pch : cpp_pch, )