Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## v3.0.36 (work in progress)

### New Features & Enhancements
* Added support for list parts requests at the end of multipart uploads (enable by passing the `--s3listparts` parameter).
* Added support for list multipart uploads requests at the end of multipart uploads (enable by passing the `--s3listmpu` parameter).

### General Changes
* Added config values to `--jsonfile` output.

Expand Down
2 changes: 2 additions & 0 deletions dist/etc/bash_completion.d/elbencho
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ _elbencho_opts()
--s3bversion
--s3bversionverify
--s3chksumalgo
--s3listmpu
--s3listparts
--s3endpoints
--s3fastget
--s3fastput
Expand Down
11 changes: 11 additions & 0 deletions source/ProgArgs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,11 @@ void ProgArgs::defineAllowedArgs()
"\"--" ARG_S3BUCKETTAG_LONG "\")")
/*s3b*/ (ARG_S3BUCKETVER_LONG, bpo::bool_switch(&this->doS3BucketVersioning),
"Activate bucket versioning operations.")
(ARG_S3LISTMPU_LONG, bpo::bool_switch(&this->doS3ListMPU),
"Enable list multipart uploads requests during multipart "
"uploads. (Off by default)")
(ARG_S3LISTPARTS_LONG, bpo::bool_switch(&this->doS3ListParts),
"Enable list parts requests during multipart uploads. (Off by default)")
/*s3b*/ (ARG_S3BUCKETVERVERIFY_LONG, bpo::bool_switch(&this->doS3BucketVersioningVerify),
"Verify the correctness of S3 bucket versioning settings. (Requires "
"\"--" ARG_S3BUCKETVER_LONG "\")")
Expand Down Expand Up @@ -891,6 +896,8 @@ void ProgArgs::defineDefaults()
this->doS3BucketVersioningVerify = false;
this->doS3ObjectTag = false;
this->doS3ObjectTagVerify = false;
this->doS3ListMPU = false;
this->doS3ListParts = false;
this->doS3ObjectLockCfg = false;
this->doS3ObjectLockCfgVerify = false;
this->useOpsLogLocking = false;
Expand Down Expand Up @@ -3329,6 +3336,8 @@ void ProgArgs::setFromPropertyTreeForService(bpt::ptree& tree)
doS3BucketTag = tree.get<bool>(ARG_S3BUCKETTAG_LONG);
doS3BucketTagVerify = tree.get<bool>(ARG_S3BUCKETTAGVERIFY_LONG);
doS3BucketVersioning = tree.get<bool>(ARG_S3BUCKETVER_LONG);
doS3ListMPU = tree.get<bool>(ARG_S3LISTMPU_LONG);
doS3ListParts = tree.get<bool>(ARG_S3LISTPARTS_LONG);
doS3BucketVersioningVerify = tree.get<bool>(ARG_S3BUCKETVERVERIFY_LONG);
doS3ObjectTag = tree.get<bool>(ARG_S3OBJTAG_LONG);
doS3ObjectTagVerify = tree.get<bool>(ARG_S3OBJTAGVERIFY_LONG);
Expand Down Expand Up @@ -3556,6 +3565,8 @@ void ProgArgs::getAsPropertyTreeForService(bpt::ptree& outTree, size_t serviceRa
outTree.put(ARG_S3BUCKETTAG_LONG, doS3BucketTag);
outTree.put(ARG_S3BUCKETTAGVERIFY_LONG, doS3BucketTagVerify);
outTree.put(ARG_S3BUCKETVER_LONG, doS3BucketVersioning);
outTree.put(ARG_S3LISTMPU_LONG, doS3ListMPU);
outTree.put(ARG_S3LISTPARTS_LONG, doS3ListParts);
outTree.put(ARG_S3BUCKETVERVERIFY_LONG, doS3BucketVersioningVerify);
outTree.put(ARG_S3OBJTAG_LONG, doS3ObjectTag);
outTree.put(ARG_S3OBJTAGVERIFY_LONG, doS3ObjectTagVerify);
Expand Down
8 changes: 7 additions & 1 deletion source/ProgArgs.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ namespace bpt = boost::property_tree;
#define ARG_S3CHECKSUM_ALGO_LONG "s3chksumalgo" // parameter for x-amz-sdk-checksum-algorithm
#define ARG_S3SSEKMSKEY_LONG "s3ssekmskey"
#define ARG_S3STATDIRS_LONG "s3statdirs"
#define ARG_S3LISTMPU_LONG "s3listmpu"
#define ARG_S3LISTPARTS_LONG "s3listparts"
#define ARG_SENDBUFSIZE_LONG "sendbuf"
#define ARG_SERVERS_LONG "servers"
#define ARG_SERVERSFILE_LONG "serversfile"
Expand Down Expand Up @@ -377,6 +379,8 @@ class ProgArgs
bool doS3ObjectTagVerify; // do bucket tagging verification.
bool doS3ObjectLockCfg; // do S3 object lock configuration
bool doS3ObjectLockCfgVerify; // do S3 object lock configuration verification
bool doS3ListMPU; // enable list multipart uploads requests
bool doS3ListParts; // enable list parts requests
bool doTruncate; // truncate files to 0 size on open for writing
bool doTruncToSize; // truncate files to size on creation via ftruncate()
unsigned fadviseFlags; // flags for fadvise() (ARG_FADVISE_FLAG_x)
Expand Down Expand Up @@ -639,7 +643,9 @@ class ProgArgs
bool getDoS3ObjectLockConfiguration() const { return doS3ObjectLockCfg; }
bool getDoS3ObjectLockConfigurationVerify() const { return doS3ObjectLockCfgVerify; }
bool getDoS3AclPutInline() const { return doS3AclPutInline; }
bool getDoS3AclVerify() const { return doS3AclVerify; }
bool getDoS3AclVerify() const { return doS3AclVerify; }
bool getDoS3ListMPU() const { return doS3ListMPU; }
bool getDoS3ListParts() const { return doS3ListParts; }
bool getDoTruncate() const { return doTruncate; }
bool getDoTruncToSize() const { return doTruncToSize; }
bool getDoListObjVerify() const { return doS3ListObjVerify; }
Expand Down
1 change: 1 addition & 0 deletions source/toolkits/S3Tk.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define TOOLKITS_S3TK_H_

#ifdef S3_SUPPORT
#include "Common.h"
#include <aws/core/Aws.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/utils/HashingUtils.h>
Expand Down
134 changes: 110 additions & 24 deletions source/workers/LocalWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
#include INCLUDE_AWS_S3(model/GetBucketVersioningRequest.h)
#include INCLUDE_AWS_S3(model/HeadObjectRequest.h)
#include INCLUDE_AWS_S3(model/HeadBucketRequest.h)
#include INCLUDE_AWS_S3(model/ListMultipartUploadsRequest.h)
#include INCLUDE_AWS_S3(model/ListObjectsV2Request.h)
#include INCLUDE_AWS_S3(model/ListPartsRequest.h)
#include INCLUDE_AWS_S3(model/Object.h)
#include INCLUDE_AWS_S3(model/ObjectLockRule.h)
#include INCLUDE_AWS_S3(model/PutBucketAclRequest.h)
Expand Down Expand Up @@ -3882,15 +3884,9 @@ void LocalWorker::s3ModeIterateObjects()
IF_UNLIKELY( (fileIndex % INTERRUPTION_CHECK_INTERVAL) == 0)
checkInterruptionRequest();

// generate current dir path
int printRes;

if(haveSubdirs)
printRes = snprintf(currentPath.data(), PATH_BUF_LEN, "r%zu/d%zu/r%zu-f%zu",
workerDirRank, dirIndex, workerRank, fileIndex);
else
printRes = snprintf(currentPath.data(), PATH_BUF_LEN, "r%zu-f%zu",
workerRank, fileIndex);
// generate current file path (subdir is added to objectPrefix below)
int printRes = snprintf(currentPath.data(), PATH_BUF_LEN, "r%zu-f%zu",
workerRank, fileIndex);

IF_UNLIKELY(printRes >= PATH_BUF_LEN)
throw WorkerException("object path too long for static buffer. "
Expand All @@ -3902,6 +3898,16 @@ void LocalWorker::s3ModeIterateObjects()
if(objectPrefixRand)
objectPrefix = getS3RandObjectPrefix(
workerRank, dirIndex, fileIndex, progArgs->getS3ObjectPrefix() );
else if(haveSubdirs)
{
// add subdir to objectPrefix instead of currentPath
char subdirBuf[PATH_BUF_LEN];
snprintf(subdirBuf, PATH_BUF_LEN, "r%zu/d%zu/",
workerDirRank, dirIndex);
objectPrefix = progArgs->getS3ObjectPrefix() + std::string(subdirBuf);
}
else
objectPrefix = progArgs->getS3ObjectPrefix();

unsigned bucketIndex = (workerRank + dirIndex) % bucketVec.size();
std::string currentObjectPath = objectPrefix + currentPath.data();
Expand All @@ -3914,7 +3920,9 @@ void LocalWorker::s3ModeIterateObjects()
if( (benchPhase == BenchPhase_CREATEFILES) && !isRWMixedReader)
{
if(blockSize < fileSize)
s3ModeUploadObjectMultiPart(bucketVec[bucketIndex], currentObjectPath);
s3ModeUploadObjectMultiPart(bucketVec[bucketIndex],
objectPrefix,
currentObjectPath);
else
s3ModeUploadObjectSinglePart(bucketVec[bucketIndex], currentObjectPath);
}
Expand Down Expand Up @@ -4133,18 +4141,21 @@ void LocalWorker::s3ModeIterateCustomObjects()

if(benchPhase == BenchPhase_CREATEFILES)
{
if(rangeLen < fileSize)
s3ModeUploadObjectMultiPartShared(bucketName,
objectPrefix + currentPathElem.path, fileSize);
else
{ // this worker uploads the whole object
if(blockSize < fileSize)
s3ModeUploadObjectMultiPart(bucketName,
objectPrefix + currentPathElem.path);
else
s3ModeUploadObjectSinglePart(bucketName,
objectPrefix + currentPathElem.path);
}
if(rangeLen < fileSize)
{
s3ModeUploadObjectMultiPartShared(bucketName,
objectPrefix + currentPathElem.path, fileSize);
}
else
{ // this worker uploads the whole object
if(blockSize < fileSize)
s3ModeUploadObjectMultiPart(bucketName,
objectPrefix,
objectPrefix + currentPathElem.path);
else
s3ModeUploadObjectSinglePart(bucketName,
objectPrefix + currentPathElem.path);
}
}

if(benchPhase == BenchPhase_READFILES)
Expand Down Expand Up @@ -4211,7 +4222,7 @@ void LocalWorker::s3ModeThrowOnError(
IF_LIKELY(outcome.IsSuccess() )
return;

const auto s3Error = outcome.GetError();
const S3ErrorType s3Error = outcome.GetError();

std::stringstream errStr;
errStr << failMessage << std::endl <<
Expand Down Expand Up @@ -4734,7 +4745,7 @@ void LocalWorker::s3ModeUploadObjectSinglePart(std::string bucketName, std::stri
*
* @throw WorkerException on error.
*/
void LocalWorker::s3ModeUploadObjectMultiPart(std::string bucketName, std::string objectName)
void LocalWorker::s3ModeUploadObjectMultiPart(std::string bucketName, std::string prefix, std::string objectName)
{
#ifndef S3_SUPPORT
throw WorkerException(std::string(__func__) + "called, but this was built without S3 support");
Expand Down Expand Up @@ -4887,6 +4898,81 @@ void LocalWorker::s3ModeUploadObjectMultiPart(std::string bucketName, std::strin
atomicLiveOps.numIOPSDone++;
}

// List parts before completion to validate
if (progArgs->getDoS3ListParts()) {
S3::ListPartsRequest listPartsRequest;
listPartsRequest.WithBucket(bucketName)
.WithKey(objectName)
.WithUploadId(uploadID);

OPLOG_PRE_OP("S3ListParts", bucketName + "/" + objectName, 0, 0);

auto listPartsOutcome = s3Client->ListParts(listPartsRequest);

OPLOG_POST_OP("S3ListParts", bucketName + "/" + objectName, 0, 0,
!listPartsOutcome.IsSuccess());

IF_UNLIKELY(!listPartsOutcome.IsSuccess()) {
auto s3Error = listPartsOutcome.GetError();
if (!ignoreS3Errors) {
throw WorkerException(
std::string("List parts failed. ") + "Bucket: " + bucketName + "; " +
"Key: " + objectName + "; " + "UploadId: " + uploadID + "; " +
"Error: " + s3Error.GetExceptionName() + "; " +
"Message: " + s3Error.GetMessage());
}
}
// Validate that the number of parts matches
const auto &parts = listPartsOutcome.GetResult().GetParts();
size_t expectedPartCount = completedMultipartUpload.GetParts().size();
size_t actualPartCount = parts.size();

IF_UNLIKELY(actualPartCount != expectedPartCount) {
if (!ignoreS3Errors) {
throw WorkerException(
std::string("List parts count mismatch. ") + "Bucket: " + bucketName +
"; " + "Key: " + objectName + "; " +
"Expected parts: " + std::to_string(expectedPartCount) + "; " +
"Actual parts: " + std::to_string(actualPartCount));
}
}
}

// List multipart uploads before completion
if (progArgs->getDoS3ListMPU()) {
S3::ListMultipartUploadsRequest listRequest;
listRequest.WithBucket(bucketName).WithPrefix(prefix);

OPLOG_PRE_OP("S3ListMultipartUploads", bucketName, 0, 0);

auto listOutcome = s3Client->ListMultipartUploads(listRequest);

OPLOG_POST_OP("S3ListMultipartUploads", bucketName, 0, 0,
!listOutcome.IsSuccess());

IF_UNLIKELY(!listOutcome.IsSuccess()) {
auto s3Error = listOutcome.GetError();
if (!ignoreS3Errors) {
throw WorkerException(std::string("Multipart upload listing failed. ") +
"Bucket: " + bucketName + "; " +
"Error: " + s3Error.GetExceptionName() + "; " +
"Message: " + s3Error.GetMessage());
}
}

auto outcome = listOutcome.GetResult();
std::cout << "Uploads size: " << outcome.GetUploads().size() << " For prefix " << outcome.GetPrefix() << std::endl;

// We expect exactly 1 multipart upload because the prefix contains the worker rank,
// making it specific enough to match only the current upload
IF_UNLIKELY(outcome.GetUploads().size() != 1) {
throw WorkerException(
std::string("Expected exactly 1 multipart upload, but found ") +
std::to_string(outcome.GetUploads().size()) + "; " +
"Bucket: " + bucketName + "; " + "Prefix: " + outcome.GetPrefix());
}
}

// S T E P 3: submit upload completion

IF_UNLIKELY(s3NoMpuCompletion)
Expand Down
2 changes: 1 addition & 1 deletion source/workers/LocalWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class LocalWorker : public Worker
void s3ModeGetBucketVersioning(const std::string& bucketName);
void s3ModePutBucketVersioning(const std::string& bucketName, bool enable = true);
void s3ModeUploadObjectSinglePart(std::string bucketName, std::string objectName);
void s3ModeUploadObjectMultiPart(std::string bucketName, std::string objectName);
void s3ModeUploadObjectMultiPart(std::string bucketName, std::string prefix, std::string objectName);
void s3ModeUploadObjectMultiPartAsync(std::string bucketName, std::string objectName);
void s3ModeUploadObjectMultiPartShared(std::string bucketName, std::string objectName,
uint64_t objectTotalSize);
Expand Down