Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ jobs:
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-checksum
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-copy-if-not-exists
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-multipart-copy-large
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-multipart-copy-small
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket

KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
Expand Down
60 changes: 60 additions & 0 deletions src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ use url::Url;
/// Default metadata endpoint
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";

/// AWS S3 does not support copy operations larger than 5 GiB in a single request. See
/// [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/userguide/copy-object.html) for more
/// details.
const MAX_SINGLE_REQUEST_COPY_SIZE: u64 = 5 * 1024 * 1024 * 1024;

/// A specialized `Error` for object store-related errors
#[derive(Debug, thiserror::Error)]
enum Error {
Expand Down Expand Up @@ -189,6 +194,10 @@ pub struct AmazonS3Builder {
request_payer: ConfigValue<bool>,
/// The [`HttpConnector`] to use
http_connector: Option<Arc<dyn HttpConnector>>,
/// Threshold (bytes) above which copy uses multipart copy. If not set, defaults to 5 GiB.
multipart_copy_threshold: Option<ConfigValue<u64>>,
/// Preferred multipart copy part size (bytes). If not set, defaults to 5 GiB.
multipart_copy_part_size: Option<ConfigValue<u64>>,
}

/// Configuration keys for [`AmazonS3Builder`]
Expand Down Expand Up @@ -423,6 +432,10 @@ pub enum AmazonS3ConfigKey {

/// Encryption options
Encryption(S3EncryptionConfigKey),
/// Threshold (bytes) to switch to multipart copy
MultipartCopyThreshold,
/// Preferred multipart copy part size (bytes)
MultipartCopyPartSize,
}

impl AsRef<str> for AmazonS3ConfigKey {
Expand Down Expand Up @@ -455,6 +468,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
Self::MultipartCopyThreshold => "aws_multipart_copy_threshold",
Self::MultipartCopyPartSize => "aws_multipart_copy_part_size",
}
}
}
Expand Down Expand Up @@ -499,6 +514,12 @@ impl FromStr for AmazonS3ConfigKey {
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
"aws_multipart_copy_threshold" | "multipart_copy_threshold" => {
Ok(Self::MultipartCopyThreshold)
}
"aws_multipart_copy_part_size" | "multipart_copy_part_size" => {
Ok(Self::MultipartCopyPartSize)
}
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
"aws_server_side_encryption" | "server_side_encryption" => Ok(Self::Encryption(
Expand Down Expand Up @@ -666,6 +687,12 @@ impl AmazonS3Builder {
self.encryption_customer_key_base64 = Some(value.into())
}
},
AmazonS3ConfigKey::MultipartCopyThreshold => {
self.multipart_copy_threshold = Some(ConfigValue::Deferred(value.into()))
}
AmazonS3ConfigKey::MultipartCopyPartSize => {
self.multipart_copy_part_size = Some(ConfigValue::Deferred(value.into()))
}
};
self
}
Expand Down Expand Up @@ -733,6 +760,14 @@ impl AmazonS3Builder {
self.encryption_customer_key_base64.clone()
}
},
AmazonS3ConfigKey::MultipartCopyThreshold => self
.multipart_copy_threshold
.as_ref()
.map(|x| x.to_string()),
AmazonS3ConfigKey::MultipartCopyPartSize => self
.multipart_copy_part_size
.as_ref()
.map(|x| x.to_string()),
}
}

Expand Down Expand Up @@ -1029,6 +1064,18 @@ impl AmazonS3Builder {
self
}

/// Set threshold (bytes) above which copy uses multipart copy
pub fn with_multipart_copy_threshold(mut self, threshold_bytes: u64) -> Self {
self.multipart_copy_threshold = Some(ConfigValue::Parsed(threshold_bytes));
self
}

/// Set preferred multipart copy part size (bytes)
pub fn with_multipart_copy_part_size(mut self, part_size_bytes: u64) -> Self {
self.multipart_copy_part_size = Some(ConfigValue::Parsed(part_size_bytes));
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
Expand Down Expand Up @@ -1185,6 +1232,17 @@ impl AmazonS3Builder {
S3EncryptionHeaders::default()
};

let multipart_copy_threshold = self
.multipart_copy_threshold
.map(|val| val.get())
.transpose()?
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
let multipart_copy_part_size = self
.multipart_copy_part_size
.map(|val| val.get())
.transpose()?
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);

let config = S3Config {
region,
bucket,
Expand All @@ -1201,6 +1259,8 @@ impl AmazonS3Builder {
conditional_put: self.conditional_put.get()?,
encryption_headers,
request_payer: self.request_payer.get()?,
multipart_copy_threshold,
multipart_copy_part_size,
};

let http_client = http.connect(&config.client_options)?;
Expand Down
24 changes: 23 additions & 1 deletion src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl From<Error> for crate::Error {
pub(crate) enum PutPartPayload<'a> {
Part(PutPayload),
Copy(&'a Path),
CopyRange(&'a Path, std::ops::Range<u64>),
}

impl Default for PutPartPayload<'_> {
Expand Down Expand Up @@ -209,6 +210,10 @@ pub(crate) struct S3Config {
pub conditional_put: S3ConditionalPut,
pub request_payer: bool,
pub(super) encryption_headers: S3EncryptionHeaders,
/// Threshold in bytes above which copy will use multipart copy
pub multipart_copy_threshold: u64,
/// Preferred multipart copy part size in bytes (None => auto)
pub multipart_copy_part_size: u64,
}

impl S3Config {
Expand Down Expand Up @@ -681,7 +686,10 @@ impl S3Client {
part_idx: usize,
data: PutPartPayload<'_>,
) -> Result<PartId> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let is_copy = matches!(
data,
PutPartPayload::Copy(_) | PutPartPayload::CopyRange(_, _)
);
let part = (part_idx + 1).to_string();

let mut request = self
Expand All @@ -695,6 +703,18 @@ impl S3Client {
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
PutPartPayload::CopyRange(path, range) => {
// AWS expects inclusive end for copy range header
let start = range.start;
let end_inclusive = range.end.saturating_sub(1);
let range_value = format!("bytes={}-{}", start, end_inclusive);
request
.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
)
.header("x-amz-copy-source-range", &range_value)
}
};

if self
Expand Down Expand Up @@ -1000,6 +1020,8 @@ mod tests {
conditional_put: Default::default(),
encryption_headers: Default::default(),
request_payer: false,
multipart_copy_threshold: 5 * 1024 * 1024 * 1024,
multipart_copy_part_size: 5 * 1024 * 1024 * 1024,
};

let client = S3Client::new(config, HttpClient::new(reqwest::Client::new()));
Expand Down
Loading