BlobContainer: add copyBlob method #125737

Merged (39 commits) on Apr 9, 2025
Conversation

@bcully (Contributor) commented Mar 26, 2025

If a container implements copyBlob, then the copy is performed by the store, without client-side IO. If the store does not provide a copy operation then the default implementation throws UnsupportedOperationException.

This change provides implementations for the FS and S3 blob containers. More will follow.

Refs ES-11059
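
For orientation, the new API has roughly this shape (a sketch; parameter names are illustrative, and per the review discussion below the final signature also carries the blob length):

public interface BlobContainer {
    // ... existing blob operations ...

    // Copy a blob within the store, server-side, without routing bytes through the client.
    default void copyBlob(BlobContainer sourceContainer, String sourceBlobName, String blobName, long blobSize) throws IOException {
        throw new UnsupportedOperationException("copyBlob is not supported by this blob container");
    }
}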

@bcully added the >non-issue, :Distributed Indexing/Distributed, and v9.1.0 labels Mar 26, 2025
@bcully requested review from Tim-Brooks and ankikuma March 26, 2025 22:26
@elasticsearchmachine added the Team:Distributed Indexing label Mar 26, 2025
@elasticsearchmachine (Collaborator):

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@bcully (Contributor, Author) commented Mar 26, 2025

@elasticmachine run elasticsearch-ci/third-party / s3

@bcully requested a review from a team as a code owner March 26, 2025 23:09
@DaveCTurner (Contributor) left a comment:

Please could you also extend the repository analyzer (RepositoryAnalyzeAction and friends) to cover this API?

@DaveCTurner (Contributor):

Please could you also extend the repository analyzer (RepositoryAnalyzeAction and friends) to cover this API?

Sorry, meant to add more detail before submitting that review:


I'd suggest doing something with BlobAnalyzeAction to sometimes copy the blob it just wrote and then perform (some of?) the reads from the copy. Ideally we'd be checking for atomicity of reads from the copy just like we do today, making sure that we never see a partially-copied blob.

Skipping checks on a UOE is ok, there's prior art for that:

if (e instanceof UnsupportedOperationException) {
    // Registers are not supported on all repository types, and that's ok.
    listener.onResponse(null);
} else {
    listener.onFailure(e);
}

    throw new IllegalArgumentException("target blob container must be a S3BlobContainer");
}
if (failIfAlreadyExists) {
    throw new UnsupportedOperationException("S3 blob container does not support failIfAlreadyExists");
Contributor:

This is probably outside of the scope of the ticket, but I'm pretty sure S3 does now support this. At least for Puts. https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/

@bcully (Contributor, Author):

As far as I could tell, while this is true for puts, it is not for copy. On put, there is If-None-Match:

"Uploads the object only if the object key name does not already exist in the bucket specified. Otherwise, Amazon S3 returns a 412 Precondition Failed error."

But on copy, there only appears to be a set of preconditions on the source, not the target.
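At the HTTP level the distinction looks roughly like this (an illustration based on the AWS docs linked above; CopyObject only offers x-amz-copy-source-if-* preconditions, which apply to the source object):

PUT /my-bucket/my-key HTTP/1.1
If-None-Match: *

-> 412 Precondition Failed if my-key already exists; otherwise the upload proceeds. There is no analogous destination precondition on the CopyObject request.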

Contributor:

I would rather we didn't add this argument if it's not going to be supported on S3. Instead, the caller needs to find a way to use this feature that does not require it to fail if the blob already exists. Typically that means we pick a unique fresh name for the blob after we've decided on its contents, such that if the blob does already happen to exist then the operation already succeeded, the blob contents are already correct, and thus retrying the operation will do no harm.

I know we have had this flag on the writeBlob API for years but this was IMO a mistake given the lack of support for this feature in S3 at the time. We don't actually care any more if the blob exists, because of our move towards unique fresh names in all other places.
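
As an illustration of that convention (a hypothetical naming scheme, not code from this PR):

// Decide the contents first, then derive a fresh unique name, e.g. with a UUID;
// a retry that finds the blob already present can safely treat the write as done.
final String blobName = blobNamePrefix + UUIDs.randomBase64UUID();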

@bcully (Contributor, Author):

I don't mind removing it. The specific workload motivating this PR is seeding a new shard from the contents of an old one when splitting. For that, the source names already have the uniqueness we want, and for retries it is probably even convenient that copying the same source file to the same destination twice does not fail.

It would be nice for safety to be able to assert that, if the file is already present, its etag matches what we expect, and fail otherwise, since a mismatch indicates a logic error and potential data loss. I don't think we can do that atomically with S3 right now, and I'm on the fence about whether it's worth doing non-atomically (HEADing the destination before starting the copy). On the one hand it doubles the request count for a check whose expected hit rate is zero if we're not buggy; on the other hand, split is going to be a relatively infrequent operation, so the total request count may not matter much.

Contributor:

I'd suggest not doing this belt-and-braces approach, at least not in the production code. We should be able to catch this sort of problem in tests with a sufficiently picky test fixture instead. I would expect this kind of bug would eventually manifest as a corruption in Lucene itself if it really did happen in production.

@bcully (Contributor, Author):

I've removed failIfAlreadyExists in ec271f5

*
* Server-side copy can be done for any size object, but if the object is larger than 5 GB then
* it must be done through a series of part copy operations rather than a single blob copy.
* See <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html">CopyObject</a>.
Contributor:

How does this fail, exactly? Is the plan for this to be handled in a follow-up (supporting multipart copy)? Or are users of this supposed to catch this type of error and then resort to GET + PUT?

@bcully (Contributor, Author):

We discussed this a bit on Tuesday, I think. The API doesn't have a size hint right now, so if the source object is larger than 5GB I expect this operation to return an error. I was thinking the calling function would handle this the same way it might currently handle an unsupported operation on Azure: by falling back to GET+PUT, with a variant of copyBlob that takes a size hint as a possible followup.
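
A sketch of that caller-side fallback (hypothetical names; the real readBlob/writeBlob signatures may differ):

try {
    targetContainer.copyBlob(sourceContainer, sourceBlobName, blobName, blobSize);
} catch (UnsupportedOperationException e) {
    // no server-side copy on this store: fall back to client-side GET + PUT
    try (InputStream in = sourceContainer.readBlob(sourceBlobName)) {
        targetContainer.writeBlob(blobName, in, blobSize, false);
    }
}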

Contributor:

I don't think we want our long-term plan to involve throwing an UnsupportedOperationException in this case. We'll almost certainly forget that we need to handle a UOE in the caller at some point, and given the rarity (but not impossibility) of encountering blobs that exceed 5GiB, we will likely first notice when it causes a production outage. It's unreasonable to expect unit tests to cover such large cases. I see that a UOE is ok for now, but we need to make this method work either always or never in the long run.

Please could we add a configurable limit on the maximum CopyObject size so we don't rely on the S3 API returning an error, and so that we can reduce the limit to something tiny in tests?

Contributor:

... or if we don't even know the size of the source blob ahead of time, could we at least impose a much smaller limit in the test fixture so we encounter this limitation in tests sometimes?

@bcully (Contributor, Author):

FWIW I don't think we would throw UOE if the source object were over 5GB. That's an S3-side limit and I'd expect that to come back as a regular failed operation (a subclass of IOException IIRC).

I will test manually on S3 to get the concrete error it produces for files larger than 5GB, and add test coverage of that case by generating the same error in a fixture. I'll also just go ahead and add multipart copy support now, triggered by receiving that error from S3. Later, if it seems worthwhile, we can add a variant of copyBlob that takes the source size when known, to avoid making a first copy attempt and handling the failure.

Contributor:

Ah yeah sorry I got the two threads mixed up. No UOE here indeed, just some kind of AmazonS3Exception or something. But still the point about this being a rarely-hit and easily-forgotten path remains, so yes please it'd be ideal if we could do multipart copy at the same time.

@bcully (Contributor, Author):

My own notes from manual testing: attempting to copy an object larger than 5GB with aws s3api copy-object --debug produces an InvalidRequest error response inside an HTTP 200:

Response headers: {'x-amz-request-id': 'CZWYF5KNMSB99PP3', 'x-amz-id-2': 'vmTYh6g26hF/wU5PspZopMGFOepUe0tIzmU/9QT4DAe2BlvOW1Qk1raP5e46zTdsRImZZhyzB4tUtrAYPflUrQ==', 'Content-Type': 'application/xml', 'Transfer-Encoding': 'chunked', 'Date': 'Sat, 29 Mar 2025 00:06:49 GMT', 'Connection': 'close', 'Server': 'AmazonS3'}
Response body:
b'<Error><Code>InvalidRequest</Code><Message>The specified copy source is larger than the maximum allowable size for a copy source: 5368709120</Message><RequestId>CZWYF5KNMSB99PP3</RequestId><HostId>vmTYh6g26hF/wU5PspZopMGFOepUe0tIzmU/9QT4DAe2BlvOW1Qk1raP5e46zTdsRImZZhyzB4tUtrAYPflUrQ==</HostId></Error>'

@bcully (Contributor, Author):

I've added multipart copy, adjusted the copyBlob signature to take a length, and branched to multipart copy when the length is too long for plain copyObject in f4aad52.
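
The resulting branch is roughly the following (a sketch with hypothetical helper names, not the literal code from f4aad52):

void copyBlob(String sourceKey, String destinationKey, long blobSize) throws IOException {
    if (blobSize <= getLargeCopyThresholdInBytes()) {
        // single server-side CopyObject call
        executeSingleCopy(sourceKey, destinationKey);
    } else {
        // CreateMultipartUpload on the destination, one UploadPartCopy per
        // byte range of the source, then CompleteMultipartUpload
        executeMultipartCopy(sourceKey, destinationKey, blobSize);
    }
}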

public HttpStatsCollectorHandler(HttpHandler delegate, String[] operations) {
    this.delegate = delegate;
    for (String operation : operations) {
        operationCount.put(operation, 0L);
Contributor:

What is this change? Just pre-initializing to 0? If we pre-initialize here, why are we still doing operationCount.getOrDefault(requestType, 0L) + 1?

@bcully (Contributor, Author):

Yes. I noticed that we are currently filtering out zero stats from the actual metrics because we only create them lazily on the expected side, and it seemed more symmetric with the actual metrics to prepopulate the expected ones. You're right that we shouldn't need getOrDefault any more; I'll replace that.
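
The simplification amounts to something like this (sketch):

// before: lazy creation on first sighting of an operation
// operationCount.put(requestType, operationCount.getOrDefault(requestType, 0L) + 1);
// after: every expected operation is pre-registered in the constructor
operationCount.put(requestType, operationCount.get(requestType) + 1);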

@bcully (Contributor, Author):

Done in 27656b2

@bcully (Contributor, Author):

And faa0a83

Contributor:

Would you pull these out into a separate PR @bcully? Happy to review it (or even pull it out myself if you'd prefer), just seems nicer to keep this one focussed on the copyBlob implementation.

@bcully (Contributor, Author) Mar 31, 2025:

I should have said how I noticed that we're filtering out zero stats (removed in the hunk just above this): without this change, a couple of tests break. S3BlobStoreRepositoryTests::testRequestStats:

Expected :{HeadObject=3, PutMultipartObject=36, GetObject=120, ListObjects=12, PutObject=114, DeleteObjects=6}
Actual   :{CopyObject=0, PutMultipartObject=36, HeadObject=3, PutObject=114, DeleteObjects=6, ListObjects=12, GetObject=120}

and testAbortRequestStats:

Expected :{GetObject=2, ListObjects=4, DeleteObjects=3, PutObject=30, AbortMultipartObject=2, HeadObject=1, PutMultipartObject=4}
Actual   :{ListObjects=4, HeadObject=1, PutMultipartObject=4, AbortMultipartObject=2, DeleteObjects=3, PutObject=30, CopyObject=0, GetObject=2}

The fixes I saw were to either add additional filtering for both tests or preinitialize the metrics (generating non-zero copy metrics for this test didn't seem like an option since this test appears to be running higher level API operations and none of those use copy yet), and it seemed like preinitializing was cleaner and only a little longer.

I'm happy to replace this with additional filtering if you prefer it, just wanted to provide some context I should have noted earlier.

bcully added 9 commits March 28, 2025 14:11, including:

We are now preinitializing the metric map rather than updating it lazily.

And remove the constructor that depends on lazy initialization.

s3's multipart copy initiates the multipart operation on the destination, so this avoids confusion.

Also include the blob length in the API to make it convenient for S3. We can add a no-length API later if we don't always have it at hand.
@bcully (Contributor, Author) commented Apr 7, 2025

I'd suggest doing something with BlobAnalyzeAction to sometimes copy the blob it just wrote and then perform (some of?) the reads from the copy. Ideally we'd be checking for atomicity of reads from the copy just like we do today, making sure that we never see a partially-copied blob.

I've been poking at this and while I don't think it's an enormous lift I do feel like it involves some choices that will need a couple of round trips. I wonder if you'd be ok with me doing it in a separate followup PR to a) keep it focused and b) unblock using the primitive to make progress on autosharding?

@DaveCTurner (Contributor):

I would rather it happened at the same time if at all possible. We have a bit of a habit of not quite finding the time to do this kind of follow-up later on, and gaps in the repo analysis process are a massive headache in the long run.

Repo analysis doesn't have particularly strong bwc guarantees so don't worry too much about getting a perfect design up front. As long as we hit the CopyObject API I'll be happy - I'm less concerned about stringent atomicity checks or anything like that.

@bcully (Contributor, Author) commented Apr 7, 2025

I would rather it happened at the same time if at all possible. We have a bit of a habit of not quite finding the time to do this kind of follow-up later on, and gaps in the repo analysis process are a massive headache in the long run.

Repo analysis doesn't have particularly strong bwc guarantees so don't worry too much about getting a perfect design up front. As long as we hit the CopyObject API I'll be happy - I'm less concerned about stringent atomicity checks or anything like that.

Alright, I'll push something today. It may not have coverage of overwriting the source during the copy but the skeleton should be in place.

The control flow is a bit of a mess.
@bcully (Contributor, Author) commented Apr 8, 2025

I'm frankly a bit twisted around by the control flow in BlobAnalyzeAction but I've pushed something anyway just as a basis for conversation.

@DaveCTurner (Contributor) left a comment:

I like it. Left a couple of small comments.

@@ -521,6 +521,13 @@ public void run() {
    final long targetLength = blobSizes.get(i);
    final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
    final boolean abortWrite = smallBlob && request.isAbortWritePermitted() && rarely(random);
    final boolean doCopy = random.nextBoolean();
Contributor:

I think I'd be inclined to have the RepositoryAnalyzeAction control the name of the copied blob rather than just setting a flag. That way we can record it in AsyncAction#expectedBlobs and check it exists in the listing at the end.

Also this can be a rare action, we don't need to copy half of the blobs (+50% storage size on average).

@DaveCTurner (Contributor) Apr 8, 2025:

Actually on reflection it's probably simplest not to put this in expectedBlobs (for now anyway). Not only do we have to think about the doEarlyCopy case but also we need to handle the fact that GCS/Azure will fail the copy with a UOE.

I'd still be inclined to have the RepositoryAnalyzeAction specify the blob name rather than just set a flag in the request because we'll want to do these things in future, but that's the only change I'd suggest in this area.

@bcully (Contributor, Author):

ok, I'll pass a nullable target blob name instead of the flag.

Also this can be a rare action, we don't need to copy half of the blobs (+50% storage size on average).

FWIW the trade I made was to decrement the number of blobs analyzed by one for each analysis that chose to do a copy, to try to better respect the limits being set. But yeah, more copies does mean less exercising of the regular path. Making it rare probably also gives me the knob I need to fix the FailureIT :)
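
Concretely, something like the following shape (illustrative, assuming the existing rarely(random) helper):

// RepositoryAnalyzeAction chooses the target name; null means "don't copy"
final String copyBlobName = rarely(random) ? blobName + "-copy" : null;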

Contributor:

I pushed d9db70d which should fix the FailureIT

@bcully (Contributor, Author):

ok, I'll pass a nullable target blob name instead of the flag.

done in dd7ded8

@@ -230,6 +258,7 @@ private static class BlobAnalysis {
        checksumStart = randomLongBetween(0L, request.targetLength);
        checksumEnd = randomLongBetween(checksumStart + 1, request.targetLength + 1);
    }
    doEarlyCopy = random.nextBoolean();
Contributor:

Nice, but consider this an optional bonus feature, I'd be happy even if we only covered the copy-after-write path. Note that we can only track the copy as an expected blob in RepositoryAnalyzeAction if it doesn't potentially fail because of an early copy, so we'd need to pull this parameter up to the top level too.

@bcully (Contributor, Author):

Would you prefer that I remove early copy from this PR then?

Contributor:

This is fine as-is, but if it causes any issues getting tests to pass etc. then removing it is also fine.

@bcully (Contributor, Author):

sounds good. I'll just take a look at why S3HttpHandler's missing multipart implementation isn't causing failures then, unless I see other failures.

@DaveCTurner dismissed their stale review April 9, 2025 09:45

repo analysis now exercises copyBlob 👍

@@ -467,6 +508,27 @@ static List<String> extractPartEtags(BytesReference completeMultipartUploadBody)
        }
    }

    private static final Pattern rangePattern = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");

    private static Tuple<Long, Long> parsePartRange(final HttpExchange exchange) {
Contributor:

Could we use org.elasticsearch.test.fixture.HttpHeaderParser#parseRangeHeader here? Although I do like how this pattern is anchored at start and end, maybe we should do that in HttpHeaderParser too.

@bcully (Contributor, Author):

Ah sure, I hadn't seen it, will do. I am inclined not to change that pattern here but happy to do it as a self-contained change.
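
Presumably the switch looks something along these lines (a sketch; the Range accessor names are assumptions):

final HttpHeaderParser.Range range = HttpHeaderParser.parseRangeHeader(
    exchange.getRequestHeaders().getFirst("x-amz-copy-source-range")
);
// the range's start/end replace the two capture groups of the local rangePattern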

@bcully (Contributor, Author):

Done in 25e3a1b

Comment on lines 308 to 310:

long getLargeCopyThresholdInBytes() {
    return MAX_FILE_SIZE.getBytes();
}
Contributor:

Could we make this a setting similar to the buffer_size and chunk_size ones? Ideally imposing the same limitations as S3 proper (so a minimum of 5MiB and a max of 5GiB)? That way we can adjust it in S3RepositoryAnalysisRestIT to get the coverage of both paths there.

@bcully (Contributor, Author):

Done in a9ff4b9
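
For reference, such a setting might look roughly like this (a sketch modeled on the existing buffer_size/chunk_size settings; the name and exact bounds are illustrative):

static final Setting<ByteSizeValue> MAX_COPY_SIZE_BEFORE_MULTIPART = Setting.byteSizeSetting(
    "max_copy_size_before_multipart",
    new ByteSizeValue(5, ByteSizeUnit.GB),  // default: S3's single-CopyObject ceiling
    new ByteSizeValue(5, ByteSizeUnit.MB),  // min: S3's minimum part size
    new ByteSizeValue(5, ByteSizeUnit.GB),  // max
    Setting.Property.NodeScope
);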

@DaveCTurner (Contributor) left a comment:

LGTM except, please can we backport this to 8.19 too? It's going to make the AWS SDK upgrade even more painful than it already is if we have this extra API in 9.1 vs in 8.19. scratch that, we discussed elsewhere - we'll work around the conflicts somehow

@bcully merged commit c1a71ff into elastic:main Apr 9, 2025
17 checks passed
@bcully deleted the ES-11059 branch April 9, 2025 17:39