libstore: remove FileTransferRequests::data
use separate upload and download methods instead.
Change-Id: I5baa2177c8ddd70268c75ff074e361b2f17dddbd
This commit is contained in:
parent
2d49efaa2e
commit
ce3e1d1e7a
|
@ -46,7 +46,7 @@ DownloadFileResult downloadFile(
|
||||||
request.expectedETag = getStrAttr(cached->infoAttrs, "etag");
|
request.expectedETag = getStrAttr(cached->infoAttrs, "etag");
|
||||||
FileTransferResult res;
|
FileTransferResult res;
|
||||||
try {
|
try {
|
||||||
res = getFileTransfer()->enqueueFileTransfer(request).get();
|
res = getFileTransfer()->enqueueDownload(request).get();
|
||||||
} catch (FileTransferError & e) {
|
} catch (FileTransferError & e) {
|
||||||
if (cached) {
|
if (cached) {
|
||||||
warn("%s; using cached version", e.msg());
|
warn("%s; using cached version", e.msg());
|
||||||
|
|
|
@ -46,6 +46,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
FileTransferRequest request;
|
FileTransferRequest request;
|
||||||
FileTransferResult result;
|
FileTransferResult result;
|
||||||
Activity act;
|
Activity act;
|
||||||
|
std::optional<std::string> uploadData;
|
||||||
bool done = false; // whether either the success or failure function has been called
|
bool done = false; // whether either the success or failure function has been called
|
||||||
std::packaged_task<FileTransferResult(std::exception_ptr, FileTransferResult)> callback;
|
std::packaged_task<FileTransferResult(std::exception_ptr, FileTransferResult)> callback;
|
||||||
std::function<void(TransferItem &, std::string_view data)> dataCallback;
|
std::function<void(TransferItem &, std::string_view data)> dataCallback;
|
||||||
|
@ -81,18 +82,21 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
std::string verb() const
|
std::string verb() const
|
||||||
{
|
{
|
||||||
return request.data ? "upload" : "download";
|
return uploadData ? "upload" : "download";
|
||||||
}
|
}
|
||||||
|
|
||||||
TransferItem(curlFileTransfer & fileTransfer,
|
TransferItem(curlFileTransfer & fileTransfer,
|
||||||
const FileTransferRequest & request,
|
const FileTransferRequest & request,
|
||||||
std::invocable<std::exception_ptr> auto callback,
|
std::invocable<std::exception_ptr> auto callback,
|
||||||
std::function<void(TransferItem &, std::string_view data)> dataCallback)
|
std::function<void(TransferItem &, std::string_view data)> dataCallback,
|
||||||
|
std::optional<std::string> uploadData
|
||||||
|
)
|
||||||
: fileTransfer(fileTransfer)
|
: fileTransfer(fileTransfer)
|
||||||
, request(request)
|
, request(request)
|
||||||
, act(*logger, lvlTalkative, actFileTransfer,
|
, act(*logger, lvlTalkative, actFileTransfer,
|
||||||
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
|
fmt(uploadData ? "uploading '%s'" : "downloading '%s'", request.uri),
|
||||||
{request.uri}, request.parentAct)
|
{request.uri}, request.parentAct)
|
||||||
|
, uploadData(std::move(uploadData))
|
||||||
, callback([cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) {
|
, callback([cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) {
|
||||||
cb(ex);
|
cb(ex);
|
||||||
return r;
|
return r;
|
||||||
|
@ -236,14 +240,14 @@ struct curlFileTransfer : public FileTransfer
|
||||||
size_t readOffset = 0;
|
size_t readOffset = 0;
|
||||||
size_t readCallback(char *buffer, size_t size, size_t nitems)
|
size_t readCallback(char *buffer, size_t size, size_t nitems)
|
||||||
{
|
{
|
||||||
if (readOffset == request.data->length())
|
if (readOffset == uploadData->length())
|
||||||
return 0;
|
return 0;
|
||||||
auto count = std::min(size * nitems, request.data->length() - readOffset);
|
auto count = std::min(size * nitems, uploadData->length() - readOffset);
|
||||||
assert(count);
|
assert(count);
|
||||||
// Lint: this is turning a string into a byte array to hand to
|
// Lint: this is turning a string into a byte array to hand to
|
||||||
// curl, which is fine.
|
// curl, which is fine.
|
||||||
// NOLINTNEXTLINE(bugprone-not-null-terminated-result)
|
// NOLINTNEXTLINE(bugprone-not-null-terminated-result)
|
||||||
memcpy(buffer, request.data->data() + readOffset, count);
|
memcpy(buffer, uploadData->data() + readOffset, count);
|
||||||
readOffset += count;
|
readOffset += count;
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
@ -295,11 +299,11 @@ struct curlFileTransfer : public FileTransfer
|
||||||
if (request.head)
|
if (request.head)
|
||||||
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
|
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
|
||||||
|
|
||||||
if (request.data) {
|
if (uploadData) {
|
||||||
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
|
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
|
||||||
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
|
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
|
||||||
curl_easy_setopt(req, CURLOPT_READDATA, this);
|
curl_easy_setopt(req, CURLOPT_READDATA, this);
|
||||||
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
|
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) uploadData->length());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (request.verifyTLS) {
|
if (request.verifyTLS) {
|
||||||
|
@ -620,7 +624,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
std::shared_ptr<TransferItem> enqueueItem(std::shared_ptr<TransferItem> item)
|
std::shared_ptr<TransferItem> enqueueItem(std::shared_ptr<TransferItem> item)
|
||||||
{
|
{
|
||||||
if (item->request.data
|
if (item->uploadData
|
||||||
&& !item->request.uri.starts_with("http://")
|
&& !item->request.uri.starts_with("http://")
|
||||||
&& !item->request.uri.starts_with("https://"))
|
&& !item->request.uri.starts_with("https://"))
|
||||||
throw nix::Error("uploading to '%s' is not supported", item->request.uri);
|
throw nix::Error("uploading to '%s' is not supported", item->request.uri);
|
||||||
|
@ -651,7 +655,19 @@ struct curlFileTransfer : public FileTransfer
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request) override
|
std::future<FileTransferResult> enqueueDownload(const FileTransferRequest & request) override
|
||||||
|
{
|
||||||
|
return enqueueFileTransfer(request, std::nullopt);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::future<FileTransferResult>
|
||||||
|
enqueueUpload(const FileTransferRequest & request, std::string data) override
|
||||||
|
{
|
||||||
|
return enqueueFileTransfer(request, std::move(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::future<FileTransferResult>
|
||||||
|
enqueueFileTransfer(const FileTransferRequest & request, std::optional<std::string> data)
|
||||||
{
|
{
|
||||||
return enqueueFileTransfer(
|
return enqueueFileTransfer(
|
||||||
request,
|
request,
|
||||||
|
@ -660,13 +676,16 @@ struct curlFileTransfer : public FileTransfer
|
||||||
std::rethrow_exception(ex);
|
std::rethrow_exception(ex);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{}
|
{},
|
||||||
|
std::move(data)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request,
|
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request,
|
||||||
std::invocable<std::exception_ptr> auto callback,
|
std::invocable<std::exception_ptr> auto callback,
|
||||||
std::function<void(TransferItem &, std::string_view data)> dataCallback)
|
std::function<void(TransferItem &, std::string_view data)> dataCallback,
|
||||||
|
std::optional<std::string> data
|
||||||
|
)
|
||||||
{
|
{
|
||||||
/* Ugly hack to support s3:// URIs. */
|
/* Ugly hack to support s3:// URIs. */
|
||||||
if (request.uri.starts_with("s3://")) {
|
if (request.uri.starts_with("s3://")) {
|
||||||
|
@ -695,9 +714,11 @@ struct curlFileTransfer : public FileTransfer
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return enqueueItem(std::make_shared<TransferItem>(
|
return enqueueItem(
|
||||||
*this, request, std::move(callback), std::move(dataCallback)
|
std::make_shared<TransferItem>(
|
||||||
))
|
*this, request, std::move(callback), std::move(dataCallback), std::move(data)
|
||||||
|
)
|
||||||
|
)
|
||||||
->callback.get_future();
|
->callback.get_future();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -747,7 +768,8 @@ struct curlFileTransfer : public FileTransfer
|
||||||
thread. */
|
thread. */
|
||||||
state->data.append(data);
|
state->data.append(data);
|
||||||
state->avail.notify_one();
|
state->avail.notify_one();
|
||||||
}
|
},
|
||||||
|
std::nullopt
|
||||||
);
|
);
|
||||||
|
|
||||||
struct InnerSource : Source
|
struct InnerSource : Source
|
||||||
|
|
|
@ -62,7 +62,6 @@ struct FileTransferRequest
|
||||||
size_t tries = fileTransferSettings.tries;
|
size_t tries = fileTransferSettings.tries;
|
||||||
unsigned int baseRetryTimeMs = 250;
|
unsigned int baseRetryTimeMs = 250;
|
||||||
ActivityId parentAct;
|
ActivityId parentAct;
|
||||||
std::optional<std::string> data;
|
|
||||||
|
|
||||||
FileTransferRequest(std::string_view uri)
|
FileTransferRequest(std::string_view uri)
|
||||||
: uri(uri), parentAct(getCurActivity()) { }
|
: uri(uri), parentAct(getCurActivity()) { }
|
||||||
|
@ -88,11 +87,17 @@ struct FileTransfer
|
||||||
virtual ~FileTransfer() { }
|
virtual ~FileTransfer() { }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enqueue a data transfer request, returning a future to the result of
|
* Enqueues a download request, returning a future for the result of
|
||||||
* the download. The future may throw a FileTransferError
|
* the download. The future may throw a FileTransferError exception.
|
||||||
* exception.
|
|
||||||
*/
|
*/
|
||||||
virtual std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request) = 0;
|
virtual std::future<FileTransferResult> enqueueDownload(const FileTransferRequest & request) = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enqueue an upload request, returning a future for the result of
|
||||||
|
* the upload. The future may throw a FileTransferError exception.
|
||||||
|
*/
|
||||||
|
virtual std::future<FileTransferResult>
|
||||||
|
enqueueUpload(const FileTransferRequest & request, std::string data) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Download a file, returning its contents through a source. Will not return
|
* Download a file, returning its contents through a source. Will not return
|
||||||
|
|
|
@ -116,7 +116,7 @@ protected:
|
||||||
try {
|
try {
|
||||||
FileTransferRequest request{makeURI(path)};
|
FileTransferRequest request{makeURI(path)};
|
||||||
request.head = true;
|
request.head = true;
|
||||||
getFileTransfer()->enqueueFileTransfer(request).get();
|
getFileTransfer()->enqueueDownload(request).get();
|
||||||
return true;
|
return true;
|
||||||
} catch (FileTransferError & e) {
|
} catch (FileTransferError & e) {
|
||||||
/* S3 buckets return 403 if a file doesn't exist and the
|
/* S3 buckets return 403 if a file doesn't exist and the
|
||||||
|
@ -133,10 +133,10 @@ protected:
|
||||||
const std::string & mimeType) override
|
const std::string & mimeType) override
|
||||||
{
|
{
|
||||||
FileTransferRequest req{makeURI(path)};
|
FileTransferRequest req{makeURI(path)};
|
||||||
req.data = StreamToSourceAdapter(istream).drain();
|
auto data = StreamToSourceAdapter(istream).drain();
|
||||||
req.headers = {{"Content-Type", mimeType}};
|
req.headers = {{"Content-Type", mimeType}};
|
||||||
try {
|
try {
|
||||||
getFileTransfer()->enqueueFileTransfer(req).get();
|
getFileTransfer()->enqueueUpload(req, std::move(data)).get();
|
||||||
} catch (FileTransferError & e) {
|
} catch (FileTransferError & e) {
|
||||||
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg());
|
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg());
|
||||||
}
|
}
|
||||||
|
@ -171,7 +171,7 @@ protected:
|
||||||
FileTransferRequest request{makeURI(path)};
|
FileTransferRequest request{makeURI(path)};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return std::move(getFileTransfer()->enqueueFileTransfer(request).get().data);
|
return std::move(getFileTransfer()->enqueueDownload(request).get().data);
|
||||||
} catch (FileTransferError & e) {
|
} catch (FileTransferError & e) {
|
||||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||||
return {};
|
return {};
|
||||||
|
|
|
@ -286,7 +286,7 @@ struct CmdUpgradeNix : MixDryRun, EvalCommand
|
||||||
|
|
||||||
// FIXME: use nixos.org?
|
// FIXME: use nixos.org?
|
||||||
auto req = FileTransferRequest(storePathsUrl);
|
auto req = FileTransferRequest(storePathsUrl);
|
||||||
auto res = getFileTransfer()->enqueueFileTransfer(req).get();
|
auto res = getFileTransfer()->enqueueDownload(req).get();
|
||||||
|
|
||||||
auto state = std::make_unique<EvalState>(SearchPath{}, store);
|
auto state = std::make_unique<EvalState>(SearchPath{}, store);
|
||||||
auto v = state->allocValue();
|
auto v = state->allocValue();
|
||||||
|
|
|
@ -163,7 +163,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
|
||||||
auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; });
|
auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; });
|
||||||
auto ft = makeFileTransfer();
|
auto ft = makeFileTransfer();
|
||||||
ASSERT_THROW(
|
ASSERT_THROW(
|
||||||
ft->enqueueFileTransfer(FileTransferRequest(fmt("http://[::1]:%d/index", port))).get(),
|
ft->enqueueDownload(FileTransferRequest(fmt("http://[::1]:%d/index", port))).get(),
|
||||||
FileTransferError
|
FileTransferError
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -219,7 +219,7 @@ TEST(FileTransfer, usesIntermediateLinkHeaders)
|
||||||
auto ft = makeFileTransfer();
|
auto ft = makeFileTransfer();
|
||||||
FileTransferRequest req(fmt("http://[::1]:%d/first", port));
|
FileTransferRequest req(fmt("http://[::1]:%d/first", port));
|
||||||
req.baseRetryTimeMs = 0;
|
req.baseRetryTimeMs = 0;
|
||||||
auto result = ft->enqueueFileTransfer(req).get();
|
auto result = ft->enqueueDownload(req).get();
|
||||||
ASSERT_EQ(result.immutableUrl, "http://foo");
|
ASSERT_EQ(result.immutableUrl, "http://foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue