From 3ceee7fffd6e1d25010f712ac886d5df9ff20212 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Wed, 30 Jul 2025 23:40:09 -0700 Subject: [PATCH 1/8] Fix bug in MultipartS3AsyncClient GetObject retryable errors --- .../next-release/bugfix-AmazonS3-263fed5.json | 6 + .../internal/async/SplittingTransformer.java | 107 ++++-- .../IndividualPartSubscriberTckTest.java | 3 +- services/s3/pom.xml | 5 + .../multipart/DownloadObjectHelper.java | 2 + .../MultipartDownloaderSubscriber.java | 6 + .../multipart/MultipartS3AsyncClient.java | 5 +- ...3MultipartClientGetObjectWiremockTest.java | 313 ++++++++++++++++++ 8 files changed, 414 insertions(+), 33 deletions(-) create mode 100644 .changes/next-release/bugfix-AmazonS3-263fed5.json create mode 100644 services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java diff --git a/.changes/next-release/bugfix-AmazonS3-263fed5.json b/.changes/next-release/bugfix-AmazonS3-263fed5.json new file mode 100644 index 000000000000..7ac89a1b1e95 --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-263fed5.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Fix MultipartS3AsyncClient GetObject retryable errors" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index 2c76bbc1d88f..ea94e9caec95 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -25,6 +25,10 @@ import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.exception.SdkServiceException; +import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting; +import software.amazon.awssdk.core.retry.RetryUtils; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; @@ -54,16 +58,6 @@ public class SplittingTransformer implements SdkPublisher upstreamResponseTransformer; - /** - * Set to true once {@code .prepare()} is called on the upstreamResponseTransformer - */ - private final AtomicBoolean preparedCalled = new AtomicBoolean(false); - - /** - * Set to true once {@code .onResponse()} is called on the upstreamResponseTransformer - */ - private final AtomicBoolean onResponseCalled = new AtomicBoolean(false); - /** * Set to true once {@code .onStream()} is called on the upstreamResponseTransformer */ @@ -111,6 +105,24 @@ public class SplittingTransformer implements SdkPublisher upstreamFuture; + + /** + * Tracks whether an {@code IndividualTransformer} has been created for the first part yet. Errors will only be retried for + * the first part. + */ + private final AtomicBoolean isFirstIndividualTransformer = new AtomicBoolean(true); + + /** + * Tracks whether an {@code IndividualPartSubscriber} has been created for the first part yet. Errors will only be retried for + * the first part. + */ + private final AtomicBoolean isFirstIndividualSubscriber = new AtomicBoolean(true); + private SplittingTransformer(AsyncResponseTransformer upstreamResponseTransformer, Long maximumBufferSizeInBytes, CompletableFuture resultFuture) { @@ -198,7 +210,7 @@ private boolean doEmit() { } if (outstandingDemand.get() > 0) { demand = outstandingDemand.decrementAndGet(); - downstreamSubscriber.onNext(new IndividualTransformer()); + downstreamSubscriber.onNext(new IndividualTransformer(isFirstIndividualTransformer.compareAndSet(true, false))); } } return false; @@ -230,6 +242,7 @@ private void handleSubscriptionCancel() { } else { log.trace(() -> "calling downstreamSubscriber.onComplete()"); downstreamSubscriber.onComplete(); + CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); } downstreamSubscriber = null; }); @@ -259,28 +272,27 @@ private void handleFutureCancel(Throwable e) { * body publisher. */ private class IndividualTransformer implements AsyncResponseTransformer { + private final boolean isFirstPart; private ResponseT response; private CompletableFuture individualFuture; + IndividualTransformer(boolean isFirstPart) { + this.isFirstPart = isFirstPart; + } + @Override public CompletableFuture prepare() { this.individualFuture = new CompletableFuture<>(); - if (preparedCalled.compareAndSet(false, true)) { + + if (isFirstPart) { if (isCancelled.get()) { return individualFuture; } log.trace(() -> "calling prepare on the upstream transformer"); - CompletableFuture upstreamFuture = upstreamResponseTransformer.prepare(); - if (!resultFuture.isDone()) { - CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); - } + upstreamFuture = upstreamResponseTransformer.prepare(); + } - resultFuture.whenComplete((r, e) -> { - if (e == null) { - return; - } - individualFuture.completeExceptionally(e); - }); + individualFuture.whenComplete((r, e) -> { if (isCancelled.get()) { handleSubscriptionCancel(); @@ -291,7 +303,7 @@ public CompletableFuture prepare() { @Override public void onResponse(ResponseT response) { - if (onResponseCalled.compareAndSet(false, true)) { + if (isFirstPart) { log.trace(() -> "calling onResponse on the upstream transformer"); upstreamResponseTransformer.onResponse(response); } @@ -304,7 +316,8 @@ public void onStream(SdkPublisher publisher) { return; } synchronized (cancelLock) { - if (onStreamCalled.compareAndSet(false, true)) { + if (isFirstPart) { + onStreamCalled.set(true); log.trace(() -> "calling onStream on the upstream transformer"); upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe( DelegatingBufferingSubscriber.builder() @@ -314,17 +327,51 @@ public void onStream(SdkPublisher publisher) { ); } } - publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response)); + + if (!resultFuture.isDone()) { + CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); + } + + publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, + isFirstIndividualSubscriber.compareAndSet(true, false))); } @Override public void exceptionOccurred(Throwable error) { - publisherToUpstream.error(error); log.trace(() -> "calling exceptionOccurred on the upstream transformer"); upstreamResponseTransformer.exceptionOccurred(error); + + if (!isFirstPart || onStreamCalled.get()) { + publisherToUpstream.error(error); + } + + if (!isRetryableError(error)) { + log.trace(() -> "error is non-retryable, forwarding upstream future to result future"); + CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); + } } } + private boolean isRetryableError(Throwable error) { + if (error instanceof SdkException) { + SdkException ex = (SdkException) error; + return retryOnStatusCodes(ex) + || RetryUtils.isRetryableException(ex) + || RetryUtils.isClockSkewException(ex) + || RetryUtils.isClockSkewException(ex); + + } + return false; + } + + private static boolean retryOnStatusCodes(Throwable ex) { + if (ex instanceof SdkServiceException) { + SdkServiceException failure = (SdkServiceException) ex; + return SdkDefaultRetrySetting.RETRYABLE_STATUS_CODES.contains(failure.statusCode()); + } + return false; + } + /** * the Subscriber for each of the individual request's ByteBuffer publisher */ @@ -332,11 +379,13 @@ class IndividualPartSubscriber implements Subscriber { private final CompletableFuture future; private final T response; + private final boolean isFirstPart; private Subscription subscription; - IndividualPartSubscriber(CompletableFuture future, T response) { + IndividualPartSubscriber(CompletableFuture future, T response, boolean isFirstPart) { this.future = future; this.response = response; + this.isFirstPart = isFirstPart; } @Override @@ -376,7 +425,9 @@ public void onComplete() { } private void handleError(Throwable t) { - publisherToUpstream.error(t); + if (!isFirstPart) { + publisherToUpstream.error(t); + } future.completeExceptionally(t); } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java index a72a3ab7aa1f..7a59134ea9bd 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java @@ -18,7 +18,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.SubscriberWhiteboxVerification; @@ -45,7 +44,7 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe()) .build(); - return transformer.new IndividualPartSubscriber(future, ByteBuffer.wrap(new byte[0])) { + return transformer.new IndividualPartSubscriber(future, ByteBuffer.wrap(new byte[0]), true) { @Override public void onSubscribe(Subscription s) { super.onSubscribe(s); diff --git a/services/s3/pom.xml b/services/s3/pom.xml index 82cf66dbee03..a90cd13e89ae 100644 --- a/services/s3/pom.xml +++ b/services/s3/pom.xml @@ -128,6 +128,11 @@ retries-spi ${awsjavasdk.version} + + software.amazon.awssdk + retries + ${awsjavasdk.version} + commons-io diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java index 2d6fadc5f505..57110b38e811 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Logger; @SdkInternalApi @@ -49,6 +50,7 @@ public CompletableFuture downloadObject( .build()); MultipartDownloaderSubscriber subscriber = subscriber(getObjectRequest); split.publisher().subscribe(subscriber); + CompletableFutureUtils.forwardExceptionTo(subscriber.future(), split.resultFuture()); return split.resultFuture(); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index d369d0caff02..0bb75136c2cd 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; @@ -79,6 +81,8 @@ public class MultipartDownloaderSubscriber implements Subscriber> getObjectFutures = new ArrayList<>(); + public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) { this(s3, getObjectRequest, 0); } @@ -119,6 +123,7 @@ public void onNext(AsyncResponseTransformer "Sending GetObjectRequest for next part with partNumber=" + nextPartToGet); CompletableFuture getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer); + getObjectFutures.add(getObjectFuture); getObjectFuture.whenComplete((response, error) -> { if (error != null) { log.debug(() -> "Error encountered during GetObjectRequest with partNumber=" + nextPartToGet); @@ -166,6 +171,7 @@ private void requestMoreIfNeeded(GetObjectResponse response) { @Override public void onError(Throwable t) { + getObjectFutures.forEach(future -> future.cancel(true)); future.completeExceptionally(t); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java index 6142db7772f7..d638ca2f4b6a 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java @@ -38,9 +38,8 @@ import software.amazon.awssdk.utils.Validate; /** - * An {@link S3AsyncClient} that automatically converts PUT, COPY requests to their respective multipart call. CRC32 will be - * enabled for the PUT and COPY requests, unless the the checksum is specified or checksum validation is disabled. - * Note: GET is not yet supported. + * An {@link S3AsyncClient} that automatically converts PUT, COPY, and GET requests to their respective multipart call. CRC32 + * will be enabled for the requests, unless the checksum is specified or checksum validation is disabled. * * @see MultipartConfiguration */ diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java new file mode 100644 index 000000000000..350534d8e503 --- /dev/null +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -0,0 +1,313 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.multipart; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.any; +import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.matching; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.retry.AwsRetryStrategy; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +@WireMockTest +public class S3MultipartClientGetObjectWiremockTest { + public static final String ERROR_CODE = "InternalError"; + public static final String ERROR_MESSAGE = "We encountered an internal error. Please try again."; + public static final String ERROR_BODY = "\n" + + "\n" + + " " + ERROR_CODE + "\n" + + " " + ERROR_MESSAGE + "\n" + + ""; + public static final String BUCKET = "Example-Bucket"; + public static final String KEY = "Key"; + + private S3AsyncClient multipartClient; + + @BeforeEach + public void setup(WireMockRuntimeInfo wm) { + multipartClient = S3AsyncClient.builder() + .region(Region.US_EAST_1) + .endpointOverride(URI.create(wm.getHttpBaseUrl())) + .multipartEnabled(true) + .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(100).connectionAcquisitionTimeout(Duration.ofSeconds(100))) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret"))) + .overrideConfiguration( + o -> o.retryStrategy(AwsRetryStrategy.standardRetryStrategy().toBuilder() + .maxAttempts(10) + .circuitBreakerEnabled(false) + .build())) + .build(); + } + + @Test + public void stub_200_only() { + List>> futures = new ArrayList<>(); + + int numRuns = 1000; + for (int i = 0; i < numRuns; i++) { + CompletableFuture> resp = mock200Response(multipartClient, i); + futures.add(resp); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + @Test + public void stub_200s_one503_more200s() { + List>> futures = new ArrayList<>(); + + int numRuns = 1000; + for (int i = 0; i < numRuns; i++) { + CompletableFuture> resp = mock200Response(multipartClient, i); + futures.add(resp); + } + + CompletableFuture> requestWithRetryableError = + mockRetryableErrorThen200Response(multipartClient, 1); + futures.add(requestWithRetryableError); + + for (int i = 0; i < numRuns; i++) { + CompletableFuture> resp = mock200Response(multipartClient, i + 1000); + futures.add(resp); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + @Test + public void stub_503_then_200_multipleTimes() { + List>> futures = new ArrayList<>(); + + int numRuns = 1000; + for (int i = 0; i < numRuns; i++) { + CompletableFuture> resp = mockRetryableErrorThen200Response(multipartClient, i); + futures.add(resp); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + // TODO - assert first request ID not reused on retries + /*@Test + public void stub_503_only(WireMockRuntimeInfo wm) { + stubFor(any(anyUrl()) + .inScenario("errors") + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withStatus(503) + .withBody(ERROR_BODY)) + .willSetStateTo("SecondAttempt")); + + stubFor(any(anyUrl()) + .inScenario("errors") + .whenScenarioStateIs("SecondAttempt") + .willReturn(aResponse().withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withStatus(503))); + + multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join(); + }*/ + + private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { + String runId = runNumber + " sucess"; + + stubFor(any(anyUrl()) + .withHeader("RunNum", matching(runId)) + .inScenario(runId) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse().withStatus(200) + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withBody("Hello World"))); + + return s3Client.getObject(r -> r.bucket(BUCKET).key("key") + .overrideConfiguration(c -> c.putHeader("RunNum", runId)), + AsyncResponseTransformer.toBytes()); + } + + private CompletableFuture> mockRetryableErrorThen200Response(S3AsyncClient s3Client, int runNumber) { + String runId = String.valueOf(runNumber); + + stubFor(any(anyUrl()) + .withHeader("RunNum", matching(runId)) + .inScenario(runId) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withStatus(503).withBody(ERROR_BODY) + ) + .willSetStateTo("SecondAttempt" + runId)); + + stubFor(any(anyUrl()) + .inScenario(runId) + .withHeader("RunNum", matching(runId)) + .whenScenarioStateIs("SecondAttempt" + runId) + .willReturn(aResponse().withStatus(200) + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withBody("Hello World"))); + + return s3Client.getObject(r -> r.bucket(BUCKET).key("key") + .overrideConfiguration(c -> c.putHeader("RunNum", runId)), + AsyncResponseTransformer.toBytes()); + } + + @Test + public void multipleParts_all_200() { + int totalParts = 3; + int partSize = 1024; + + byte[] part1Data = new byte[partSize]; + byte[] part2Data = new byte[partSize]; + byte[] part3Data = new byte[partSize]; + new Random().nextBytes(part1Data); + new Random().nextBytes(part2Data); + new Random().nextBytes(part3Data); + + byte[] expectedBody = new byte[totalParts * partSize]; + System.arraycopy(part1Data, 0, expectedBody, 0, partSize); + System.arraycopy(part2Data, 0, expectedBody, partSize, partSize); + System.arraycopy(part3Data, 0, expectedBody, 2 * partSize, partSize); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .willReturn(aResponse() + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withStatus(200).withBody(part1Data))); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(part2Data))); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(part3Data))); + + CompletableFuture> future = + multipartClient.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).build(), + AsyncResponseTransformer.toBytes()); + + ResponseBytes response = future.join(); + byte[] actualBody = response.asByteArray(); + assertArrayEquals(expectedBody, actualBody, "Downloaded body should match expected combined parts"); + + // Verify that all 3 parts were requested only once + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY)))); + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY)))); + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY)))); + } + + @Test + public void multipleParts_503OnFirstPart_then_200s() { + int totalParts = 3; + int partSize = 1024; + + byte[] part1Data = new byte[partSize]; + byte[] part2Data = new byte[partSize]; + byte[] part3Data = new byte[partSize]; + new Random().nextBytes(part1Data); + new Random().nextBytes(part2Data); + new Random().nextBytes(part3Data); + + byte[] expectedBody = new byte[totalParts * partSize]; + System.arraycopy(part1Data, 0, expectedBody, 0, partSize); + System.arraycopy(part2Data, 0, expectedBody, partSize, partSize); + System.arraycopy(part3Data, 0, expectedBody, 2 * partSize, partSize); + + + // Stub Part 1 - 503 on first attempt, 200 on retry + String part1Scenario = "part1-retry"; + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .inScenario(part1Scenario) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withStatus(503) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody("\n" + + "\n" + + " SlowDown\n" + + " Please reduce your request rate.\n" + + "")) + .willSetStateTo("retry-attempt")); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .inScenario(part1Scenario) + .whenScenarioStateIs("retry-attempt") + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(part1Data))); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(part2Data))); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(part3Data))); + + CompletableFuture> future = + multipartClient.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).build(), + AsyncResponseTransformer.toBytes()); + + ResponseBytes response = future.join(); + byte[] actualBody = response.asByteArray(); + assertArrayEquals(expectedBody, actualBody, "Downloaded body should match expected combined parts"); + + // Verify that part 1 was requested twice (initial 503 + retry) + verify(2, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY)))); + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY)))); + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY)))); + } +} From 4cdded1b690e5acd872092d00076c3f9c5f37bb5 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Thu, 31 Jul 2025 10:36:38 -0700 Subject: [PATCH 2/8] Remove retry logic from SplittingTransformer --- .../internal/async/SplittingTransformer.java | 29 ------------------- services/s3/pom.xml | 3 +- ...ipartDownloaderSubscriberWiremockTest.java | 26 ----------------- 3 files changed, 2 insertions(+), 56 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index ea94e9caec95..cba90becfdf9 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -25,10 +25,6 @@ import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; -import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.core.exception.SdkServiceException; -import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting; -import software.amazon.awssdk.core.retry.RetryUtils; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; @@ -344,32 +340,7 @@ public void exceptionOccurred(Throwable error) { if (!isFirstPart || onStreamCalled.get()) { publisherToUpstream.error(error); } - - if (!isRetryableError(error)) { - log.trace(() -> "error is non-retryable, forwarding upstream future to result future"); - CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); - } - } - } - - private boolean isRetryableError(Throwable error) { - if (error instanceof SdkException) { - SdkException ex = (SdkException) error; - return retryOnStatusCodes(ex) - || RetryUtils.isRetryableException(ex) - || RetryUtils.isClockSkewException(ex) - || RetryUtils.isClockSkewException(ex); - - } - return false; - } - - private static boolean retryOnStatusCodes(Throwable ex) { - if (ex instanceof SdkServiceException) { - SdkServiceException failure = (SdkServiceException) ex; - return SdkDefaultRetrySetting.RETRYABLE_STATUS_CODES.contains(failure.statusCode()); } - return false; } /** diff --git a/services/s3/pom.xml b/services/s3/pom.xml index 12b24081fd5d..57bf2245a8db 100644 --- a/services/s3/pom.xml +++ b/services/s3/pom.xml @@ -128,12 +128,13 @@ retries-spi ${awsjavasdk.version} + software.amazon.awssdk retries + test ${awsjavasdk.version} - commons-io commons-io diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java index 1c6eb666a9c2..341febda47b1 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java @@ -98,32 +98,6 @@ void happyPath_shouldReceiveAllBodyPartInCorrectOrder(AsyncResponseTransform util.verifyCorrectAmountOfRequestsMade(amountOfPartToTest); } - @ParameterizedTest - @MethodSource("argumentsProvider") - void errorOnFirstRequest_shouldCompleteExceptionally(AsyncResponseTransformerTestSupplier supplier, - int amountOfPartToTest, - int partSize) { - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", testBucket, testKey))).willReturn( - aResponse() - .withStatus(400) - .withBody("400test error message"))); - AsyncResponseTransformer transformer = supplier.transformer(); - AsyncResponseTransformer.SplitResult split = transformer.split( - SplittingTransformerConfiguration.builder() - .bufferSizeInBytes(1024 * 32L) - .build()); - Subscriber> subscriber = new MultipartDownloaderSubscriber( - s3AsyncClient, - GetObjectRequest.builder() - .bucket(testBucket) - .key(testKey) - .build()); - - split.publisher().subscribe(subscriber); - assertThatThrownBy(() -> split.resultFuture().join()) - .hasMessageContaining("test error message"); - } - @ParameterizedTest @MethodSource("argumentsProvider") void errorOnThirdRequest_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo( From 54457df9020890a42456a3cbdb55586705cb8821 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Thu, 31 Jul 2025 12:06:15 -0700 Subject: [PATCH 3/8] Update mock error test --- ...3MultipartClientGetObjectWiremockTest.java | 140 ++++++++++++------ 1 file changed, 96 insertions(+), 44 deletions(-) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index 350534d8e503..97a106010152 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -25,6 +25,10 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; @@ -36,6 +40,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -43,6 +48,10 @@ import software.amazon.awssdk.awscore.retry.AwsRetryStrategy; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -60,11 +69,14 @@ public class S3MultipartClientGetObjectWiremockTest { + ""; public static final String BUCKET = "Example-Bucket"; public static final String KEY = "Key"; + private static final int MAX_ATTEMPTS = 7; + private static final CapturingInterceptor capturingInterceptor = new CapturingInterceptor(); private S3AsyncClient multipartClient; @BeforeEach public void setup(WireMockRuntimeInfo wm) { + capturingInterceptor.clear(); multipartClient = S3AsyncClient.builder() .region(Region.US_EAST_1) .endpointOverride(URI.create(wm.getHttpBaseUrl())) @@ -73,9 +85,10 @@ public void setup(WireMockRuntimeInfo wm) { .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret"))) .overrideConfiguration( o -> o.retryStrategy(AwsRetryStrategy.standardRetryStrategy().toBuilder() - .maxAttempts(10) + .maxAttempts(MAX_ATTEMPTS) .circuitBreakerEnabled(false) - .build())) + .build()) + .addExecutionInterceptor(capturingInterceptor)) .build(); } @@ -127,14 +140,16 @@ public void stub_503_then_200_multipleTimes() { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } - // TODO - assert first request ID not reused on retries - /*@Test + @Test public void stub_503_only(WireMockRuntimeInfo wm) { + String firstRequestId = UUID.randomUUID().toString(); + String secondRequestId = UUID.randomUUID().toString(); + stubFor(any(anyUrl()) .inScenario("errors") .whenScenarioStateIs(Scenario.STARTED) .willReturn(aResponse() - .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withHeader("x-amz-request-id", firstRequestId) .withStatus(503) .withBody(ERROR_BODY)) .willSetStateTo("SecondAttempt")); @@ -142,52 +157,30 @@ public void stub_503_only(WireMockRuntimeInfo wm) { stubFor(any(anyUrl()) .inScenario("errors") .whenScenarioStateIs("SecondAttempt") - .willReturn(aResponse().withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withStatus(503))); + .willReturn(aResponse() + .withHeader("x-amz-request-id", secondRequestId) + .withStatus(503))); - multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join(); - }*/ + assertThrows(CompletionException.class, () -> { + multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join(); + }); - private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { - String runId = runNumber + " sucess"; + List responses = capturingInterceptor.getResponses(); + assertEquals(MAX_ATTEMPTS, responses.size(), () -> String.format("Expected exactly %s responses", MAX_ATTEMPTS)); - stubFor(any(anyUrl()) - .withHeader("RunNum", matching(runId)) - .inScenario(runId) - .whenScenarioStateIs(Scenario.STARTED) - .willReturn(aResponse().withStatus(200) - .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withBody("Hello World"))); + String actualFirstRequestId = responses.get(0).firstMatchingHeader("x-amz-request-id").orElse(null); + String actualSecondRequestId = responses.get(1).firstMatchingHeader("x-amz-request-id").orElse(null); - return s3Client.getObject(r -> r.bucket(BUCKET).key("key") - .overrideConfiguration(c -> c.putHeader("RunNum", runId)), - AsyncResponseTransformer.toBytes()); - } + assertNotNull(actualFirstRequestId, "First response should have x-amz-request-id header"); + assertNotNull(actualSecondRequestId, "Second response should have x-amz-request-id header"); - private CompletableFuture> mockRetryableErrorThen200Response(S3AsyncClient s3Client, int runNumber) { - String runId = String.valueOf(runNumber); + assertNotEquals(actualFirstRequestId, actualSecondRequestId, "First request ID should not be reused on retry"); - stubFor(any(anyUrl()) - .withHeader("RunNum", matching(runId)) - .inScenario(runId) - .whenScenarioStateIs(Scenario.STARTED) - .willReturn(aResponse() - .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withStatus(503).withBody(ERROR_BODY) - ) - .willSetStateTo("SecondAttempt" + runId)); - - stubFor(any(anyUrl()) - .inScenario(runId) - .withHeader("RunNum", matching(runId)) - .whenScenarioStateIs("SecondAttempt" + runId) - .willReturn(aResponse().withStatus(200) - .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withBody("Hello World"))); + assertEquals(firstRequestId, actualFirstRequestId, "First response should have expected request ID"); + assertEquals(secondRequestId, actualSecondRequestId, "Second response should have expected request ID"); - return s3Client.getObject(r -> r.bucket(BUCKET).key("key") - .overrideConfiguration(c -> c.putHeader("RunNum", runId)), - AsyncResponseTransformer.toBytes()); + assertEquals(503, responses.get(0).statusCode()); + assertEquals(503, responses.get(1).statusCode()); } @Test @@ -310,4 +303,63 @@ public void multipleParts_503OnFirstPart_then_200s() { verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY)))); verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY)))); } + + private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { + String runId = runNumber + " sucess"; + + stubFor(any(anyUrl()) + .withHeader("RunNum", matching(runId)) + .inScenario(runId) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse().withStatus(200) + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withBody("Hello World"))); + + return s3Client.getObject(r -> r.bucket(BUCKET).key("key") + .overrideConfiguration(c -> c.putHeader("RunNum", runId)), + AsyncResponseTransformer.toBytes()); + } + + private CompletableFuture> mockRetryableErrorThen200Response(S3AsyncClient s3Client, int runNumber) { + String runId = String.valueOf(runNumber); + + stubFor(any(anyUrl()) + .withHeader("RunNum", matching(runId)) + .inScenario(runId) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withStatus(503).withBody(ERROR_BODY) + ) + .willSetStateTo("SecondAttempt" + runId)); + + stubFor(any(anyUrl()) + .inScenario(runId) + .withHeader("RunNum", matching(runId)) + .whenScenarioStateIs("SecondAttempt" + runId) + .willReturn(aResponse().withStatus(200) + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withBody("Hello World"))); + + return s3Client.getObject(r -> r.bucket(BUCKET).key("key") + .overrideConfiguration(c -> c.putHeader("RunNum", runId)), + AsyncResponseTransformer.toBytes()); + } + + static class CapturingInterceptor implements ExecutionInterceptor { + private final List responses = new ArrayList<>(); + + @Override + public void afterTransmission(Context.AfterTransmission context, ExecutionAttributes executionAttributes) { + responses.add(context.httpResponse()); + } + + public List getResponses() { + return new ArrayList<>(responses); + } + + public void clear() { + responses.clear(); + } + } } From 7f8803f46d85eed94093a92ba0c17e74eb932b7b Mon Sep 17 00:00:00 2001 From: hdavidh Date: Fri, 1 Aug 2025 17:34:18 -0700 Subject: [PATCH 4/8] Address comments pt 1 --- .../next-release/bugfix-AmazonS3-263fed5.json | 2 +- .../internal/async/SplittingTransformer.java | 61 +++++++++---------- .../IndividualPartSubscriberTckTest.java | 2 +- .../MultipartDownloaderSubscriber.java | 12 ++-- ...3MultipartClientGetObjectWiremockTest.java | 4 +- 5 files changed, 42 insertions(+), 39 deletions(-) diff --git a/.changes/next-release/bugfix-AmazonS3-263fed5.json b/.changes/next-release/bugfix-AmazonS3-263fed5.json index 7ac89a1b1e95..6cd1226c9d1d 100644 --- a/.changes/next-release/bugfix-AmazonS3-263fed5.json +++ b/.changes/next-release/bugfix-AmazonS3-263fed5.json @@ -2,5 +2,5 @@ "type": "bugfix", "category": "Amazon S3", "contributor": "", - "description": "Fix MultipartS3AsyncClient GetObject retryable errors" + "description": "Fix bug in MultipartS3AsyncClient GET where retryable errors may not retried, and if retried, successful responses are incorrectly processed with the initial error." } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index cba90becfdf9..0a0eaadceb2c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -57,7 +58,7 @@ public class SplittingTransformer implements SdkPublisher implements SdkPublisher upstreamFuture; /** - * Tracks whether an {@code IndividualTransformer} has been created for the first part yet. Errors will only be retried for - * the first part. + * Tracks the part number. Errors will only be retried for the first part. */ - private final AtomicBoolean isFirstIndividualTransformer = new AtomicBoolean(true); - - /** - * Tracks whether an {@code IndividualPartSubscriber} has been created for the first part yet. Errors will only be retried for - * the first part. - */ - private final AtomicBoolean isFirstIndividualSubscriber = new AtomicBoolean(true); + private final AtomicInteger partNumber = new AtomicInteger(0); private SplittingTransformer(AsyncResponseTransformer upstreamResponseTransformer, Long maximumBufferSizeInBytes, @@ -206,7 +200,7 @@ private boolean doEmit() { } if (outstandingDemand.get() > 0) { demand = outstandingDemand.decrementAndGet(); - downstreamSubscriber.onNext(new IndividualTransformer(isFirstIndividualTransformer.compareAndSet(true, false))); + downstreamSubscriber.onNext(new IndividualTransformer(partNumber.incrementAndGet())); } } return false; @@ -224,7 +218,7 @@ private void handleSubscriptionCancel() { log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()"); return; } - if (!onStreamCalled.get()) { + if (!onStreamCalled) { // we never subscribe publisherToUpstream to the upstream, it would not complete downstreamSubscriber = null; return; @@ -268,19 +262,19 @@ private void handleFutureCancel(Throwable e) { * body publisher. */ private class IndividualTransformer implements AsyncResponseTransformer { - private final boolean isFirstPart; + private final int partNumber; private ResponseT response; private CompletableFuture individualFuture; - IndividualTransformer(boolean isFirstPart) { - this.isFirstPart = isFirstPart; + IndividualTransformer(int partNumber) { + this.partNumber = partNumber; } @Override public CompletableFuture prepare() { this.individualFuture = new CompletableFuture<>(); - if (isFirstPart) { + if (partNumber == 1) { if (isCancelled.get()) { return individualFuture; } @@ -299,7 +293,7 @@ public CompletableFuture prepare() { @Override public void onResponse(ResponseT response) { - if (isFirstPart) { + if (partNumber == 1) { log.trace(() -> "calling onResponse on the upstream transformer"); upstreamResponseTransformer.onResponse(response); } @@ -312,8 +306,8 @@ public void onStream(SdkPublisher publisher) { return; } synchronized (cancelLock) { - if (isFirstPart) { - onStreamCalled.set(true); + if (partNumber == 1) { + onStreamCalled = true; log.trace(() -> "calling onStream on the upstream transformer"); upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe( DelegatingBufferingSubscriber.builder() @@ -324,22 +318,25 @@ public void onStream(SdkPublisher publisher) { } } - if (!resultFuture.isDone()) { - CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); - } - - publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, - isFirstIndividualSubscriber.compareAndSet(true, false))); + CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); + publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, partNumber)); } @Override public void exceptionOccurred(Throwable error) { log.trace(() -> "calling exceptionOccurred on the upstream transformer"); - upstreamResponseTransformer.exceptionOccurred(error); - if (!isFirstPart || onStreamCalled.get()) { - publisherToUpstream.error(error); + if (partNumber == 1) { + upstreamResponseTransformer.exceptionOccurred(error); } + + // TODO - add comments explaining + synchronized (cancelLock) { + if (partNumber > 1 || onStreamCalled) { + publisherToUpstream.error(error); + } + } + } } @@ -350,13 +347,13 @@ class IndividualPartSubscriber implements Subscriber { private final CompletableFuture future; private final T response; - private final boolean isFirstPart; + private final int partNumber; private Subscription subscription; - IndividualPartSubscriber(CompletableFuture future, T response, boolean isFirstPart) { + IndividualPartSubscriber(CompletableFuture future, T response, int partNumber) { this.future = future; this.response = response; - this.isFirstPart = isFirstPart; + this.partNumber = partNumber; } @Override @@ -396,7 +393,7 @@ public void onComplete() { } private void handleError(Throwable t) { - if (!isFirstPart) { + if (partNumber > 1) { publisherToUpstream.error(t); } future.completeExceptionally(t); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java index 7a59134ea9bd..b5872e6e35fd 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java @@ -44,7 +44,7 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe()) .build(); - return transformer.new IndividualPartSubscriber(future, ByteBuffer.wrap(new byte[0]), true) { + return transformer.new IndividualPartSubscriber(future, ByteBuffer.wrap(new byte[0]), 1) { @Override public void onSubscribe(Subscription s) { super.onSubscribe(s); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 0bb75136c2cd..a9a2925b77d6 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -15,9 +15,9 @@ package software.amazon.awssdk.services.s3.internal.multipart; -import java.util.ArrayList; -import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -81,7 +81,7 @@ public class MultipartDownloaderSubscriber implements Subscriber> getObjectFutures = new ArrayList<>(); + private final Queue> getObjectFutures = new ConcurrentLinkedQueue<>(); public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) { this(s3, getObjectRequest, 0); @@ -125,6 +125,7 @@ public void onNext(AsyncResponseTransformer getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer); getObjectFutures.add(getObjectFuture); getObjectFuture.whenComplete((response, error) -> { + getObjectFutures.remove(getObjectFuture); if (error != null) { log.debug(() -> "Error encountered during GetObjectRequest with partNumber=" + nextPartToGet); onError(error); @@ -171,7 +172,10 @@ private void requestMoreIfNeeded(GetObjectResponse response) { @Override public void onError(Throwable t) { - getObjectFutures.forEach(future -> future.cancel(true)); + CompletableFuture partFuture; + while ((partFuture = getObjectFutures.poll()) != null) { + partFuture.cancel(true); + } future.completeExceptionally(t); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index 97a106010152..aa0f7b3ce2af 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -92,6 +92,8 @@ public void setup(WireMockRuntimeInfo wm) { .build(); } + // TODO - 1) update test names 2) add test for I/O error + @Test public void stub_200_only() { List>> futures = new ArrayList<>(); @@ -305,7 +307,7 @@ public void multipleParts_503OnFirstPart_then_200s() { } private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { - String runId = runNumber + " sucess"; + String runId = runNumber + " success"; stubFor(any(anyUrl()) .withHeader("RunNum", matching(runId)) From 22b0a0b8ab74304114296f1eb20dea9f22c5884a Mon Sep 17 00:00:00 2001 From: hdavidh Date: Fri, 1 Aug 2025 22:46:33 -0700 Subject: [PATCH 5/8] Address comments pt2 --- .../internal/async/SplittingTransformer.java | 8 +-- ...3MultipartClientGetObjectWiremockTest.java | 55 ++++++++++++++++--- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index 0a0eaadceb2c..ef2c59b788ad 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -324,19 +324,19 @@ public void onStream(SdkPublisher publisher) { @Override public void exceptionOccurred(Throwable error) { - log.trace(() -> "calling exceptionOccurred on the upstream transformer"); - if (partNumber == 1) { + log.trace(() -> "calling exceptionOccurred on the upstream transformer"); upstreamResponseTransformer.exceptionOccurred(error); } - // TODO - add comments explaining + // Invoking publisherToUpstream.error() essentially fails the request immediately. We should only call this if + // 1) The part number is greater than 1, since we want to retry errors on the first part OR 2) onStream() has + // already been invoked and data has started to be written synchronized (cancelLock) { if (partNumber > 1 || onStreamCalled) { publisherToUpstream.error(error); } } - } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index aa0f7b3ce2af..ea6311fd4928 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -30,10 +30,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import com.github.tomakehurst.wiremock.stubbing.Scenario; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -92,10 +94,8 @@ public void setup(WireMockRuntimeInfo wm) { .build(); } - // TODO - 1) update test names 2) add test for I/O error - @Test - public void stub_200_only() { + public void getObject_concurrentCallsReturn200_shouldSucceed() { List>> futures = new ArrayList<>(); int numRuns = 1000; @@ -108,7 +108,7 @@ public void stub_200_only() { } @Test - public void stub_200s_one503_more200s() { + public void getObject_single500WithinMany200s_shouldRetrySuccessfully() { List>> futures = new ArrayList<>(); int numRuns = 1000; @@ -130,7 +130,7 @@ public void stub_200s_one503_more200s() { } @Test - public void stub_503_then_200_multipleTimes() { + public void getObject_concurrent503s_shouldRetrySuccessfully() { List>> futures = new ArrayList<>(); int numRuns = 1000; @@ -143,7 +143,7 @@ public void stub_503_then_200_multipleTimes() { } @Test - public void stub_503_only(WireMockRuntimeInfo wm) { + public void getObject_503Response_shouldNotReuseInitialRequestId() { String firstRequestId = UUID.randomUUID().toString(); String secondRequestId = UUID.randomUUID().toString(); @@ -186,7 +186,7 @@ public void stub_503_only(WireMockRuntimeInfo wm) { } @Test - public void multipleParts_all_200() { + public void multipartDownload_200Response_shouldSucceed() { int totalParts = 3; int partSize = 1024; @@ -237,7 +237,7 @@ public void multipleParts_all_200() { } @Test - public void multipleParts_503OnFirstPart_then_200s() { + public void multipartDownload_503OnFirstPart_shouldRetrySuccessfully() { int totalParts = 3; int partSize = 1024; @@ -306,6 +306,43 @@ public void multipleParts_503OnFirstPart_then_200s() { verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY)))); } + @Test + public void getObject_iOError_shouldRetrySuccessfully() { + String requestId = UUID.randomUUID().toString(); + + stubFor(any(anyUrl()) + .inScenario("io-error") + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withFault(Fault.CONNECTION_RESET_BY_PEER)) + .willSetStateTo("retry")); + + stubFor(any(anyUrl()) + .inScenario("io-error") + .whenScenarioStateIs("retry") + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-request-id", requestId) + .withBody("Hello World"))); + + ResponseBytes response = multipartClient.getObject(GetObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .build(), + AsyncResponseTransformer.toBytes()).join(); + + assertArrayEquals("Hello World".getBytes(StandardCharsets.UTF_8), response.asByteArray()); + + verify(2, getRequestedFor(urlEqualTo("/" + BUCKET + "/" + KEY + "?partNumber=1"))); + + List responses = capturingInterceptor.getResponses(); + String finalRequestId = responses.get(responses.size() - 1) + .firstMatchingHeader("x-amz-request-id") + .orElse(null); + + assertEquals(requestId, finalRequestId); + } + private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { String runId = runNumber + " success"; @@ -331,7 +368,7 @@ private CompletableFuture> mockRetryableErrorTh .whenScenarioStateIs(Scenario.STARTED) .willReturn(aResponse() .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withStatus(503).withBody(ERROR_BODY) + .withStatus(500).withBody(ERROR_BODY) ) .willSetStateTo("SecondAttempt" + runId)); From 05ea027a584c3efbe9c98e087d32b4a26ed3bf1b Mon Sep 17 00:00:00 2001 From: hdavidh Date: Mon, 4 Aug 2025 19:53:57 -0700 Subject: [PATCH 6/8] Add test case and update changelog --- .../next-release/bugfix-AmazonS3-263fed5.json | 2 +- ...3MultipartClientGetObjectWiremockTest.java | 92 ++++++++++++++++++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/.changes/next-release/bugfix-AmazonS3-263fed5.json b/.changes/next-release/bugfix-AmazonS3-263fed5.json index 6cd1226c9d1d..22e8f47a7271 100644 --- a/.changes/next-release/bugfix-AmazonS3-263fed5.json +++ b/.changes/next-release/bugfix-AmazonS3-263fed5.json @@ -2,5 +2,5 @@ "type": "bugfix", "category": "Amazon S3", "contributor": "", - "description": "Fix bug in MultipartS3AsyncClient GET where retryable errors may not retried, and if retried, successful responses are incorrectly processed with the initial error." + "description": "Fix bug in MultipartS3AsyncClient GET where retryable errors may not be retried, and if retried, successful responses are incorrectly processed with the initial error." } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index ea6311fd4928..e6662d5b31dd 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -29,12 +29,14 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import com.github.tomakehurst.wiremock.stubbing.Scenario; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -43,13 +45,17 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.awscore.retry.AwsRetryStrategy; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; @@ -163,9 +169,8 @@ public void getObject_503Response_shouldNotReuseInitialRequestId() { .withHeader("x-amz-request-id", secondRequestId) .withStatus(503))); - assertThrows(CompletionException.class, () -> { - multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join(); - }); + assertThrows(CompletionException.class, () -> + multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join()); List responses = capturingInterceptor.getResponses(); assertEquals(MAX_ATTEMPTS, responses.size(), () -> String.format("Expected exactly %s responses", MAX_ATTEMPTS)); @@ -343,6 +348,87 @@ public void getObject_iOError_shouldRetrySuccessfully() { assertEquals(requestId, finalRequestId); } + @Test + public void multipartDownload_errorDuringFirstPartAfterOnStream_shouldFailAndNotRetry() { + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .willReturn(aResponse() + .withHeader("x-amz-mp-parts-count", String.valueOf(2)) + .withStatus(200) + .withBody("Hello "))); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", "2") + .withBody("World"))); + + StreamingErrorTransformer failingTransformer = new StreamingErrorTransformer(); + assertThrows(CompletionException.class, () -> + multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), failingTransformer).join()); + + assertTrue(failingTransformer.onStreamCalled.get()); + // Verify that the first part was requested only once and not retried + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY)))); + } + + /** + * Custom AsyncResponseTransformer that simulates an error occurring after onStream() has been called + */ + private static final class StreamingErrorTransformer + implements AsyncResponseTransformer> { + + private final CompletableFuture> future = new CompletableFuture<>(); + private final AtomicBoolean errorThrown = new AtomicBoolean(); + private final AtomicBoolean onStreamCalled = new AtomicBoolean(); + + @Override + public CompletableFuture> prepare() { + return future; + } + + @Override + public void onResponse(GetObjectResponse response) { + // + } + + @Override + public void onStream(SdkPublisher publisher) { + onStreamCalled.set(true); + publisher.subscribe(new Subscriber() { + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + if (errorThrown.compareAndSet(false, true)) { + future.completeExceptionally(new RuntimeException()); + subscription.cancel(); + } + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onComplete() { + // + } + }); + } + + @Override + public void exceptionOccurred(Throwable throwable) { + future.completeExceptionally(throwable); + } + } + private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { String runId = runNumber + " success"; From 14cccdab45692bed8a64d58bee74a58b50c93d20 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Tue, 5 Aug 2025 14:50:30 -0700 Subject: [PATCH 7/8] Address comments --- .../next-release/bugfix-AmazonS3-263fed5.json | 2 +- .../internal/async/SplittingTransformer.java | 13 +- .../IndividualPartSubscriberTckTest.java | 2 +- ...3MultipartClientGetObjectWiremockTest.java | 356 ++++++++++-------- 4 files changed, 210 insertions(+), 163 deletions(-) diff --git a/.changes/next-release/bugfix-AmazonS3-263fed5.json b/.changes/next-release/bugfix-AmazonS3-263fed5.json index 22e8f47a7271..e3e053c54ecc 100644 --- a/.changes/next-release/bugfix-AmazonS3-263fed5.json +++ b/.changes/next-release/bugfix-AmazonS3-263fed5.json @@ -2,5 +2,5 @@ "type": "bugfix", "category": "Amazon S3", "contributor": "", - "description": "Fix bug in MultipartS3AsyncClient GET where retryable errors may not be retried, and if retried, successful responses are incorrectly processed with the initial error." + "description": "Fix a bug in the Java based multipart client where retryable errors from getObject may not be retried correctly." } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index ef2c59b788ad..753b21e43aab 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -307,6 +307,7 @@ public void onStream(SdkPublisher publisher) { } synchronized (cancelLock) { if (partNumber == 1) { + CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); onStreamCalled = true; log.trace(() -> "calling onStream on the upstream transformer"); upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe( @@ -317,9 +318,7 @@ public void onStream(SdkPublisher publisher) { ); } } - - CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); - publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, partNumber)); + publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response)); } @Override @@ -347,13 +346,11 @@ class IndividualPartSubscriber implements Subscriber { private final CompletableFuture future; private final T response; - private final int partNumber; private Subscription subscription; - IndividualPartSubscriber(CompletableFuture future, T response, int partNumber) { + IndividualPartSubscriber(CompletableFuture future, T response) { this.future = future; this.response = response; - this.partNumber = partNumber; } @Override @@ -393,9 +390,7 @@ public void onComplete() { } private void handleError(Throwable t) { - if (partNumber > 1) { - publisherToUpstream.error(t); - } + publisherToUpstream.error(t); future.completeExceptionally(t); } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java index b5872e6e35fd..f5bcba85d70a 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java @@ -44,7 +44,7 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe()) .build(); - return transformer.new IndividualPartSubscriber(future, ByteBuffer.wrap(new byte[0]), 1) { + return transformer.new IndividualPartSubscriber(future, ByteBuffer.wrap(new byte[0])) { @Override public void onSubscribe(Subscription s) { super.onSubscribe(s); diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index e6662d5b31dd..e45646e32061 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -24,11 +24,11 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.github.tomakehurst.wiremock.http.Fault; @@ -45,9 +45,12 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -65,23 +68,35 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; @WireMockTest +@Timeout(value = 30, unit = TimeUnit.SECONDS) public class S3MultipartClientGetObjectWiremockTest { - public static final String ERROR_CODE = "InternalError"; - public static final String ERROR_MESSAGE = "We encountered an internal error. Please try again."; - public static final String ERROR_BODY = "\n" - + "\n" - + " " + ERROR_CODE + "\n" - + " " + ERROR_MESSAGE + "\n" - + ""; + private static final CapturingInterceptor capturingInterceptor = new CapturingInterceptor(); public static final String BUCKET = "Example-Bucket"; public static final String KEY = "Key"; private static final int MAX_ATTEMPTS = 7; - private static final CapturingInterceptor capturingInterceptor = new CapturingInterceptor(); - + private static final int TOTAL_PARTS = 3; + private static final int PART_SIZE = 1024; + private static final byte[] PART_1_DATA = new byte[PART_SIZE]; + private static final byte[] PART_2_DATA = new byte[PART_SIZE]; + private static final byte[] PART_3_DATA = new byte[PART_SIZE]; + private static byte[] expectedBody; private S3AsyncClient multipartClient; + @BeforeAll + public static void init() { + new Random().nextBytes(PART_1_DATA); + new Random().nextBytes(PART_2_DATA); + new Random().nextBytes(PART_3_DATA); + + expectedBody = new byte[TOTAL_PARTS * PART_SIZE]; + System.arraycopy(PART_1_DATA, 0, expectedBody, 0, PART_SIZE); + System.arraycopy(PART_2_DATA, 0, expectedBody, PART_SIZE, PART_SIZE); + System.arraycopy(PART_3_DATA, 0, expectedBody, 2 * PART_SIZE, PART_SIZE); + } + @BeforeEach public void setup(WireMockRuntimeInfo wm) { capturingInterceptor.clear(); @@ -96,7 +111,7 @@ public void setup(WireMockRuntimeInfo wm) { .maxAttempts(MAX_ATTEMPTS) .circuitBreakerEnabled(false) .build()) - .addExecutionInterceptor(capturingInterceptor)) + .addExecutionInterceptor(capturingInterceptor)) .build(); } @@ -149,7 +164,7 @@ public void getObject_concurrent503s_shouldRetrySuccessfully() { } @Test - public void getObject_503Response_shouldNotReuseInitialRequestId() { + public void getObject_5xxErrorResponses_shouldNotReuseInitialRequestId() { String firstRequestId = UUID.randomUUID().toString(); String secondRequestId = UUID.randomUUID().toString(); @@ -159,7 +174,7 @@ public void getObject_503Response_shouldNotReuseInitialRequestId() { .willReturn(aResponse() .withHeader("x-amz-request-id", firstRequestId) .withStatus(503) - .withBody(ERROR_BODY)) + .withBody(internalErrorBody())) .willSetStateTo("SecondAttempt")); stubFor(any(anyUrl()) @@ -167,10 +182,13 @@ public void getObject_503Response_shouldNotReuseInitialRequestId() { .whenScenarioStateIs("SecondAttempt") .willReturn(aResponse() .withHeader("x-amz-request-id", secondRequestId) - .withStatus(503))); + .withStatus(500))); + + assertThatThrownBy(() -> multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), + AsyncResponseTransformer.toBytes()).join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(S3Exception.class); - assertThrows(CompletionException.class, () -> - multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join()); List responses = capturingInterceptor.getResponses(); assertEquals(MAX_ATTEMPTS, responses.size(), () -> String.format("Expected exactly %s responses", MAX_ATTEMPTS)); @@ -178,54 +196,23 @@ public void getObject_503Response_shouldNotReuseInitialRequestId() { String actualFirstRequestId = responses.get(0).firstMatchingHeader("x-amz-request-id").orElse(null); String actualSecondRequestId = responses.get(1).firstMatchingHeader("x-amz-request-id").orElse(null); - assertNotNull(actualFirstRequestId, "First response should have x-amz-request-id header"); - assertNotNull(actualSecondRequestId, "Second response should have x-amz-request-id header"); + assertNotNull(actualFirstRequestId); + assertNotNull(actualSecondRequestId); - assertNotEquals(actualFirstRequestId, actualSecondRequestId, "First request ID should not be reused on retry"); + assertNotEquals(actualFirstRequestId, actualSecondRequestId); - assertEquals(firstRequestId, actualFirstRequestId, "First response should have expected request ID"); - assertEquals(secondRequestId, actualSecondRequestId, "Second response should have expected request ID"); + assertEquals(firstRequestId, actualFirstRequestId); + assertEquals(secondRequestId, actualSecondRequestId); assertEquals(503, responses.get(0).statusCode()); - assertEquals(503, responses.get(1).statusCode()); + assertEquals(500, responses.get(1).statusCode()); } @Test public void multipartDownload_200Response_shouldSucceed() { - int totalParts = 3; - int partSize = 1024; - - byte[] part1Data = new byte[partSize]; - byte[] part2Data = new byte[partSize]; - byte[] part3Data = new byte[partSize]; - new Random().nextBytes(part1Data); - new Random().nextBytes(part2Data); - new Random().nextBytes(part3Data); - - byte[] expectedBody = new byte[totalParts * partSize]; - System.arraycopy(part1Data, 0, expectedBody, 0, partSize); - System.arraycopy(part2Data, 0, expectedBody, partSize, partSize); - System.arraycopy(part3Data, 0, expectedBody, 2 * partSize, partSize); - - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) - .willReturn(aResponse() - .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) - .withStatus(200).withBody(part1Data))); - - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) - .willReturn(aResponse() - .withStatus(200) - .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) - .withHeader("x-amz-request-id", UUID.randomUUID().toString()) - .withBody(part2Data))); - - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))) - .willReturn(aResponse() - .withStatus(200) - .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) - .withHeader("x-amz-request-id", UUID.randomUUID().toString()) - .withBody(part3Data))); + stub200SuccessPart1(); + stub200SuccessPart2(); + stub200SuccessPart3(); CompletableFuture> future = multipartClient.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).build(), @@ -243,22 +230,6 @@ public void multipartDownload_200Response_shouldSucceed() { @Test public void multipartDownload_503OnFirstPart_shouldRetrySuccessfully() { - int totalParts = 3; - int partSize = 1024; - - byte[] part1Data = new byte[partSize]; - byte[] part2Data = new byte[partSize]; - byte[] part3Data = new byte[partSize]; - new Random().nextBytes(part1Data); - new Random().nextBytes(part2Data); - new Random().nextBytes(part3Data); - - byte[] expectedBody = new byte[totalParts * partSize]; - System.arraycopy(part1Data, 0, expectedBody, 0, partSize); - System.arraycopy(part2Data, 0, expectedBody, partSize, partSize); - System.arraycopy(part3Data, 0, expectedBody, 2 * partSize, partSize); - - // Stub Part 1 - 503 on first attempt, 200 on retry String part1Scenario = "part1-retry"; stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) @@ -279,23 +250,12 @@ public void multipartDownload_503OnFirstPart_shouldRetrySuccessfully() { .whenScenarioStateIs("retry-attempt") .willReturn(aResponse() .withStatus(200) - .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) + .withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS)) .withHeader("x-amz-request-id", UUID.randomUUID().toString()) - .withBody(part1Data))); + .withBody(PART_1_DATA))); - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) - .willReturn(aResponse() - .withStatus(200) - .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) - .withHeader("x-amz-request-id", UUID.randomUUID().toString()) - .withBody(part2Data))); - - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))) - .willReturn(aResponse() - .withStatus(200) - .withHeader("x-amz-mp-parts-count", String.valueOf(totalParts)) - .withHeader("x-amz-request-id", UUID.randomUUID().toString()) - .withBody(part3Data))); + stub200SuccessPart2(); + stub200SuccessPart3(); CompletableFuture> future = multipartClient.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).build(), @@ -311,6 +271,61 @@ public void multipartDownload_503OnFirstPart_shouldRetrySuccessfully() { verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY)))); } + @Test + public void multipartDownload_503OnFirstPartAndSecondPart_shouldRetryFirstPartSuccessfullyAndFailOnSecondPart() { + // Stub Part 1 - 503 on first attempt, 200 on retry + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .inScenario("part1-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withStatus(503) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(slowdownErrorBody())) + .willSetStateTo("retry-attempt")); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .inScenario("part1-retry") + .whenScenarioStateIs("retry-attempt") + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(PART_1_DATA))); + + + // Stub Part 2 - 503 on first attempt, 200 on retry + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) + .inScenario("part2-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse() + .withStatus(500) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(internalErrorBody())) + .willSetStateTo("retry-attempt")); + + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) + .inScenario("part2-retry") + .whenScenarioStateIs("retry-attempt") + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(PART_2_DATA))); + + stub200SuccessPart3(); + + assertThatThrownBy(() -> multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), + AsyncResponseTransformer.toBytes()).join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(S3Exception.class) + .hasMessageContaining("We encountered an internal error. Please try again."); + + // Verify that part 1 was requested twice (initial 503 + retry) + verify(2, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY)))); + // Verify that part 2 was requested once (no retry) + verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY)))); + } + @Test public void getObject_iOError_shouldRetrySuccessfully() { String requestId = UUID.randomUUID().toString(); @@ -350,83 +365,61 @@ public void getObject_iOError_shouldRetrySuccessfully() { @Test public void multipartDownload_errorDuringFirstPartAfterOnStream_shouldFailAndNotRetry() { - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) - .willReturn(aResponse() - .withHeader("x-amz-mp-parts-count", String.valueOf(2)) - .withStatus(200) - .withBody("Hello "))); - - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) - .willReturn(aResponse() - .withStatus(200) - .withHeader("x-amz-mp-parts-count", "2") - .withBody("World"))); + stub200SuccessPart1(); + stub200SuccessPart2(); + stub200SuccessPart3(); StreamingErrorTransformer failingTransformer = new StreamingErrorTransformer(); - assertThrows(CompletionException.class, () -> - multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), failingTransformer).join()); + + assertThatThrownBy(() -> multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY), failingTransformer).join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(RuntimeException.class); assertTrue(failingTransformer.onStreamCalled.get()); // Verify that the first part was requested only once and not retried verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY)))); } - /** - * Custom AsyncResponseTransformer that simulates an error occurring after onStream() has been called - */ - private static final class StreamingErrorTransformer - implements AsyncResponseTransformer> { - - private final CompletableFuture> future = new CompletableFuture<>(); - private final AtomicBoolean errorThrown = new AtomicBoolean(); - private final AtomicBoolean onStreamCalled = new AtomicBoolean(); - - @Override - public CompletableFuture> prepare() { - return future; - } - - @Override - public void onResponse(GetObjectResponse response) { - // - } - - @Override - public void onStream(SdkPublisher publisher) { - onStreamCalled.set(true); - publisher.subscribe(new Subscriber() { - private Subscription subscription; + private String errorBody(String errorCode, String errorMessage) { + return "\n" + + "\n" + + " " + errorCode + "\n" + + " " + errorMessage + "\n" + + ""; + } - @Override - public void onSubscribe(Subscription s) { - this.subscription = s; - s.request(1); - } + private String internalErrorBody() { + return errorBody("InternalError", "We encountered an internal error. Please try again."); + } - @Override - public void onNext(ByteBuffer byteBuffer) { - if (errorThrown.compareAndSet(false, true)) { - future.completeExceptionally(new RuntimeException()); - subscription.cancel(); - } - } + private String slowdownErrorBody() { + return errorBody("SlowDown", "Please reduce your request rate."); + } - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } + private void stub200SuccessPart1() { + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))) + .willReturn(aResponse() + .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) + .withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS)) + .withStatus(200).withBody(PART_1_DATA))); + } - @Override - public void onComplete() { - // - } - }); - } + private void stub200SuccessPart2() { + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(PART_2_DATA))); + } - @Override - public void exceptionOccurred(Throwable throwable) { - future.completeExceptionally(throwable); - } + private void stub200SuccessPart3() { + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))) + .willReturn(aResponse() + .withStatus(200) + .withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS)) + .withHeader("x-amz-request-id", UUID.randomUUID().toString()) + .withBody(PART_3_DATA))); } private CompletableFuture> mock200Response(S3AsyncClient s3Client, int runNumber) { @@ -454,7 +447,8 @@ private CompletableFuture> mockRetryableErrorTh .whenScenarioStateIs(Scenario.STARTED) .willReturn(aResponse() .withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID())) - .withStatus(500).withBody(ERROR_BODY) + .withStatus(500) + .withBody(internalErrorBody()) ) .willSetStateTo("SecondAttempt" + runId)); @@ -487,4 +481,62 @@ public void clear() { responses.clear(); } } + + /** + * Custom AsyncResponseTransformer that simulates an error occurring after onStream() has been called + */ + private static final class StreamingErrorTransformer + implements AsyncResponseTransformer> { + + private final CompletableFuture> future = new CompletableFuture<>(); + private final AtomicBoolean errorThrown = new AtomicBoolean(); + private final AtomicBoolean onStreamCalled = new AtomicBoolean(); + + @Override + public CompletableFuture> prepare() { + return future; + } + + @Override + public void onResponse(GetObjectResponse response) { + // + } + + @Override + public void onStream(SdkPublisher publisher) { + onStreamCalled.set(true); + publisher.subscribe(new Subscriber() { + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + if (errorThrown.compareAndSet(false, true)) { + future.completeExceptionally(new RuntimeException()); + subscription.cancel(); + } + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onComplete() { + // + } + }); + } + + @Override + public void exceptionOccurred(Throwable throwable) { + future.completeExceptionally(throwable); + } + } } From 974965753cc9186cccbd2033e7cd9ac07a72bbb4 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Tue, 5 Aug 2025 20:07:55 -0700 Subject: [PATCH 8/8] Address comments --- .../s3/internal/multipart/MultipartDownloaderSubscriber.java | 1 - 1 file changed, 1 deletion(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index a9a2925b77d6..c84f55935d57 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -125,7 +125,6 @@ public void onNext(AsyncResponseTransformer getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer); getObjectFutures.add(getObjectFuture); getObjectFuture.whenComplete((response, error) -> { - getObjectFutures.remove(getObjectFuture); if (error != null) { log.debug(() -> "Error encountered during GetObjectRequest with partNumber=" + nextPartToGet); onError(error);