Ensure download thread liveness

* Don't wait forever for the client to remove data from the
  buffer. This does mean that the buffer can grow without bounds
  (e.g. when downloading is faster than writing to disk), but meh.

* Don't hold the state lock while calling the sink. The sink could
  take any amount of time to process the data (in particular when it's
  actually a coroutine), so we don't want to block the download
  thread.
This commit is contained in:
Eelco Dolstra 2018-09-26 21:43:17 +02:00
parent 98b2cc2e6e
commit 9750430003
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE

View file

@ -710,11 +710,12 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
/* If the buffer is full, then go to sleep until the calling /* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the thread wakes us up (i.e. when it has removed data from the
buffer). Note: this does stall the download thread. */ buffer). We don't wait forever to prevent stalling the
while (state->data.size() > 1024 * 1024) { download thread. (Hopefully sleeping will throttle the
if (state->quit) return; sender.) */
if (state->data.size() > 1024 * 1024) {
debug("download buffer is full; going to sleep"); debug("download buffer is full; going to sleep");
state.wait(state->request); state.wait_for(state->request, std::chrono::seconds(10));
} }
/* Append data to the buffer and wake up the calling /* Append data to the buffer and wake up the calling
@ -736,30 +737,36 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
state->request.notify_one(); state->request.notify_one();
}}); }});
auto state(_state->lock());
while (true) { while (true) {
checkInterrupt(); checkInterrupt();
/* If no data is available, then wait for the download thread std::string chunk;
to wake us up. */
if (state->data.empty()) {
if (state->quit) { /* Grab data if available, otherwise wait for the download
if (state->exc) std::rethrow_exception(state->exc); thread to wake us up. */
break; {
auto state(_state->lock());
while (state->data.empty()) {
if (state->quit) {
if (state->exc) std::rethrow_exception(state->exc);
return;
}
state.wait(state->avail);
} }
state.wait(state->avail); chunk = std::move(state->data);
}
/* If data is available, then flush it to the sink and wake up
the download thread if it's blocked on a full buffer. */
if (!state->data.empty()) {
sink((unsigned char *) state->data.data(), state->data.size());
state->data.clear();
state->request.notify_one(); state->request.notify_one();
} }
/* Flush the data to the sink and wake up the download thread
if it's blocked on a full buffer. We don't hold the state
lock while doing this to prevent blocking the download
thread if sink() takes a long time. */
sink((unsigned char *) chunk.data(), chunk.size());
} }
} }