Skip to content

Commit aa16315

Browse files
committed
Added list parts and list mpus at the end of mpus
1 parent 27ee53a commit aa16315

7 files changed

Lines changed: 266 additions & 37 deletions

File tree

CHANGELOG.md

Lines changed: 134 additions & 11 deletions
Large diffs are not rendered by default.

dist/etc/bash_completion.d/elbencho

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ _elbencho_opts()
129129
--s3bversion
130130
--s3bversionverify
131131
--s3chksumalgo
132+
--s3listmpu
133+
--s3listparts
132134
--s3endpoints
133135
--s3fastget
134136
--s3fastput

source/ProgArgs.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ void ProgArgs::defineAllowedArgs()
551551
"\"--" ARG_S3BUCKETTAG_LONG "\")")
552552
/*s3b*/ (ARG_S3BUCKETVER_LONG, bpo::bool_switch(&this->doS3BucketVersioning),
553553
"Activate bucket versioning operations.")
554+
(ARG_S3LISTMPU_LONG, bpo::bool_switch(&this->doS3ListMPU),
555+
"Enable list multipart uploads requests during multipart "
556+
"uploads. (Off by default)")
557+
(ARG_S3LISTPARTS_LONG, bpo::bool_switch(&this->doS3ListParts),
558+
"Enable list parts requests during multipart uploads. (Off by default)")
554559
/*s3b*/ (ARG_S3BUCKETVERVERIFY_LONG, bpo::bool_switch(&this->doS3BucketVersioningVerify),
555560
"Verify the correctness of S3 bucket versioning settings. (Requires "
556561
"\"--" ARG_S3BUCKETVER_LONG "\")")
@@ -891,6 +896,8 @@ void ProgArgs::defineDefaults()
891896
this->doS3BucketVersioningVerify = false;
892897
this->doS3ObjectTag = false;
893898
this->doS3ObjectTagVerify = false;
899+
this->doS3ListMPU = false;
900+
this->doS3ListParts = false;
894901
this->doS3ObjectLockCfg = false;
895902
this->doS3ObjectLockCfgVerify = false;
896903
this->useOpsLogLocking = false;
@@ -3329,6 +3336,8 @@ void ProgArgs::setFromPropertyTreeForService(bpt::ptree& tree)
33293336
doS3BucketTag = tree.get<bool>(ARG_S3BUCKETTAG_LONG);
33303337
doS3BucketTagVerify = tree.get<bool>(ARG_S3BUCKETTAGVERIFY_LONG);
33313338
doS3BucketVersioning = tree.get<bool>(ARG_S3BUCKETVER_LONG);
3339+
doS3ListMPU = tree.get<bool>(ARG_S3LISTMPU_LONG);
3340+
doS3ListParts = tree.get<bool>(ARG_S3LISTPARTS_LONG);
33323341
doS3BucketVersioningVerify = tree.get<bool>(ARG_S3BUCKETVERVERIFY_LONG);
33333342
doS3ObjectTag = tree.get<bool>(ARG_S3OBJTAG_LONG);
33343343
doS3ObjectTagVerify = tree.get<bool>(ARG_S3OBJTAGVERIFY_LONG);
@@ -3556,6 +3565,8 @@ void ProgArgs::getAsPropertyTreeForService(bpt::ptree& outTree, size_t serviceRa
35563565
outTree.put(ARG_S3BUCKETTAG_LONG, doS3BucketTag);
35573566
outTree.put(ARG_S3BUCKETTAGVERIFY_LONG, doS3BucketTagVerify);
35583567
outTree.put(ARG_S3BUCKETVER_LONG, doS3BucketVersioning);
3568+
outTree.put(ARG_S3LISTMPU_LONG, doS3ListMPU);
3569+
outTree.put(ARG_S3LISTPARTS_LONG, doS3ListParts);
35593570
outTree.put(ARG_S3BUCKETVERVERIFY_LONG, doS3BucketVersioningVerify);
35603571
outTree.put(ARG_S3OBJTAG_LONG, doS3ObjectTag);
35613572
outTree.put(ARG_S3OBJTAGVERIFY_LONG, doS3ObjectTagVerify);

source/ProgArgs.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ namespace bpt = boost::property_tree;
182182
#define ARG_S3CHECKSUM_ALGO_LONG "s3chksumalgo" // parameter for x-amz-sdk-checksum-algorithm
183183
#define ARG_S3SSEKMSKEY_LONG "s3ssekmskey"
184184
#define ARG_S3STATDIRS_LONG "s3statdirs"
185+
#define ARG_S3LISTMPU_LONG "s3listmpu"
186+
#define ARG_S3LISTPARTS_LONG "s3listparts"
185187
#define ARG_SENDBUFSIZE_LONG "sendbuf"
186188
#define ARG_SERVERS_LONG "servers"
187189
#define ARG_SERVERSFILE_LONG "serversfile"
@@ -377,6 +379,8 @@ class ProgArgs
377379
bool doS3ObjectTagVerify; // do bucket tagging verification.
378380
bool doS3ObjectLockCfg; // do S3 object lock configuration
379381
bool doS3ObjectLockCfgVerify; // do S3 object lock configuration verification
382+
bool doS3ListMPU; // enable list multipart uploads requests
383+
bool doS3ListParts; // enable list parts requests
380384
bool doTruncate; // truncate files to 0 size on open for writing
381385
bool doTruncToSize; // truncate files to size on creation via ftruncate()
382386
unsigned fadviseFlags; // flags for fadvise() (ARG_FADVISE_FLAG_x)
@@ -639,7 +643,9 @@ class ProgArgs
639643
bool getDoS3ObjectLockConfiguration() const { return doS3ObjectLockCfg; }
640644
bool getDoS3ObjectLockConfigurationVerify() const { return doS3ObjectLockCfgVerify; }
641645
bool getDoS3AclPutInline() const { return doS3AclPutInline; }
642-
bool getDoS3AclVerify() const { return doS3AclVerify; }
646+
bool getDoS3AclVerify() const { return doS3AclVerify; }
647+
bool getDoS3ListMPU() const { return doS3ListMPU; }
648+
bool getDoS3ListParts() const { return doS3ListParts; }
643649
bool getDoTruncate() const { return doTruncate; }
644650
bool getDoTruncToSize() const { return doTruncToSize; }
645651
bool getDoListObjVerify() const { return doS3ListObjVerify; }

source/toolkits/S3Tk.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define TOOLKITS_S3TK_H_
33

44
#ifdef S3_SUPPORT
5+
#include "Common.h"
56
#include <aws/core/Aws.h>
67
#include <aws/core/client/DefaultRetryStrategy.h>
78
#include <aws/core/utils/HashingUtils.h>

source/workers/LocalWorker.cpp

Lines changed: 110 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
#include INCLUDE_AWS_S3(model/GetBucketVersioningRequest.h)
4242
#include INCLUDE_AWS_S3(model/HeadObjectRequest.h)
4343
#include INCLUDE_AWS_S3(model/HeadBucketRequest.h)
44+
#include INCLUDE_AWS_S3(model/ListMultipartUploadsRequest.h)
4445
#include INCLUDE_AWS_S3(model/ListObjectsV2Request.h)
46+
#include INCLUDE_AWS_S3(model/ListPartsRequest.h)
4547
#include INCLUDE_AWS_S3(model/Object.h)
4648
#include INCLUDE_AWS_S3(model/ObjectLockRule.h)
4749
#include INCLUDE_AWS_S3(model/PutBucketAclRequest.h)
@@ -3882,15 +3884,9 @@ void LocalWorker::s3ModeIterateObjects()
38823884
IF_UNLIKELY( (fileIndex % INTERRUPTION_CHECK_INTERVAL) == 0)
38833885
checkInterruptionRequest();
38843886

3885-
// generate current dir path
3886-
int printRes;
3887-
3888-
if(haveSubdirs)
3889-
printRes = snprintf(currentPath.data(), PATH_BUF_LEN, "r%zu/d%zu/r%zu-f%zu",
3890-
workerDirRank, dirIndex, workerRank, fileIndex);
3891-
else
3892-
printRes = snprintf(currentPath.data(), PATH_BUF_LEN, "r%zu-f%zu",
3893-
workerRank, fileIndex);
3887+
// generate current file path (subdir is added to objectPrefix below)
3888+
int printRes = snprintf(currentPath.data(), PATH_BUF_LEN, "r%zu-f%zu",
3889+
workerRank, fileIndex);
38943890

38953891
IF_UNLIKELY(printRes >= PATH_BUF_LEN)
38963892
throw WorkerException("object path too long for static buffer. "
@@ -3902,6 +3898,16 @@ void LocalWorker::s3ModeIterateObjects()
39023898
if(objectPrefixRand)
39033899
objectPrefix = getS3RandObjectPrefix(
39043900
workerRank, dirIndex, fileIndex, progArgs->getS3ObjectPrefix() );
3901+
else if(haveSubdirs)
3902+
{
3903+
// add subdir to objectPrefix instead of currentPath
3904+
char subdirBuf[PATH_BUF_LEN];
3905+
snprintf(subdirBuf, PATH_BUF_LEN, "r%zu/d%zu/",
3906+
workerDirRank, dirIndex);
3907+
objectPrefix = progArgs->getS3ObjectPrefix() + std::string(subdirBuf);
3908+
}
3909+
else
3910+
objectPrefix = progArgs->getS3ObjectPrefix();
39053911

39063912
unsigned bucketIndex = (workerRank + dirIndex) % bucketVec.size();
39073913
std::string currentObjectPath = objectPrefix + currentPath.data();
@@ -3914,7 +3920,9 @@ void LocalWorker::s3ModeIterateObjects()
39143920
if( (benchPhase == BenchPhase_CREATEFILES) && !isRWMixedReader)
39153921
{
39163922
if(blockSize < fileSize)
3917-
s3ModeUploadObjectMultiPart(bucketVec[bucketIndex], currentObjectPath);
3923+
s3ModeUploadObjectMultiPart(bucketVec[bucketIndex],
3924+
objectPrefix,
3925+
currentObjectPath);
39183926
else
39193927
s3ModeUploadObjectSinglePart(bucketVec[bucketIndex], currentObjectPath);
39203928
}
@@ -4133,18 +4141,21 @@ void LocalWorker::s3ModeIterateCustomObjects()
41334141

41344142
if(benchPhase == BenchPhase_CREATEFILES)
41354143
{
4136-
if(rangeLen < fileSize)
4137-
s3ModeUploadObjectMultiPartShared(bucketName,
4138-
objectPrefix + currentPathElem.path, fileSize);
4139-
else
4140-
{ // this worker uploads the whole object
4141-
if(blockSize < fileSize)
4142-
s3ModeUploadObjectMultiPart(bucketName,
4143-
objectPrefix + currentPathElem.path);
4144-
else
4145-
s3ModeUploadObjectSinglePart(bucketName,
4146-
objectPrefix + currentPathElem.path);
4147-
}
4144+
if(rangeLen < fileSize)
4145+
{
4146+
s3ModeUploadObjectMultiPartShared(bucketName,
4147+
objectPrefix + currentPathElem.path, fileSize);
4148+
}
4149+
else
4150+
{ // this worker uploads the whole object
4151+
if(blockSize < fileSize)
4152+
s3ModeUploadObjectMultiPart(bucketName,
4153+
objectPrefix,
4154+
objectPrefix + currentPathElem.path);
4155+
else
4156+
s3ModeUploadObjectSinglePart(bucketName,
4157+
objectPrefix + currentPathElem.path);
4158+
}
41484159
}
41494160

41504161
if(benchPhase == BenchPhase_READFILES)
@@ -4211,7 +4222,7 @@ void LocalWorker::s3ModeThrowOnError(
42114222
IF_LIKELY(outcome.IsSuccess() )
42124223
return;
42134224

4214-
const auto s3Error = outcome.GetError();
4225+
const S3ErrorType s3Error = outcome.GetError();
42154226

42164227
std::stringstream errStr;
42174228
errStr << failMessage << std::endl <<
@@ -4734,7 +4745,7 @@ void LocalWorker::s3ModeUploadObjectSinglePart(std::string bucketName, std::stri
47344745
*
47354746
* @throw WorkerException on error.
47364747
*/
4737-
void LocalWorker::s3ModeUploadObjectMultiPart(std::string bucketName, std::string objectName)
4748+
void LocalWorker::s3ModeUploadObjectMultiPart(std::string bucketName, std::string prefix, std::string objectName)
47384749
{
47394750
#ifndef S3_SUPPORT
47404751
throw WorkerException(std::string(__func__) + "called, but this was built without S3 support");
@@ -4887,6 +4898,81 @@ void LocalWorker::s3ModeUploadObjectMultiPart(std::string bucketName, std::strin
48874898
atomicLiveOps.numIOPSDone++;
48884899
}
48894900

4901+
// List parts before completion to validate
4902+
if (progArgs->getDoS3ListParts()) {
4903+
S3::ListPartsRequest listPartsRequest;
4904+
listPartsRequest.WithBucket(bucketName)
4905+
.WithKey(objectName)
4906+
.WithUploadId(uploadID);
4907+
4908+
OPLOG_PRE_OP("S3ListParts", bucketName + "/" + objectName, 0, 0);
4909+
4910+
auto listPartsOutcome = s3Client->ListParts(listPartsRequest);
4911+
4912+
OPLOG_POST_OP("S3ListParts", bucketName + "/" + objectName, 0, 0,
4913+
!listPartsOutcome.IsSuccess());
4914+
4915+
IF_UNLIKELY(!listPartsOutcome.IsSuccess()) {
4916+
auto s3Error = listPartsOutcome.GetError();
4917+
if (!ignoreS3Errors) {
4918+
throw WorkerException(
4919+
std::string("List parts failed. ") + "Bucket: " + bucketName + "; " +
4920+
"Key: " + objectName + "; " + "UploadId: " + uploadID + "; " +
4921+
"Error: " + s3Error.GetExceptionName() + "; " +
4922+
"Message: " + s3Error.GetMessage());
4923+
}
4924+
}
4925+
// Validate that the number of parts matches
4926+
const auto &parts = listPartsOutcome.GetResult().GetParts();
4927+
size_t expectedPartCount = completedMultipartUpload.GetParts().size();
4928+
size_t actualPartCount = parts.size();
4929+
4930+
IF_UNLIKELY(actualPartCount != expectedPartCount) {
4931+
if (!ignoreS3Errors) {
4932+
throw WorkerException(
4933+
std::string("List parts count mismatch. ") + "Bucket: " + bucketName +
4934+
"; " + "Key: " + objectName + "; " +
4935+
"Expected parts: " + std::to_string(expectedPartCount) + "; " +
4936+
"Actual parts: " + std::to_string(actualPartCount));
4937+
}
4938+
}
4939+
}
4940+
4941+
// List multipart uploads before completion
4942+
if (progArgs->getDoS3ListMPU()) {
4943+
S3::ListMultipartUploadsRequest listRequest;
4944+
listRequest.WithBucket(bucketName).WithPrefix(prefix);
4945+
4946+
OPLOG_PRE_OP("S3ListMultipartUploads", bucketName, 0, 0);
4947+
4948+
auto listOutcome = s3Client->ListMultipartUploads(listRequest);
4949+
4950+
OPLOG_POST_OP("S3ListMultipartUploads", bucketName, 0, 0,
4951+
!listOutcome.IsSuccess());
4952+
4953+
IF_UNLIKELY(!listOutcome.IsSuccess()) {
4954+
auto s3Error = listOutcome.GetError();
4955+
if (!ignoreS3Errors) {
4956+
throw WorkerException(std::string("Multipart upload listing failed. ") +
4957+
"Bucket: " + bucketName + "; " +
4958+
"Error: " + s3Error.GetExceptionName() + "; " +
4959+
"Message: " + s3Error.GetMessage());
4960+
}
4961+
}
4962+
4963+
auto outcome = listOutcome.GetResult();
4964+
std::cout << "Uploads size: " << outcome.GetUploads().size() << " For prefix " << outcome.GetPrefix() << std::endl;
4965+
4966+
// We expect exactly 1 multipart upload because the prefix contains the worker rank,
4967+
// making it specific enough to match only the current upload
4968+
IF_UNLIKELY(outcome.GetUploads().size() != 1) {
4969+
throw WorkerException(
4970+
std::string("Expected exactly 1 multipart upload, but found ") +
4971+
std::to_string(outcome.GetUploads().size()) + "; " +
4972+
"Bucket: " + bucketName + "; " + "Prefix: " + outcome.GetPrefix());
4973+
}
4974+
}
4975+
48904976
// S T E P 3: submit upload completion
48914977

48924978
IF_UNLIKELY(s3NoMpuCompletion)

source/workers/LocalWorker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ class LocalWorker : public Worker
251251
void s3ModeGetBucketVersioning(const std::string& bucketName);
252252
void s3ModePutBucketVersioning(const std::string& bucketName, bool enable = true);
253253
void s3ModeUploadObjectSinglePart(std::string bucketName, std::string objectName);
254-
void s3ModeUploadObjectMultiPart(std::string bucketName, std::string objectName);
254+
void s3ModeUploadObjectMultiPart(std::string bucketName, std::string prefix, std::string objectName);
255255
void s3ModeUploadObjectMultiPartAsync(std::string bucketName, std::string objectName);
256256
void s3ModeUploadObjectMultiPartShared(std::string bucketName, std::string objectName,
257257
uint64_t objectTotalSize);

0 commit comments

Comments
 (0)