Merge branch 'priorityqueue' of https://github.com/groxxda/nix

This commit is contained in:
Eelco Dolstra 2016-10-19 16:37:04 +02:00
commit 307cc8c33d

View file

@ -10,6 +10,7 @@
#include <curl/curl.h> #include <curl/curl.h>
#include <queue>
#include <iostream> #include <iostream>
#include <thread> #include <thread>
#include <cmath> #include <cmath>
@ -281,8 +282,13 @@ struct CurlDownloader : public Downloader
struct State struct State
{ {
struct EmbargoComparator {
bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
return i1->embargo > i2->embargo;
}
};
bool quit = false; bool quit = false;
std::vector<std::shared_ptr<DownloadItem>> incoming; std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
}; };
Sync<State> state_; Sync<State> state_;
@ -380,9 +386,7 @@ struct CurlDownloader : public Downloader
/* Add new curl requests from the incoming requests queue, /* Add new curl requests from the incoming requests queue,
except for requests that are embargoed (waiting for a except for requests that are embargoed (waiting for a
retry timeout to expire). FIXME: should use a priority retry timeout to expire). */
queue for the embargoed items to prevent repeated O(n)
checks. */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) { if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024]; char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf)); auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@ -390,22 +394,23 @@ struct CurlDownloader : public Downloader
throw SysError("reading curl wakeup socket"); throw SysError("reading curl wakeup socket");
} }
std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed; std::vector<std::shared_ptr<DownloadItem>> incoming;
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
{ {
auto state(state_.lock()); auto state(state_.lock());
for (auto & item: state->incoming) { while (!state->incoming.empty()) {
if (item->embargo <= now) auto item = state->incoming.top();
if (item->embargo <= now) {
incoming.push_back(item); incoming.push_back(item);
else { state->incoming.pop();
embargoed.push_back(item); } else {
if (nextWakeup == std::chrono::steady_clock::time_point() if (nextWakeup == std::chrono::steady_clock::time_point()
|| item->embargo < nextWakeup) || item->embargo < nextWakeup)
nextWakeup = item->embargo; nextWakeup = item->embargo;
break;
} }
} }
state->incoming = embargoed;
quit = state->quit; quit = state->quit;
} }
@ -432,7 +437,7 @@ struct CurlDownloader : public Downloader
{ {
auto state(state_.lock()); auto state(state_.lock());
state->incoming.clear(); while (!state->incoming.empty()) state->incoming.pop();
state->quit = true; state->quit = true;
} }
} }
@ -443,7 +448,7 @@ struct CurlDownloader : public Downloader
auto state(state_.lock()); auto state(state_.lock());
if (state->quit) if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down"); throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push_back(item); state->incoming.push(item);
} }
writeFull(wakeupPipe.writeSide.get(), " "); writeFull(wakeupPipe.writeSide.get(), " ");
} }