forked from lix-project/lix
DownloadResult -> DataTransferResult
This commit is contained in:
parent
e5cc53beec
commit
142ed7fe45
|
@ -50,10 +50,10 @@ struct CurlDownloader : public Downloader
|
||||||
{
|
{
|
||||||
CurlDownloader & downloader;
|
CurlDownloader & downloader;
|
||||||
DataTransferRequest request;
|
DataTransferRequest request;
|
||||||
DownloadResult result;
|
DataTransferResult result;
|
||||||
Activity act;
|
Activity act;
|
||||||
bool done = false; // whether either the success or failure function has been called
|
bool done = false; // whether either the success or failure function has been called
|
||||||
Callback<DownloadResult> callback;
|
Callback<DataTransferResult> callback;
|
||||||
CURL * req = 0;
|
CURL * req = 0;
|
||||||
bool active = false; // whether the handle has been added to the multi object
|
bool active = false; // whether the handle has been added to the multi object
|
||||||
std::string status;
|
std::string status;
|
||||||
|
@ -74,7 +74,7 @@ struct CurlDownloader : public Downloader
|
||||||
|
|
||||||
DownloadItem(CurlDownloader & downloader,
|
DownloadItem(CurlDownloader & downloader,
|
||||||
const DataTransferRequest & request,
|
const DataTransferRequest & request,
|
||||||
Callback<DownloadResult> && callback)
|
Callback<DataTransferResult> && callback)
|
||||||
: downloader(downloader)
|
: downloader(downloader)
|
||||||
, request(request)
|
, request(request)
|
||||||
, act(*logger, lvlTalkative, actDownload,
|
, act(*logger, lvlTalkative, actDownload,
|
||||||
|
@ -642,7 +642,7 @@ struct CurlDownloader : public Downloader
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void enqueueDownload(const DataTransferRequest & request,
|
void enqueueDownload(const DataTransferRequest & request,
|
||||||
Callback<DownloadResult> callback) override
|
Callback<DataTransferResult> callback) override
|
||||||
{
|
{
|
||||||
/* Ugly hack to support s3:// URIs. */
|
/* Ugly hack to support s3:// URIs. */
|
||||||
if (hasPrefix(request.uri, "s3://")) {
|
if (hasPrefix(request.uri, "s3://")) {
|
||||||
|
@ -660,7 +660,7 @@ struct CurlDownloader : public Downloader
|
||||||
|
|
||||||
// FIXME: implement ETag
|
// FIXME: implement ETag
|
||||||
auto s3Res = s3Helper.getObject(bucketName, key);
|
auto s3Res = s3Helper.getObject(bucketName, key);
|
||||||
DownloadResult res;
|
DataTransferResult res;
|
||||||
if (!s3Res.data)
|
if (!s3Res.data)
|
||||||
throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri));
|
throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri));
|
||||||
res.data = s3Res.data;
|
res.data = s3Res.data;
|
||||||
|
@ -687,11 +687,11 @@ ref<Downloader> makeDownloader()
|
||||||
return make_ref<CurlDownloader>();
|
return make_ref<CurlDownloader>();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::future<DownloadResult> Downloader::enqueueDownload(const DataTransferRequest & request)
|
std::future<DataTransferResult> Downloader::enqueueDownload(const DataTransferRequest & request)
|
||||||
{
|
{
|
||||||
auto promise = std::make_shared<std::promise<DownloadResult>>();
|
auto promise = std::make_shared<std::promise<DataTransferResult>>();
|
||||||
enqueueDownload(request,
|
enqueueDownload(request,
|
||||||
{[promise](std::future<DownloadResult> fut) {
|
{[promise](std::future<DataTransferResult> fut) {
|
||||||
try {
|
try {
|
||||||
promise->set_value(fut.get());
|
promise->set_value(fut.get());
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
@ -701,7 +701,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DataTransferReques
|
||||||
return promise->get_future();
|
return promise->get_future();
|
||||||
}
|
}
|
||||||
|
|
||||||
DownloadResult Downloader::download(const DataTransferRequest & request)
|
DataTransferResult Downloader::download(const DataTransferRequest & request)
|
||||||
{
|
{
|
||||||
return enqueueDownload(request).get();
|
return enqueueDownload(request).get();
|
||||||
}
|
}
|
||||||
|
@ -756,7 +756,7 @@ void Downloader::download(DataTransferRequest && request, Sink & sink)
|
||||||
};
|
};
|
||||||
|
|
||||||
enqueueDownload(request,
|
enqueueDownload(request,
|
||||||
{[_state](std::future<DownloadResult> fut) {
|
{[_state](std::future<DataTransferResult> fut) {
|
||||||
auto state(_state->lock());
|
auto state(_state->lock());
|
||||||
state->quit = true;
|
state->quit = true;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -56,7 +56,7 @@ struct DataTransferRequest
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct DownloadResult
|
struct DataTransferResult
|
||||||
{
|
{
|
||||||
bool cached = false;
|
bool cached = false;
|
||||||
std::string etag;
|
std::string etag;
|
||||||
|
@ -75,12 +75,12 @@ struct Downloader
|
||||||
the download. The future may throw a DownloadError
|
the download. The future may throw a DownloadError
|
||||||
exception. */
|
exception. */
|
||||||
virtual void enqueueDownload(const DataTransferRequest & request,
|
virtual void enqueueDownload(const DataTransferRequest & request,
|
||||||
Callback<DownloadResult> callback) = 0;
|
Callback<DataTransferResult> callback) = 0;
|
||||||
|
|
||||||
std::future<DownloadResult> enqueueDownload(const DataTransferRequest & request);
|
std::future<DataTransferResult> enqueueDownload(const DataTransferRequest & request);
|
||||||
|
|
||||||
/* Synchronously download a file. */
|
/* Synchronously download a file. */
|
||||||
DownloadResult download(const DataTransferRequest & request);
|
DataTransferResult download(const DataTransferRequest & request);
|
||||||
|
|
||||||
/* Download a file, writing its data to a sink. The sink will be
|
/* Download a file, writing its data to a sink. The sink will be
|
||||||
invoked on the thread of the caller. */
|
invoked on the thread of the caller. */
|
||||||
|
|
|
@ -143,7 +143,7 @@ protected:
|
||||||
auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
|
auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
|
||||||
|
|
||||||
getDownloader()->enqueueDownload(request,
|
getDownloader()->enqueueDownload(request,
|
||||||
{[callbackPtr, this](std::future<DownloadResult> result) {
|
{[callbackPtr, this](std::future<DataTransferResult> result) {
|
||||||
try {
|
try {
|
||||||
(*callbackPtr)(result.get().data);
|
(*callbackPtr)(result.get().data);
|
||||||
} catch (DownloadError & e) {
|
} catch (DownloadError & e) {
|
||||||
|
|
|
@ -132,7 +132,7 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
S3Helper::DownloadResult S3Helper::getObject(
|
S3Helper::DataTransferResult S3Helper::getObject(
|
||||||
const std::string & bucketName, const std::string & key)
|
const std::string & bucketName, const std::string & key)
|
||||||
{
|
{
|
||||||
debug("fetching 's3://%s/%s'...", bucketName, key);
|
debug("fetching 's3://%s/%s'...", bucketName, key);
|
||||||
|
@ -146,7 +146,7 @@ S3Helper::DownloadResult S3Helper::getObject(
|
||||||
return Aws::New<std::stringstream>("STRINGSTREAM");
|
return Aws::New<std::stringstream>("STRINGSTREAM");
|
||||||
});
|
});
|
||||||
|
|
||||||
DownloadResult res;
|
DataTransferResult res;
|
||||||
|
|
||||||
auto now1 = std::chrono::steady_clock::now();
|
auto now1 = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
|
|
@ -18,13 +18,13 @@ struct S3Helper
|
||||||
|
|
||||||
ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & scheme, const std::string & endpoint);
|
ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & scheme, const std::string & endpoint);
|
||||||
|
|
||||||
struct DownloadResult
|
struct DataTransferResult
|
||||||
{
|
{
|
||||||
std::shared_ptr<std::string> data;
|
std::shared_ptr<std::string> data;
|
||||||
unsigned int durationMs;
|
unsigned int durationMs;
|
||||||
};
|
};
|
||||||
|
|
||||||
DownloadResult getObject(
|
DataTransferResult getObject(
|
||||||
const std::string & bucketName, const std::string & key);
|
const std::string & bucketName, const std::string & key);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue