forked from lix-project/lix
libutil: add an async semaphore implementation
like a normal semaphore, but with awaitable acquire actions. this is
primarily intended as an intermediate concurrency limiting device in
the Worker code, but it may find other uses over time. we do not use
std::counting_semaphore as a base because the counter of that is not
inspectable as will be needed for Worker. we also do not need atomic
operations for cross-thread consistency since we don't have multiple
threads (thanks to kj event loops being confined to a single thread)
Change-Id: Ie2bcb107f3a2c0185138330f7cbba4cec6cbdd95
This commit is contained in:
parent
4b66e1e24f
commit
ca9256a789
4 changed files with 199 additions and 0 deletions
122
src/libutil/async-semaphore.hh
Normal file
122
src/libutil/async-semaphore.hh
Normal file
|
@ -0,0 +1,122 @@
|
|||
#pragma once
|
||||
/// @file
|
||||
/// @brief A semaphore implementation usable from within a KJ event loop.
|
||||
|
||||
#include <cassert>
|
||||
#include <kj/async.h>
|
||||
#include <kj/common.h>
|
||||
#include <kj/exception.h>
|
||||
#include <kj/list.h>
|
||||
#include <kj/source-location.h>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
|
||||
namespace nix {
|
||||
|
||||
class AsyncSemaphore
|
||||
{
|
||||
public:
|
||||
class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token
|
||||
{
|
||||
struct Release
|
||||
{
|
||||
void operator()(AsyncSemaphore * sem) const
|
||||
{
|
||||
sem->unsafeRelease();
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<AsyncSemaphore, Release> parent;
|
||||
|
||||
public:
|
||||
Token() = default;
|
||||
Token(AsyncSemaphore & parent, kj::Badge<AsyncSemaphore>) : parent(&parent) {}
|
||||
|
||||
bool valid() const
|
||||
{
|
||||
return parent != nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
struct Waiter
|
||||
{
|
||||
kj::PromiseFulfiller<Token> & fulfiller;
|
||||
kj::ListLink<Waiter> link;
|
||||
kj::List<Waiter, &Waiter::link> & list;
|
||||
|
||||
Waiter(kj::PromiseFulfiller<Token> & fulfiller, kj::List<Waiter, &Waiter::link> & list)
|
||||
: fulfiller(fulfiller)
|
||||
, list(list)
|
||||
{
|
||||
list.add(*this);
|
||||
}
|
||||
|
||||
~Waiter()
|
||||
{
|
||||
if (link.isLinked()) {
|
||||
list.remove(*this);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const unsigned capacity_;
|
||||
unsigned used_ = 0;
|
||||
kj::List<Waiter, &Waiter::link> waiters;
|
||||
|
||||
void unsafeRelease()
|
||||
{
|
||||
used_ -= 1;
|
||||
while (used_ < capacity_ && !waiters.empty()) {
|
||||
used_ += 1;
|
||||
auto & w = waiters.front();
|
||||
w.fulfiller.fulfill(Token{*this, {}});
|
||||
waiters.remove(w);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {}
|
||||
|
||||
KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore);
|
||||
|
||||
~AsyncSemaphore()
|
||||
{
|
||||
assert(waiters.empty() && "destroyed a semaphore with active waiters");
|
||||
}
|
||||
|
||||
std::optional<Token> tryAcquire()
|
||||
{
|
||||
if (used_ < capacity_) {
|
||||
used_ += 1;
|
||||
return Token{*this, {}};
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
kj::Promise<Token> acquire()
|
||||
{
|
||||
if (auto t = tryAcquire()) {
|
||||
return std::move(*t);
|
||||
} else {
|
||||
return kj::newAdaptedPromise<Token, Waiter>(waiters);
|
||||
}
|
||||
}
|
||||
|
||||
unsigned capacity() const
|
||||
{
|
||||
return capacity_;
|
||||
}
|
||||
|
||||
unsigned used() const
|
||||
{
|
||||
return used_;
|
||||
}
|
||||
|
||||
unsigned available() const
|
||||
{
|
||||
return capacity_ - used_;
|
||||
}
|
||||
};
|
||||
}
|
|
@ -53,6 +53,7 @@ libutil_headers = files(
|
|||
'archive.hh',
|
||||
'args/root.hh',
|
||||
'args.hh',
|
||||
'async-semaphore.hh',
|
||||
'backed-string-view.hh',
|
||||
'box_ptr.hh',
|
||||
'canon-path.hh',
|
||||
|
|
74
tests/unit/libutil/async-semaphore.cc
Normal file
74
tests/unit/libutil/async-semaphore.cc
Normal file
|
@ -0,0 +1,74 @@
|
|||
#include "async-semaphore.hh"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <kj/async.h>
|
||||
|
||||
namespace nix {
|
||||
|
||||
TEST(AsyncSemaphore, counting)
|
||||
{
|
||||
kj::EventLoop loop;
|
||||
kj::WaitScope waitScope(loop);
|
||||
|
||||
AsyncSemaphore sem(2);
|
||||
|
||||
ASSERT_EQ(sem.available(), 2);
|
||||
ASSERT_EQ(sem.used(), 0);
|
||||
|
||||
auto a = kj::evalNow([&] { return sem.acquire(); });
|
||||
ASSERT_EQ(sem.available(), 1);
|
||||
ASSERT_EQ(sem.used(), 1);
|
||||
auto b = kj::evalNow([&] { return sem.acquire(); });
|
||||
ASSERT_EQ(sem.available(), 0);
|
||||
ASSERT_EQ(sem.used(), 2);
|
||||
|
||||
auto c = kj::evalNow([&] { return sem.acquire(); });
|
||||
auto d = kj::evalNow([&] { return sem.acquire(); });
|
||||
|
||||
ASSERT_TRUE(a.poll(waitScope));
|
||||
ASSERT_TRUE(b.poll(waitScope));
|
||||
ASSERT_FALSE(c.poll(waitScope));
|
||||
ASSERT_FALSE(d.poll(waitScope));
|
||||
|
||||
a = nullptr;
|
||||
ASSERT_TRUE(c.poll(waitScope));
|
||||
ASSERT_FALSE(d.poll(waitScope));
|
||||
|
||||
{
|
||||
auto lock = b.wait(waitScope);
|
||||
ASSERT_FALSE(d.poll(waitScope));
|
||||
}
|
||||
|
||||
ASSERT_TRUE(d.poll(waitScope));
|
||||
|
||||
ASSERT_EQ(sem.available(), 0);
|
||||
ASSERT_EQ(sem.used(), 2);
|
||||
c = nullptr;
|
||||
ASSERT_EQ(sem.available(), 1);
|
||||
ASSERT_EQ(sem.used(), 1);
|
||||
d = nullptr;
|
||||
ASSERT_EQ(sem.available(), 2);
|
||||
ASSERT_EQ(sem.used(), 0);
|
||||
}
|
||||
|
||||
TEST(AsyncSemaphore, cancelledWaiter)
|
||||
{
|
||||
kj::EventLoop loop;
|
||||
kj::WaitScope waitScope(loop);
|
||||
|
||||
AsyncSemaphore sem(1);
|
||||
|
||||
auto a = kj::evalNow([&] { return sem.acquire(); });
|
||||
auto b = kj::evalNow([&] { return sem.acquire(); });
|
||||
auto c = kj::evalNow([&] { return sem.acquire(); });
|
||||
|
||||
ASSERT_TRUE(a.poll(waitScope));
|
||||
ASSERT_FALSE(b.poll(waitScope));
|
||||
|
||||
b = nullptr;
|
||||
a = nullptr;
|
||||
|
||||
ASSERT_TRUE(c.poll(waitScope));
|
||||
}
|
||||
|
||||
}
|
|
@ -39,6 +39,7 @@ liblixutil_test_support = declare_dependency(
|
|||
)
|
||||
|
||||
libutil_tests_sources = files(
|
||||
'libutil/async-semaphore.cc',
|
||||
'libutil/canon-path.cc',
|
||||
'libutil/checked-arithmetic.cc',
|
||||
'libutil/chunked-vector.cc',
|
||||
|
@ -76,6 +77,7 @@ libutil_tester = executable(
|
|||
liblixexpr_mstatic,
|
||||
liblixutil_test_support,
|
||||
nlohmann_json,
|
||||
kj,
|
||||
],
|
||||
cpp_pch : cpp_pch,
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue