diff --git a/src/aws/client.rs b/src/aws/client.rs index 63371875..3394dbba 100644 --- a/src/aws/client.rs +++ b/src/aws/client.rs @@ -69,6 +69,7 @@ pub(crate) enum Error { #[error("Error performing DeleteObjects request: {}", source)] DeleteObjectsRequest { source: crate::client::retry::RetryError, + paths: Vec, }, #[error( @@ -127,6 +128,7 @@ impl From for crate::Error { fn from(err: Error) -> Self { match err { Error::CompleteMultipartRequest { source, path } => source.error(STORE, path), + Error::DeleteObjectsRequest { source, paths } => source.error(STORE, paths.join(",")), _ => Self::Generic { store: STORE, source: Box::new(err), @@ -551,7 +553,10 @@ impl S3Client { .with_aws_sigv4(credential.authorizer(), Some(digest.as_ref())) .send_retry(&self.config.retry_config) .await - .map_err(|source| Error::DeleteObjectsRequest { source })? + .map_err(|source| Error::DeleteObjectsRequest { + source, + paths: paths.iter().map(|p| p.to_string()).collect(), + })? .into_body() .bytes() .await diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 3e658afc..7b90c5bc 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -250,11 +250,6 @@ impl ObjectStore for AmazonS3 { self.client.get_opts(location, options).await } - async fn delete(&self, location: &Path) -> Result<()> { - self.client.request(Method::DELETE, location).send().await?; - Ok(()) - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/azure/client.rs b/src/azure/client.rs index bce2c246..54ab3077 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -74,12 +74,6 @@ pub(crate) enum Error { path: String, }, - #[error("Error performing delete request {}: {}", path, source)] - DeleteRequest { - source: crate::client::retry::RetryError, - path: String, - }, - #[error("Error performing bulk delete request: {}", source)] BulkDeleteRequest { source: crate::client::retry::RetryError, @@ -150,9 +144,9 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { match err { - Error::GetRequest { source, path } - | Error::DeleteRequest { source, path } - | Error::PutRequest { source, path } => source.error(STORE, path), + Error::GetRequest { source, path } | Error::PutRequest { source, path } => { + source.error(STORE, path) + } _ => Self::Generic { store: STORE, source: Box::new(err), @@ -627,36 +621,6 @@ impl AzureClient { .map_err(|source| Error::Metadata { source })?) } - /// Make an Azure Delete request - pub(crate) async fn delete_request( - &self, - path: &Path, - query: &T, - ) -> Result<()> { - let credential = self.get_credential().await?; - let url = self.config.path_url(path); - - let sensitive = credential - .as_deref() - .map(|c| c.sensitive_request()) - .unwrap_or_default(); - self.client - .delete(url.as_str()) - .query(query) - .header(&DELETE_SNAPSHOTS, "include") - .with_azure_authorization(&credential, &self.config.account) - .retryable(&self.config.retry_config) - .sensitive(sensitive) - .send() - .await - .map_err(|source| { - let path = path.as_ref().into(); - Error::DeleteRequest { source, path } - })?; - - Ok(()) - } - fn build_bulk_delete_body( &self, boundary: &str, diff --git a/src/azure/credential.rs b/src/azure/credential.rs index ae630a64..dcc6cdd0 100644 --- a/src/azure/credential.rs +++ b/src/azure/credential.rs @@ -47,7 +47,6 @@ use url::Url; static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03"); static VERSION: HeaderName = HeaderName::from_static("x-ms-version"); pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type"); -pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots"); pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source"); static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); static PARTNER_TOKEN: HeaderName = HeaderName::from_static("x-ms-partner-token"); diff --git a/src/azure/mod.rs b/src/azure/mod.rs index d22ffcf8..04c8f31d 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -117,13 +117,10 @@ impl ObjectStore for MicrosoftAzure { self.client.get_opts(location, options).await } - async fn delete(&self, location: &Path) -> Result<()> { - self.client.delete_request(location, &()).await - } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.client.list(prefix) } + fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/chunked.rs b/src/chunked.rs index 49632ed6..b362366d 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -139,10 +139,6 @@ impl ObjectStore for ChunkedStore { self.inner.get_ranges(location, ranges).await } - async fn delete(&self, location: &Path) -> Result<()> { - self.inner.delete(location).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index 270b89ae..2fb74b4f 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -181,10 +181,6 @@ impl ObjectStore for GoogleCloudStorage { self.client.get_opts(location, options).await } - async fn delete(&self, location: &Path) -> Result<()> { - self.client.delete_request(location).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/http/mod.rs b/src/http/mod.rs index 673419c6..e2410020 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -132,10 +132,6 @@ impl ObjectStore for HttpStore { self.client.get_opts(location, options).await } - async fn delete(&self, location: &Path) -> Result<()> { - self.client.delete(location).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/lib.rs b/src/lib.rs index b0cf542d..6ffdf6d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -853,9 +853,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { .await } - /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> Result<()>; - /// Delete all the objects at the specified locations /// /// When supported, this method will use bulk operations that delete more @@ -958,10 +955,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// # todo!() /// # } /// # - /// # async fn delete(&self, _: &Path) -> Result<()> { - /// # todo!() - /// # } - /// # /// fn delete_stream( /// &self, /// locations: BoxStream<'static, Result>, @@ -1107,10 +1100,6 @@ macro_rules! as_ref_impl { self.as_ref().get_ranges(location, ranges).await } - async fn delete(&self, location: &Path) -> Result<()> { - self.as_ref().delete(location).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, @@ -1250,6 +1239,9 @@ pub trait ObjectStoreExt: ObjectStore { /// Return the metadata for the specified location fn head(&self, location: &Path) -> impl Future>; + /// Delete the object at the specified location. + fn delete(&self, location: &Path) -> impl Future>; + /// Copy an object from one path to another in the same object store. /// /// If there exists an object at the destination, it will be overwritten. @@ -1306,6 +1298,24 @@ where Ok(self.get_opts(location, options).await?.meta) } + async fn delete(&self, location: &Path) -> Result<()> { + let location = location.clone(); + let mut stream = + self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed()); + let _path = stream.try_next().await?.ok_or_else(|| Error::Generic { + store: "ext", + source: "`delete_stream` with one location should yield once but didn't".into(), + })?; + if stream.next().await.is_some() { + Err(Error::Generic { + store: "ext", + source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(), + }) + } else { + Ok(()) + } + } + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { let options = CopyOptions::new().with_mode(CopyMode::Overwrite); self.copy_opts(from, to, options).await diff --git a/src/limit.rs b/src/limit.rs index 7fddd63f..30fe2b65 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -105,16 +105,18 @@ impl ObjectStore for LimitStore { self.inner.get_ranges(location, ranges).await } - async fn delete(&self, location: &Path) -> Result<()> { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.delete(location).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, ) -> BoxStream<'static, Result> { - self.inner.delete_stream(locations) + let inner = Arc::clone(&self.inner); + let fut = Arc::clone(&self.semaphore) + .acquire_owned() + .map(move |permit| { + let s = inner.delete_stream(locations); + PermitWrapper::new(s, permit.unwrap()) + }); + fut.into_stream().flatten().boxed() } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { diff --git a/src/local.rs b/src/local.rs index 5b46a7d8..003e7d70 100644 --- a/src/local.rs +++ b/src/local.rs @@ -40,7 +40,7 @@ use crate::{ path::{Path, absolute_path_to_url}, util::InvalidGetRange, }; -use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode}; +use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode}; /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, thiserror::Error)] @@ -444,14 +444,6 @@ impl ObjectStore for LocalFileSystem { .await } - async fn delete(&self, location: &Path) -> Result<()> { - let config = Arc::clone(&self.config); - let automatic_cleanup = self.automatic_cleanup; - let location = location.clone(); - maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location)) - .await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/memory.rs b/src/memory.rs index 08e41c29..f026907d 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -294,11 +294,6 @@ impl ObjectStore for InMemory { .collect() } - async fn delete(&self, location: &Path) -> Result<()> { - self.storage.write().map.remove(location); - Ok(()) - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/prefix.rs b/src/prefix.rs index 52173dd4..cecf03ff 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -124,11 +124,6 @@ impl ObjectStore for PrefixStore { self.inner.get_ranges(&full_path, ranges).await } - async fn delete(&self, location: &Path) -> Result<()> { - let full_path = self.full_path(location); - self.inner.delete(&full_path).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/throttle.rs b/src/throttle.rs index 3820608b..1fc90d7e 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -34,10 +34,13 @@ use std::time::Duration; /// Configuration settings for throttled store #[derive(Debug, Default, Clone, Copy)] pub struct ThrottleConfig { - /// Sleep duration for every call to [`delete`](ThrottledStore::delete). + /// Sleep duration for every call to [`delete`], or every element in [`delete_stream`]. /// /// Sleeping is done before the underlying store is called and independently of the success of /// the operation. + /// + /// [`delete`]: crate::ObjectStoreExt::delete + /// [`delete_stream`]: ThrottledStore::delete_stream pub wait_delete_per_call: Duration, /// Sleep duration for every byte received during [`get_opts`](ThrottledStore::get_opts). @@ -193,12 +196,6 @@ impl ObjectStore for ThrottledStore { self.inner.get_ranges(location, ranges).await } - async fn delete(&self, location: &Path) -> Result<()> { - sleep(self.config().wait_delete_per_call).await; - - self.inner.delete(location).await - } - fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs index df0a4142..95027eb2 100644 --- a/tests/get_range_file.rs +++ b/tests/get_range_file.rs @@ -58,10 +58,6 @@ impl ObjectStore for MyStore { self.0.get_opts(location, options).await } - async fn delete(&self, _: &Path) -> Result<()> { - todo!() - } - fn delete_stream( &self, _: BoxStream<'static, Result>,