From 9f99d62480cf7c58c0a110b180f2096b7d25adab Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Tue, 30 Oct 2018 14:25:00 +0100 Subject: [PATCH] S3BinaryCacheStore: Allow disabling multipart uploads The use of TransferManager has several issues, including that it doesn't allow setting a Content-Encoding without a patch, and it doesn't handle exceptions in worker threads (causing termination on memory allocation failure). Fixes #2493. --- src/libstore/s3-binary-cache-store.cc | 86 +++++++++++++++++---------- 1 file changed, 56 insertions(+), 30 deletions(-) diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 13ee257ba..c5c6b89b1 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Setting narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting logCompression{this, "", "log-compression", "compression method for log/* files"}; + const Setting multipartUpload{ + this, false, "multipart-upload", "whether to use multi-part uploads"}; const Setting bufferSize{ this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"}; @@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore static std::shared_ptr executor = std::make_shared(maxThreads); - std::call_once(transferManagerCreated, [&]() { + std::call_once(transferManagerCreated, [&]() + { + if (multipartUpload) { + TransferManagerConfiguration transferConfig(executor.get()); - TransferManagerConfiguration transferConfig(executor.get()); + transferConfig.s3Client = s3Helper.client; + transferConfig.bufferSize = bufferSize; - transferConfig.s3Client = s3Helper.client; - transferConfig.bufferSize = bufferSize; + transferConfig.uploadProgressCallback = + [](const TransferManager *transferManager, + const std::shared_ptr + &transferHandle) + { + //FIXME: find a way to properly abort the multipart upload. + //checkInterrupt(); + debug("upload progress ('%s'): '%d' of '%d' bytes", + transferHandle->GetKey(), + transferHandle->GetBytesTransferred(), + transferHandle->GetBytesTotalSize()); + }; - transferConfig.uploadProgressCallback = - [](const TransferManager *transferManager, - const std::shared_ptr - &transferHandle) - { - //FIXME: find a way to properly abort the multipart upload. - //checkInterrupt(); - debug("upload progress ('%s'): '%d' of '%d' bytes", - transferHandle->GetKey(), - transferHandle->GetBytesTransferred(), - transferHandle->GetBytesTotalSize()); - }; - - transferManager = TransferManager::Create(transferConfig); + transferManager = TransferManager::Create(transferConfig); + } }); auto now1 = std::chrono::steady_clock::now(); - std::shared_ptr transferHandle = - transferManager->UploadFile( - stream, bucketName, path, mimeType, - Aws::Map(), - nullptr, contentEncoding); + if (transferManager) { - transferHandle->WaitUntilFinished(); + std::shared_ptr transferHandle = + transferManager->UploadFile( + stream, bucketName, path, mimeType, + Aws::Map(), + nullptr, contentEncoding); - if (transferHandle->GetStatus() == TransferStatus::FAILED) - throw Error("AWS error: failed to upload 's3://%s/%s': %s", - bucketName, path, transferHandle->GetLastError().GetMessage()); + transferHandle->WaitUntilFinished(); - if (transferHandle->GetStatus() != TransferStatus::COMPLETED) - throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", - bucketName, path); + if (transferHandle->GetStatus() == TransferStatus::FAILED) + throw Error("AWS error: failed to upload 's3://%s/%s': %s", + bucketName, path, transferHandle->GetLastError().GetMessage()); + + if (transferHandle->GetStatus() != TransferStatus::COMPLETED) + throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", + bucketName, path); + + } else { + + auto request = + Aws::S3::Model::PutObjectRequest() + .WithBucket(bucketName) + .WithKey(path); + + request.SetContentType(mimeType); + + if (contentEncoding != "") + request.SetContentEncoding(contentEncoding); + + auto stream = std::make_shared(data); + + request.SetBody(stream); + + auto result = checkAws(fmt("AWS error uploading '%s'", path), + s3Helper.client->PutObject(request)); + } printTalkative("upload of '%s' completed", path);