Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-s3-fe521b0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "s3",
"contributor": "",
"description": "Add CRT shouldStream config as CRT_MEMORY_BUFFER_DISABLED SDK advanced client option"
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ public final class SdkAdvancedAsyncClientOption<T> extends ClientOption<T> {
public static final SdkAdvancedAsyncClientOption<Executor> FUTURE_COMPLETION_EXECUTOR =
new SdkAdvancedAsyncClientOption<>(Executor.class);

/**
* Advanced configuration for the native S3CrtAsyncClient which only applies for multipart uploads. When set to true,
* the client will skip buffering the part in native memory before sending the request. Default to false on small objects,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this a little hard to parse and I'm not quite sure I understand the actual behavior... what about:

When set to true the the client will never buffer parts in native memory before sending the request.  When set to false the client will buffer parts for large objects.  Parts for small objects are never buffered. 

* and true when the object size exceed a certain threshold. When set to true, the client will also skip
* buffering for small objects.
*/
public static final SdkAdvancedAsyncClientOption<Boolean> CRT_MEMORY_BUFFER_DISABLED =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there as reason this should be on SdkAdvanced options? That seems to imply that it might apply to any async client rather than specifically to S3 CRT only?

Is there a reason to not put it on S3CrtAsyncClientBuilder only? I assume its because we want to "hide" it in "advanced" options rather than increasing the discoverability by having it on the S3 CRT client's builder direclty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly, we don't want this to be as discoverable as other options. This is what we decided when we discussed about it this week.

new SdkAdvancedAsyncClientOption<>(Boolean.class);

private SdkAdvancedAsyncClientOption(Class<T> valueClass) {
super(valueClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.crt.CrtResource;
Expand Down Expand Up @@ -79,6 +80,25 @@ public static void teardown() throws IOException {

@Test
void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
S3AsyncClient crtClientWithMemoryBufferDisabled = S3CrtAsyncClient.builder()
.credentialsProvider(AwsTestBase.CREDENTIALS_PROVIDER_CHAIN)
.region(S3IntegrationTestBase.DEFAULT_REGION)
.advancedOption(SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED, true)
.build();

AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
crtClientWithMemoryBufferDisabled.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();

ResponseInputStream<GetObjectResponse> objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
ResponseTransformer.toInputStream());

byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));

Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
}

@Test
void putObject_withMemoryBufferDisabled_fileRequestBody_objectSentCorrectly() throws Exception {
AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();

Expand All @@ -90,6 +110,7 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
}


@Test
void putObject_file_objectSentCorrectly() throws Exception {
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), testFile.toPath()).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -25,6 +26,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -361,6 +363,21 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
*/
S3CrtAsyncClientBuilder disableS3ExpressSessionAuth(Boolean disableS3ExpressSessionAuth);

/**
* Configure an advanced async option. These values are used very rarely, and the majority of SDK customers can ignore
* them.
*
* @param option The option to configure.
* @param value The value of the option.
* @param <T> The type of the option.
*/
<T> S3CrtAsyncClientBuilder advancedOption(SdkAdvancedAsyncClientOption<T> option, T value);

/**
* Configure the map of advanced override options. This will override all values currently configured. The values in the
* map must match the key type of the map, or a runtime exception will be raised.
*/
S3CrtAsyncClientBuilder advancedOptions(Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions);

@Override
S3AsyncClient build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand All @@ -47,6 +48,7 @@
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
Expand Down Expand Up @@ -78,6 +80,7 @@
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;

Expand Down Expand Up @@ -222,7 +225,8 @@ private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(Defau
.readBufferSizeInBytes(builder.readBufferSizeInBytes)
.httpConfiguration(builder.httpConfiguration)
.thresholdInBytes(builder.thresholdInBytes)
.maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes);
.maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes)
.advancedOptions(builder.advancedOptions.build());

