diff --git a/release-common.nix b/release-common.nix index 0c12bc7ce..d7fb8125f 100644 --- a/release-common.nix +++ b/release-common.nix @@ -61,7 +61,7 @@ rec { ++ lib.optional (stdenv.isLinux || stdenv.isDarwin) libsodium ++ lib.optional (stdenv.isLinux || stdenv.isDarwin) (aws-sdk-cpp.override { - apis = ["s3"]; + apis = ["s3" "transfer"]; customMemoryManagement = false; }); diff --git a/src/libstore/local.mk b/src/libstore/local.mk index a7279aa39..3799257f8 100644 --- a/src/libstore/local.mk +++ b/src/libstore/local.mk @@ -18,7 +18,7 @@ libstore_FILES = sandbox-defaults.sb sandbox-minimal.sb sandbox-network.sb $(foreach file,$(libstore_FILES),$(eval $(call install-data-in,$(d)/$(file),$(datadir)/nix/sandbox))) ifeq ($(ENABLE_S3), 1) - libstore_LDFLAGS += -laws-cpp-sdk-s3 -laws-cpp-sdk-core + libstore_LDFLAGS += -laws-cpp-sdk-transfer -laws-cpp-sdk-s3 -laws-cpp-sdk-core endif ifeq ($(OS), SunOS) diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 23af45209..96673a5b0 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,9 @@ #include #include #include +#include + +using namespace Aws::Transfer; namespace nix { @@ -169,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Setting narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting logCompression{this, "", "log-compression", "compression method for log/* files"}; + const Setting bufferSize{ + this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads. defaults to 5Mb"}; std::string bucketName; @@ -271,34 +277,76 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const std::string & mimeType, const std::string & contentEncoding) { - auto request = - Aws::S3::Model::PutObjectRequest() - .WithBucket(bucketName) - .WithKey(path); - - request.SetContentType(mimeType); - - if (contentEncoding != "") - request.SetContentEncoding(contentEncoding); - auto stream = std::make_shared(data); - request.SetBody(stream); + auto maxThreads = std::thread::hardware_concurrency(); - stats.put++; - stats.putBytes += data.size(); + static std::shared_ptr + executor = std::make_shared(maxThreads); + + TransferManagerConfiguration transferConfig(executor.get()); + + transferConfig.s3Client = s3Helper.client; + transferConfig.bufferSize = bufferSize; + + if (contentEncoding != "") + transferConfig.createMultipartUploadTemplate.SetContentEncoding( + contentEncoding); + + transferConfig.uploadProgressCallback = + [&](const TransferManager *transferManager, + const std::shared_ptr + &transferHandle) { + //FIXME: find a way to properly abort the multipart upload. + checkInterrupt(); + printTalkative("upload progress ('%s'): '%d' of '%d' bytes", + path, + transferHandle->GetBytesTransferred(), + transferHandle->GetBytesTotalSize()); + }; + + transferConfig.transferStatusUpdatedCallback = + [&](const TransferManager *, + const std::shared_ptr + &transferHandle) { + switch (transferHandle->GetStatus()) { + case TransferStatus::COMPLETED: + printTalkative("upload of '%s' completed", path); + stats.put++; + stats.putBytes += data.size(); + break; + case TransferStatus::IN_PROGRESS: + break; + case TransferStatus::FAILED: + throw Error("AWS error: failed to upload 's3://%s/%s'", + bucketName, path); + break; + default: + throw Error("AWS error: transfer status of 's3://%s/%s' " + "in unexpected state", + bucketName, path); + }; + }; + + std::shared_ptr transferManager = + TransferManager::Create(transferConfig); auto now1 = std::chrono::steady_clock::now(); - auto result = checkAws(format("AWS error uploading '%s'") % path, - s3Helper.client->PutObject(request)); + std::shared_ptr transferHandle = + transferManager->UploadFile(stream, bucketName, path, mimeType, + Aws::Map()); + + transferHandle->WaitUntilFinished(); auto now2 = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(now2 - now1).count(); + auto duration = + std::chrono::duration_cast(now2 - now1) + .count(); - printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") - % bucketName % path % data.size() % duration); + printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") % + bucketName % path % data.size() % duration); stats.putTimeMs += duration; }