diff --git a/src/chunked.rs b/src/chunked.rs index 53dbf1e9..49632ed6 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -29,7 +29,7 @@ use futures::stream::BoxStream; use crate::path::Path; use crate::{ CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutResult, + ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions, }; use crate::{PutPayload, Result}; @@ -170,12 +170,8 @@ impl ObjectStore for ChunkedStore { self.inner.copy_opts(from, to, options).await } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { - self.inner.rename(from, to).await - } - - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.inner.rename_if_not_exists(from, to).await + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { + self.inner.rename_opts(from, to, options).await } } diff --git a/src/lib.rs b/src/lib.rs index 9b596279..57e1a8d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1035,19 +1035,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// /// By default, this is implemented as a copy and then delete source. It may not /// check when deleting source that it was the same object that was originally copied. - /// - /// If there exists an object at the destination, it will be overwritten. - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { - self.copy(from, to).await?; - self.delete(from).await - } - - /// Move an object from one path to another in the same object store. - /// - /// Will return an error if the destination already has an object. - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.copy_if_not_exists(from, to).await?; - self.delete(from).await + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { + let RenameOptions { + target_mode, + extensions, + } = options; + let copy_mode = match target_mode { + RenameTargetMode::Overwrite => CopyMode::Overwrite, + RenameTargetMode::Create => CopyMode::Create, + }; + let copy_options = CopyOptions { + mode: copy_mode, + extensions, + }; + self.copy_opts(from, to, copy_options).await?; + self.delete(from).await?; + Ok(()) } } @@ -1116,12 +1119,13 @@ macro_rules! as_ref_impl { self.as_ref().copy_opts(from, to, options).await } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { - self.as_ref().rename(from, to).await - } - - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.as_ref().rename_if_not_exists(from, to).await + async fn rename_opts( + &self, + from: &Path, + to: &Path, + options: RenameOptions, + ) -> Result<()> { + self.as_ref().rename_opts(from, to, options).await } } }; @@ -1238,6 +1242,19 @@ pub trait ObjectStoreExt: ObjectStore { /// If atomic operations are not supported by the underlying object storage (like S3) /// it will return an error. fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future>; + + /// Move an object from one path to another in the same object store. + /// + /// By default, this is implemented as a copy and then delete source. It may not + /// check when deleting source that it was the same object that was originally copied. + /// + /// If there exists an object at the destination, it will be overwritten. + fn rename(&self, from: &Path, to: &Path) -> impl Future>; + + /// Move an object from one path to another in the same object store. + /// + /// Will return an error if the destination already has an object. + fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future>; } impl ObjectStoreExt for T @@ -1277,6 +1294,16 @@ where let options = CopyOptions::new().with_mode(CopyMode::Create); self.copy_opts(from, to, options).await } + + async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite); + self.rename_opts(from, to, options).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create); + self.rename_opts(from, to, options).await + } } /// Result of a list call that includes objects, prefixes (directories) and a @@ -1828,6 +1855,76 @@ impl PartialEq for CopyOptions { impl Eq for CopyOptions {} +/// Configure preconditions for the target of rename operation. +/// +/// Note though that the source location may or not be deleted at the same time in an atomic operation. There is +/// currently NO flag to control the atomicity of "delete source at the same time as creating the target". +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum RenameTargetMode { + /// Perform a write operation on the target, overwriting any object present at the provided path. + #[default] + Overwrite, + /// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an + /// object already exists at the provided path. + Create, +} + +/// Options for a rename request +#[derive(Debug, Clone, Default)] +pub struct RenameOptions { + /// Configure the [`RenameTargetMode`] for this operation + pub target_mode: RenameTargetMode, + /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + /// + /// They are also excluded from [`PartialEq`] and [`Eq`]. + pub extensions: Extensions, +} + +impl RenameOptions { + /// Create a new [`RenameOptions`] + pub fn new() -> Self { + Self::default() + } + + /// Sets the `target_mode=. + /// + /// See [`RenameOptions::target_mode`]. + #[must_use] + pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self { + self.target_mode = target_mode; + self + } + + /// Sets the `extensions`. + /// + /// See [`RenameOptions::extensions`]. + #[must_use] + pub fn with_extensions(mut self, extensions: Extensions) -> Self { + self.extensions = extensions; + self + } +} + +impl PartialEq for RenameOptions { + fn eq(&self, other: &Self) -> bool { + let Self { + target_mode, + extensions: _, + } = self; + let Self { + target_mode: target_mode_other, + extensions: _, + } = other; + + target_mode == target_mode_other + } +} + +impl Eq for RenameOptions {} + /// A specialized `Result` for object store-related errors pub type Result = std::result::Result; diff --git a/src/limit.rs b/src/limit.rs index 7e1e2b4b..7fddd63f 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -19,8 +19,8 @@ use crate::{ BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, - ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - StreamExt, UploadPart, + ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, + RenameOptions, Result, StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -156,14 +156,9 @@ impl ObjectStore for LimitStore { self.inner.copy_opts(from, to, options).await } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.rename(from, to).await - } - - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.rename_if_not_exists(from, to).await + self.inner.rename_opts(from, to, options).await } } diff --git a/src/local.rs b/src/local.rs index ebe95272..5b46a7d8 100644 --- a/src/local.rs +++ b/src/local.rs @@ -40,7 +40,7 @@ use crate::{ path::{Path, absolute_path_to_url}, util::InvalidGetRange, }; -use crate::{CopyMode, CopyOptions}; +use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode}; /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, thiserror::Error)] @@ -610,24 +610,52 @@ impl ObjectStore for LocalFileSystem { } } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { - let from = self.path_to_filesystem(from)?; - let to = self.path_to_filesystem(to)?; - maybe_spawn_blocking(move || { - loop { - match std::fs::rename(&from, &to) { - Ok(_) => return Ok(()), - Err(source) => match source.kind() { - ErrorKind::NotFound => match from.exists() { - true => create_parent_dirs(&to, source)?, - false => return Err(Error::NotFound { path: from, source }.into()), - }, - _ => return Err(Error::UnableToCopyFile { from, to, source }.into()), + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { + let RenameOptions { + target_mode, + extensions, + } = options; + + match target_mode { + // optimized implementation + RenameTargetMode::Overwrite => { + let from = self.path_to_filesystem(from)?; + let to = self.path_to_filesystem(to)?; + maybe_spawn_blocking(move || { + loop { + match std::fs::rename(&from, &to) { + Ok(_) => return Ok(()), + Err(source) => match source.kind() { + ErrorKind::NotFound => match from.exists() { + true => create_parent_dirs(&to, source)?, + false => { + return Err(Error::NotFound { path: from, source }.into()); + } + }, + _ => { + return Err(Error::UnableToCopyFile { from, to, source }.into()); + } + }, + } + } + }) + .await + } + // fall-back to copy & delete + RenameTargetMode::Create => { + self.copy_opts( + from, + to, + CopyOptions { + mode: CopyMode::Create, + extensions, }, - } + ) + .await?; + self.delete(from).await?; + Ok(()) } - }) - .await + } } } diff --git a/src/prefix.rs b/src/prefix.rs index c37f55dc..52173dd4 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -23,7 +23,7 @@ use std::ops::Range; use crate::path::Path; use crate::{ CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result, }; /// Store wrapper that applies a constant prefix to all paths handled by the store. @@ -188,16 +188,10 @@ impl ObjectStore for PrefixStore { self.inner.copy_opts(&full_from, &full_to, options).await } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { let full_from = self.full_path(from); let full_to = self.full_path(to); - self.inner.rename(&full_from, &full_to).await - } - - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let full_from = self.full_path(from); - let full_to = self.full_path(to); - self.inner.rename_if_not_exists(&full_from, &full_to).await + self.inner.rename_opts(&full_from, &full_to, options).await } } diff --git a/src/throttle.rs b/src/throttle.rs index bd5795ef..3820608b 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -21,7 +21,7 @@ use std::ops::Range; use std::{convert::TryInto, sync::Arc}; use crate::multipart::{MultipartStore, PartId}; -use crate::{CopyOptions, GetOptions, UploadPart}; +use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart}; use crate::{ GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path, @@ -261,16 +261,10 @@ impl ObjectStore for ThrottledStore { self.inner.copy_opts(from, to, options).await } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { sleep(self.config().wait_put_per_call).await; - self.inner.rename(from, to).await - } - - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - sleep(self.config().wait_put_per_call).await; - - self.inner.rename_if_not_exists(from, to).await + self.inner.rename_opts(from, to, options).await } }