if (builder.retryConfiguration != null) {
nativeClientBuilder.standardRetryOptions(
Expand Down Expand Up @@ -257,6 +261,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private Executor futureCompletionExecutor;
private Boolean disableS3ExpressSessionAuth;

private AttributeMap.Builder advancedOptions = AttributeMap.builder();

@Override
public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
Expand Down Expand Up @@ -388,6 +393,18 @@ public DefaultS3CrtClientBuilder disableS3ExpressSessionAuth(Boolean disableS3Ex
return this;
}

@Override
public <T> DefaultS3CrtClientBuilder advancedOption(SdkAdvancedAsyncClientOption<T> option, T value) {
this.advancedOptions.put(option, value);
return this;
}

@Override
public DefaultS3CrtClientBuilder advancedOptions(Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions) {
this.advancedOptions.putAll(advancedOptions);
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import software.amazon.awssdk.crt.http.HttpProxyEnvironmentVariableSetting;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.s3.ChecksumConfig;
import software.amazon.awssdk.crt.s3.FileIoOptions;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3Client;
import software.amazon.awssdk.crt.s3.S3ClientOptions;
Expand Down Expand Up @@ -127,6 +128,8 @@ private S3ClientOptions createS3ClientOption() {
.ifPresent(options::withConnectTimeoutMs);
Optional.ofNullable(s3NativeClientConfiguration.httpMonitoringOptions())
.ifPresent(options::withHttpMonitoringOptions);
Optional.ofNullable(s3NativeClientConfiguration.memoryBufferDisabled())
.ifPresent(memoryBufferDisabled -> options.withFileIoOptions(new FileIoOptions(memoryBufferDisabled, 0.0, false)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be setting the other FileIOptions here? I think the diskThroughputGbps (the 0.0?) applies only shouldStream/memoryBufferDisabled is true - so what if the user has set memoryBufferDisabled=true and we now set the diskThroughput to 0?

Ditto on setting directIO - I assume false is the default and we don't expose configuration for this anywhere else?

Copy link
Contributor Author

@L-Applin L-Applin Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when disk throughput is set to 0, it uses the default value, which is the same as the throughputTargetGbps config. We decided to only expose shouldStream/memoryBufferDisabled so I am just setting the default value s here for he other FileIoOptions as they can't be left null.

The default for directIO is indeed false and yes we don't expose it.

return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.services.s3.internal.crt;

import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED;
import static software.amazon.awssdk.crtcore.CrtConfigurationUtils.resolveHttpMonitoringOptions;
import static software.amazon.awssdk.crtcore.CrtConfigurationUtils.resolveProxy;

Expand All @@ -33,6 +34,7 @@
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.SdkAutoCloseable;
import software.amazon.awssdk.utils.Validate;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class S3NativeClientConfiguration implements SdkAutoCloseable {
private final HttpMonitoringOptions httpMonitoringOptions;
private final Boolean useEnvironmentVariableProxyOptionsValues;
private final long maxNativeMemoryLimitInBytes;
private final Boolean memoryBufferDisabled;

public S3NativeClientConfiguration(Builder builder) {
this.signingRegion = builder.signingRegion == null ? DefaultAwsRegionProviderChain.builder().build().getRegion().id() :
Expand Down Expand Up @@ -113,6 +116,8 @@ public S3NativeClientConfiguration(Builder builder) {
}
this.standardRetryOptions = builder.standardRetryOptions;
this.useEnvironmentVariableProxyOptionsValues = resolveUseEnvironmentVariableValues(builder);
this.memoryBufferDisabled =
builder.advancedOptions == null ? null : builder.advancedOptions.get(CRT_MEMORY_BUFFER_DISABLED);
}

private static Boolean resolveUseEnvironmentVariableValues(Builder builder) {
Expand Down Expand Up @@ -191,6 +196,10 @@ public Long readBufferSizeInBytes() {
return readBufferSizeInBytes;
}

public Boolean memoryBufferDisabled() {
return memoryBufferDisabled;
}

@Override
public void close() {
clientBootstrap.close();
Expand All @@ -213,6 +222,8 @@ public static final class Builder {
private Long thresholdInBytes;
private Long maxNativeMemoryLimitInBytes;

private AttributeMap advancedOptions;

private Builder() {
}

Expand Down Expand Up @@ -274,5 +285,10 @@ public Builder thresholdInBytes(Long thresholdInBytes) {
this.thresholdInBytes = thresholdInBytes;
return this;
}

public Builder advancedOptions(AttributeMap advancedOptions) {
this.advancedOptions = advancedOptions;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED;
import static software.amazon.awssdk.http.Header.CONTENT_LENGTH;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
Expand Down Expand Up @@ -68,6 +69,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.AttributeMap;

public class S3CrtAsyncHttpClientTest {
private static final URI DEFAULT_ENDPOINT = URI.create("https://127.0.0.1:443");
Expand Down Expand Up @@ -124,7 +126,7 @@ public void defaultRequest_shouldSetMetaRequestOptionsCorrectly(Integer port) {
.collect(HashMap::new, (m, h) -> m.put(h.getName(), h.getValue())
, Map::putAll);

String expectedPort = port == null || port.equals(443) ? "" : ":" + port;
String expectedPort = port == null || port.equals(443) ? "" : ":" + port;
assertThat(headers).hasSize(4)
.containsEntry("Host", DEFAULT_ENDPOINT.getHost() + expectedPort)
.containsEntry("custom-header", "foobar")
Expand All @@ -135,7 +137,7 @@ public void defaultRequest_shouldSetMetaRequestOptionsCorrectly(Integer port) {
@Test
public void getObject_shouldSetMetaRequestTypeCorrectly() {
AsyncExecuteRequest asyncExecuteRequest = getExecuteRequestBuilder().putHttpExecutionAttribute(OPERATION_NAME,
"GetObject").build();
"GetObject").build();

S3MetaRequestOptions actual = makeRequest(asyncExecuteRequest);
assertThat(actual.getMetaRequestType()).isEqualTo(S3MetaRequestOptions.MetaRequestType.GET_OBJECT);
Expand All @@ -145,7 +147,7 @@ public void getObject_shouldSetMetaRequestTypeCorrectly() {
@Test
public void putObject_shouldSetMetaRequestTypeCorrectly() {
AsyncExecuteRequest asyncExecuteRequest = getExecuteRequestBuilder().putHttpExecutionAttribute(OPERATION_NAME,
"PutObject").build();
"PutObject").build();

S3MetaRequestOptions actual = makeRequest(asyncExecuteRequest);
assertThat(actual.getMetaRequestType()).isEqualTo(S3MetaRequestOptions.MetaRequestType.PUT_OBJECT);
Expand Down Expand Up @@ -318,7 +320,7 @@ public void operationWithResponseAlgorithms_optInFromRequest_shouldHonor() {
s3NativeClientConfiguration = S3NativeClientConfiguration.builder()
.endpointOverride(DEFAULT_ENDPOINT)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test",
"test")))
"test")))
.build();

asyncHttpClient = new S3CrtAsyncHttpClient(s3Client, S3CrtAsyncHttpClient.builder()
Expand Down Expand Up @@ -443,6 +445,7 @@ void build_shouldPassThroughParameters() {
.minimumThroughputTimeout(Duration.ofSeconds(2)))
.proxyConfiguration(p -> p.host("127.0.0.1").port(8080))
.build())
.advancedOptions(AttributeMap.builder().put(CRT_MEMORY_BUFFER_DISABLED, true).build())
.build();
try (S3CrtAsyncHttpClient client =
(S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) {
Expand All @@ -466,6 +469,26 @@ void build_shouldPassThroughParameters() {
assertThat(clientOptions.getMaxConnections()).isEqualTo(100);
assertThat(clientOptions.getThroughputTargetGbps()).isEqualTo(3.5);
assertThat(clientOptions.getMemoryLimitInBytes()).isEqualTo(5L * 1024 * 1024 * 1024);
assertThat(clientOptions.getFileIoOptions()).isNotNull();
assertThat(clientOptions.getFileIoOptions().getShouldStream()).isTrue();
assertThat(clientOptions.getFileIoOptions().getDiskThroughputGbps()).isZero();
assertThat(clientOptions.getFileIoOptions().getDirectIo()).isFalse();
}
}

@Test
void build_advancedOptionsNotSet_shouldUseDefault() {
String signingRegion = "us-west-2";
S3NativeClientConfiguration configuration =
S3NativeClientConfiguration.builder()
.signingRegion(signingRegion)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test",
"test")))
.build();
try (S3CrtAsyncHttpClient client =
(S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) {
S3ClientOptions clientOptions = client.s3ClientOptions();
assertThat(clientOptions.getFileIoOptions()).isNull();
}
}

Expand Down Expand Up @@ -554,8 +577,8 @@ void build_ProxyConfigurationWithEnvironmentVariables(S3CrtHttpConfiguration s3C
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test",
"test")))
.build();
try(S3CrtAsyncHttpClient client =
(S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) {
try (S3CrtAsyncHttpClient client =
(S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) {
S3ClientOptions clientOptions = client.s3ClientOptions();
if (environmentVariableType == null) {
assertThat(clientOptions.getHttpProxyEnvironmentVariableSetting()).isNull();
Expand Down
Loading