2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -131,6 +131,8 @@ jobs:
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-checksum
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-copy-if-not-exists
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-multipart-copy-large
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-multipart-copy-small
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
60 changes: 60 additions & 0 deletions src/aws/builder.rs
@@ -42,6 +42,11 @@ use url::Url;
/// Default metadata endpoint
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";

/// AWS S3 does not support copy operations larger than 5 GiB in a single request. See
/// [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/userguide/copy-object.html) for more
/// details.
const MAX_SINGLE_REQUEST_COPY_SIZE: u64 = 5 * 1024 * 1024 * 1024;

/// A specialized `Error` for object store-related errors
#[derive(Debug, thiserror::Error)]
enum Error {
@@ -189,6 +194,10 @@ pub struct AmazonS3Builder {
request_payer: ConfigValue<bool>,
/// The [`HttpConnector`] to use
http_connector: Option<Arc<dyn HttpConnector>>,
/// Threshold (bytes) above which copy uses multipart copy. If not set, defaults to 5 GiB.
multipart_copy_threshold: Option<ConfigValue<u64>>,
/// Preferred multipart copy part size (bytes). If not set, defaults to 5 GiB.
multipart_copy_part_size: Option<ConfigValue<u64>>,
}

/// Configuration keys for [`AmazonS3Builder`]
@@ -423,6 +432,10 @@ pub enum AmazonS3ConfigKey {

/// Encryption options
Encryption(S3EncryptionConfigKey),
/// Threshold (bytes) to switch to multipart copy
MultipartCopyThreshold,
/// Preferred multipart copy part size (bytes)
MultipartCopyPartSize,
}

impl AsRef<str> for AmazonS3ConfigKey {
@@ -455,6 +468,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
Self::MultipartCopyThreshold => "aws_multipart_copy_threshold",
Self::MultipartCopyPartSize => "aws_multipart_copy_part_size",
}
}
}
@@ -499,6 +514,12 @@ impl FromStr for AmazonS3ConfigKey {
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
"aws_multipart_copy_threshold" | "multipart_copy_threshold" => {
Ok(Self::MultipartCopyThreshold)
}
"aws_multipart_copy_part_size" | "multipart_copy_part_size" => {
Ok(Self::MultipartCopyPartSize)
}
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
"aws_server_side_encryption" | "server_side_encryption" => Ok(Self::Encryption(
@@ -666,6 +687,12 @@ impl AmazonS3Builder {
self.encryption_customer_key_base64 = Some(value.into())
}
},
AmazonS3ConfigKey::MultipartCopyThreshold => {
self.multipart_copy_threshold = Some(ConfigValue::Deferred(value.into()))
}
AmazonS3ConfigKey::MultipartCopyPartSize => {
self.multipart_copy_part_size = Some(ConfigValue::Deferred(value.into()))
}
};
self
}
@@ -733,6 +760,14 @@ impl AmazonS3Builder {
self.encryption_customer_key_base64.clone()
}
},
AmazonS3ConfigKey::MultipartCopyThreshold => self
.multipart_copy_threshold
.as_ref()
.map(|x| x.to_string()),
AmazonS3ConfigKey::MultipartCopyPartSize => self
.multipart_copy_part_size
.as_ref()
.map(|x| x.to_string()),
}
}

@@ -1029,6 +1064,18 @@ impl AmazonS3Builder {
self
}

/// Set the threshold in bytes above which `copy` switches to multipart copy (defaults to 5 GiB)
pub fn with_multipart_copy_threshold(mut self, threshold_bytes: u64) -> Self {
self.multipart_copy_threshold = Some(ConfigValue::Parsed(threshold_bytes));
self
}

/// Set the preferred part size in bytes for multipart copy (defaults to 5 GiB)
pub fn with_multipart_copy_part_size(mut self, part_size_bytes: u64) -> Self {
self.multipart_copy_part_size = Some(ConfigValue::Parsed(part_size_bytes));
self
}
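A minimal usage sketch of the two new builder options (the bucket name and byte sizes are illustrative only; the other builder calls are the existing `AmazonS3Builder` API):

```rust
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::Result;

fn build_store() -> Result<AmazonS3> {
    AmazonS3Builder::from_env()
        .with_bucket_name("example-bucket")
        // Copies larger than 1 GiB fall back to multipart copy ...
        .with_multipart_copy_threshold(1024 * 1024 * 1024)
        // ... issued as 256 MiB copy parts.
        .with_multipart_copy_part_size(256 * 1024 * 1024)
        .build()
}
```

The same values should also be settable through the string config keys `aws_multipart_copy_threshold` and `aws_multipart_copy_part_size` added above.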

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
@@ -1185,6 +1232,17 @@ impl AmazonS3Builder {
S3EncryptionHeaders::default()
};

let multipart_copy_threshold = self
.multipart_copy_threshold
.map(|val| val.get())
.transpose()?
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
let multipart_copy_part_size = self
.multipart_copy_part_size
.map(|val| val.get())
.transpose()?
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);

let config = S3Config {
region,
bucket,
@@ -1201,6 +1259,8 @@ impl AmazonS3Builder {
conditional_put: self.conditional_put.get()?,
encryption_headers,
request_payer: self.request_payer.get()?,
multipart_copy_threshold,
multipart_copy_part_size,
};

let http_client = http.connect(&config.client_options)?;
24 changes: 23 additions & 1 deletion src/aws/client.rs
@@ -140,6 +140,7 @@ impl From<Error> for crate::Error {
pub(crate) enum PutPartPayload<'a> {
Part(PutPayload),
Copy(&'a Path),
CopyRange(&'a Path, std::ops::Range<u64>),
}

impl Default for PutPartPayload<'_> {
@@ -209,6 +210,10 @@ pub(crate) struct S3Config {
pub conditional_put: S3ConditionalPut,
pub request_payer: bool,
pub(super) encryption_headers: S3EncryptionHeaders,
/// Threshold in bytes above which copy will use multipart copy
pub multipart_copy_threshold: u64,
/// Preferred multipart copy part size in bytes (defaults to 5 GiB)
pub multipart_copy_part_size: u64,
}

impl S3Config {
@@ -681,7 +686,10 @@ impl S3Client {
part_idx: usize,
data: PutPartPayload<'_>,
) -> Result<PartId> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let is_copy = matches!(
data,
PutPartPayload::Copy(_) | PutPartPayload::CopyRange(_, _)
);
let part = (part_idx + 1).to_string();

let mut request = self
@@ -695,6 +703,18 @@ impl S3Client {
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
PutPartPayload::CopyRange(path, range) => {
// The x-amz-copy-source-range header takes an inclusive last byte, so convert the exclusive range end
let start = range.start;
let end_inclusive = range.end.saturating_sub(1);
let range_value = format!("bytes={}-{}", start, end_inclusive);
request
.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
)
.header("x-amz-copy-source-range", &range_value)
}
};

if self
@@ -1000,6 +1020,8 @@ mod tests {
conditional_put: Default::default(),
encryption_headers: Default::default(),
request_payer: false,
multipart_copy_threshold: 5 * 1024 * 1024 * 1024,
multipart_copy_part_size: 5 * 1024 * 1024 * 1024,
};

let client = S3Client::new(config, HttpClient::new(reqwest::Client::new()));
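For reference, a standalone sketch of how the exclusive `Range<u64>` carried by `PutPartPayload::CopyRange` maps onto the inclusive `x-amz-copy-source-range` value built in `put_part`; the helper name here is hypothetical:

```rust
use std::ops::Range;

/// Hypothetical helper mirroring the header construction in `put_part`:
/// the x-amz-copy-source-range value uses an inclusive last byte.
fn copy_source_range_value(range: Range<u64>) -> String {
    format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
}

fn main() {
    // Copying the first 8 MiB of the source: exclusive end 8_388_608
    // becomes inclusive last byte 8_388_607.
    assert_eq!(
        copy_source_range_value(0..8 * 1024 * 1024),
        "bytes=0-8388607"
    );
}
```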