libstore: delocalize TransferSource

future changes will need to add template functions to this class, which
cannot be done for classes declared locally in unctions. otherwise this
is mostly code motion to clean up enqueueFileTransfer a little further.

Change-Id: If9a9d9eb47ceadfa75a4eebd54e2db39f2305643
This commit is contained in:
eldritch horrors 2024-11-09 01:17:28 +01:00
parent 1338e93bc9
commit d75c399b29

View file

@ -847,6 +847,14 @@ struct curlFileTransfer : public FileTransfer
return std::move(*eager); return std::move(*eager);
} }
auto source = make_box_ptr<TransferSource>(*this, uri, headers, std::move(data), noBody);
auto metadata = source->startTransfer();
source->awaitData();
return {std::move(metadata), std::move(source)};
}
struct TransferSource : Source
{
struct State { struct State {
bool done = false; bool done = false;
std::exception_ptr exc; std::exception_ptr exc;
@ -854,126 +862,135 @@ struct curlFileTransfer : public FileTransfer
std::condition_variable avail; std::condition_variable avail;
}; };
auto _state = std::make_shared<Sync<State>>(); curlFileTransfer & parent;
std::string uri;
Headers headers;
std::optional<std::string> data;
bool noBody;
auto item = std::make_shared<TransferItem>( const std::shared_ptr<Sync<State>> _state = std::make_shared<Sync<State>>();
*this, std::shared_ptr<TransferItem> transfer;
uri, std::string chunk;
headers, std::string_view buffered;
getCurActivity(),
[_state](std::exception_ptr ex) {
auto state(_state->lock());
state->done = true;
state->exc = ex;
state->avail.notify_one();
},
[_state](std::string_view data) {
auto state(_state->lock());
/* If the buffer is full, then go to sleep until the calling TransferSource(
thread wakes us up (i.e. when it has removed data from the curlFileTransfer & parent,
buffer). We don't wait forever to prevent stalling the const std::string & uri,
download thread. (Hopefully sleeping will throttle the const Headers & headers,
sender.) */ std::optional<std::string> data,
if (state->data.size() > 1024 * 1024) { bool noBody
return false; )
} : parent(parent)
, uri(uri)
/* Append data to the buffer and wake up the calling , headers(headers)
thread. */ , data(std::move(data))
state->data.append(data); , noBody(noBody)
state->avail.notify_one();
return true;
},
std::move(data),
noBody
);
enqueueItem(item);
struct TransferSource : Source
{ {
const std::shared_ptr<Sync<State>> _state; }
std::shared_ptr<TransferItem> transfer;
std::string chunk;
std::string_view buffered;
explicit TransferSource( ~TransferSource()
const std::shared_ptr<Sync<State>> & state, std::shared_ptr<TransferItem> transfer {
) // wake up the download thread if it's still going and have it abort
: _state(state) try {
, transfer(std::move(transfer)) if (transfer) {
{
}
~TransferSource()
{
// wake up the download thread if it's still going and have it abort
try {
transfer->cancel(); transfer->cancel();
} catch (...) {
ignoreExceptionInDestructor();
} }
} catch (...) {
ignoreExceptionInDestructor();
} }
}
void awaitData() FileTransferResult startTransfer()
{ {
/* Grab data if available, otherwise wait for the download transfer = std::make_shared<TransferItem>(
thread to wake us up. */ parent,
while (buffered.empty()) { uri,
headers,
getCurActivity(),
[_state{_state}](std::exception_ptr ex) {
auto state(_state->lock());
state->done = true;
state->exc = ex;
state->avail.notify_one();
},
[_state{_state}](std::string_view data) {
auto state(_state->lock()); auto state(_state->lock());
if (state->data.empty()) { /* If the buffer is full, then go to sleep until the calling
if (state->done) { thread wakes us up (i.e. when it has removed data from the
if (state->exc) { buffer). We don't wait forever to prevent stalling the
std::rethrow_exception(state->exc); download thread. (Hopefully sleeping will throttle the
} sender.) */
return; if (state->data.size() > 1024 * 1024) {
return false;
}
/* Append data to the buffer and wake up the calling
thread. */
state->data.append(data);
state->avail.notify_one();
return true;
},
std::move(data),
noBody
);
parent.enqueueItem(transfer);
return transfer->metadataPromise.get_future().get();
}
void awaitData()
{
/* Grab data if available, otherwise wait for the download
thread to wake us up. */
while (buffered.empty()) {
auto state(_state->lock());
if (state->data.empty()) {
if (state->done) {
if (state->exc) {
std::rethrow_exception(state->exc);
} }
return;
transfer->unpause();
state.wait(state->avail);
} }
chunk = std::move(state->data);
buffered = chunk;
transfer->unpause(); transfer->unpause();
state.wait(state->avail);
}
chunk = std::move(state->data);
buffered = chunk;
transfer->unpause();
}
}
size_t read(char * data, size_t len) override
{
auto readPartial = [this](char * data, size_t len) -> size_t {
const auto available = std::min(len, buffered.size());
if (available == 0u) return 0u;
memcpy(data, buffered.data(), available);
buffered.remove_prefix(available);
return available;
};
size_t total = readPartial(data, len);
while (total < len) {
awaitData();
const auto current = readPartial(data + total, len - total);
total += current;
if (total == 0 || current == 0) {
break;
} }
} }
size_t read(char * data, size_t len) override if (total == 0) {
{ throw EndOfFile("transfer finished");
auto readPartial = [this](char * data, size_t len) -> size_t {
const auto available = std::min(len, buffered.size());
if (available == 0u) return 0u;
memcpy(data, buffered.data(), available);
buffered.remove_prefix(available);
return available;
};
size_t total = readPartial(data, len);
while (total < len) {
awaitData();
const auto current = readPartial(data + total, len - total);
total += current;
if (total == 0 || current == 0) {
break;
}
}
if (total == 0) {
throw EndOfFile("transfer finished");
}
return total;
} }
};
auto metadata = item->metadataPromise.get_future().get(); return total;
auto source = make_box_ptr<TransferSource>(_state, item); }
source->awaitData(); };
return {std::move(metadata), std::move(source)};
}
bool exists(const std::string & uri, const Headers & headers) override bool exists(const std::string & uri, const Headers & headers) override
{ {