Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub(crate) enum Error {
#[error("Error performing DeleteObjects request: {}", source)]
DeleteObjectsRequest {
source: crate::client::retry::RetryError,
paths: Vec<String>,
},

#[error(
Expand Down Expand Up @@ -127,6 +128,7 @@ impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
Error::CompleteMultipartRequest { source, path } => source.error(STORE, path),
Error::DeleteObjectsRequest { source, paths } => source.error(STORE, paths.join(",")),
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -551,7 +553,10 @@ impl S3Client {
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.send_retry(&self.config.retry_config)
.await
.map_err(|source| Error::DeleteObjectsRequest { source })?
.map_err(|source| Error::DeleteObjectsRequest {
source,
paths: paths.iter().map(|p| p.to_string()).collect(),
})?
.into_body()
.bytes()
.await
Expand Down
5 changes: 0 additions & 5 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,6 @@ impl ObjectStore for AmazonS3 {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.request(Method::DELETE, location).send().await?;
Ok(())
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
42 changes: 3 additions & 39 deletions src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ pub(crate) enum Error {
path: String,
},

#[error("Error performing delete request {}: {}", path, source)]
DeleteRequest {
source: crate::client::retry::RetryError,
path: String,
},

#[error("Error performing bulk delete request: {}", source)]
BulkDeleteRequest {
source: crate::client::retry::RetryError,
Expand Down Expand Up @@ -150,9 +144,9 @@ pub(crate) enum Error {
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::PutRequest { source, path } => source.error(STORE, path),
Error::GetRequest { source, path } | Error::PutRequest { source, path } => {
source.error(STORE, path)
}
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -627,36 +621,6 @@ impl AzureClient {
.map_err(|source| Error::Metadata { source })?)
}

/// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
pub(crate) async fn delete_request<T: Serialize + ?Sized + Sync>(
&self,
path: &Path,
query: &T,
) -> Result<()> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);

let sensitive = credential
.as_deref()
.map(|c| c.sensitive_request())
.unwrap_or_default();
self.client
.delete(url.as_str())
.query(query)
.header(&DELETE_SNAPSHOTS, "include")
.with_azure_authorization(&credential, &self.config.account)
.retryable(&self.config.retry_config)
.sensitive(sensitive)
.send()
.await
.map_err(|source| {
let path = path.as_ref().into();
Error::DeleteRequest { source, path }
})?;

Ok(())
}

fn build_bulk_delete_body(
&self,
boundary: &str,
Expand Down
1 change: 0 additions & 1 deletion src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use url::Url;
static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type");
pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots");
pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
static PARTNER_TOKEN: HeaderName = HeaderName::from_static("x-ms-partner-token");
Expand Down
5 changes: 1 addition & 4 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,10 @@ impl ObjectStore for MicrosoftAzure {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location, &()).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ impl ObjectStore for ChunkedStore {
self.inner.get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,6 @@ impl ObjectStore for GoogleCloudStorage {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ impl ObjectStore for HttpStore {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
32 changes: 21 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
.await
}

/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;

/// Delete all the objects at the specified locations
///
/// When supported, this method will use bulk operations that delete more
Expand Down Expand Up @@ -958,10 +955,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// # todo!()
/// # }
/// #
/// # async fn delete(&self, _: &Path) -> Result<()> {
/// # todo!()
/// # }
/// #
/// fn delete_stream(
/// &self,
/// locations: BoxStream<'static, Result<Path>>,
Expand Down Expand Up @@ -1107,10 +1100,6 @@ macro_rules! as_ref_impl {
self.as_ref().get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.as_ref().delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down Expand Up @@ -1250,6 +1239,9 @@ pub trait ObjectStoreExt: ObjectStore {
/// Return the metadata for the specified location
fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;

/// Delete the object at the specified location.
fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
Expand Down Expand Up @@ -1306,6 +1298,24 @@ where
Ok(self.get_opts(location, options).await?.meta)
}

async fn delete(&self, location: &Path) -> Result<()> {
let location = location.clone();
let mut stream =
self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed());
let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
store: "ext",
source: "`delete_stream` was supposed to yield once but didn't".into(),
})?;
if stream.next().await.is_some() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Err(Error::Generic {
store: "ext",
source: "`delete_stream` yielded more than once".into(),
})
} else {
Ok(())
}
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
self.copy_opts(from, to, options).await
Expand Down
14 changes: 8 additions & 6 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,18 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
self.inner.delete_stream(locations)
let inner = Arc::clone(&self.inner);
let fut = Arc::clone(&self.semaphore)
.acquire_owned()
.map(move |permit| {
let s = inner.delete_stream(locations);
PermitWrapper::new(s, permit.unwrap())
});
fut.into_stream().flatten().boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
Expand Down
10 changes: 1 addition & 9 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};

/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -444,14 +444,6 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn delete(&self, location: &Path) -> Result<()> {
let config = Arc::clone(&self.config);
let automatic_cleanup = self.automatic_cleanup;
let location = location.clone();
maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location))
.await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
5 changes: 0 additions & 5 deletions src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,6 @@ impl ObjectStore for InMemory {
.collect()
}

async fn delete(&self, location: &Path) -> Result<()> {
self.storage.write().map.remove(location);
Ok(())
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
5 changes: 0 additions & 5 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.get_ranges(&full_path, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
let full_path = self.full_path(location);
self.inner.delete(&full_path).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
11 changes: 4 additions & 7 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ use std::time::Duration;
/// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
/// Sleep duration for every call to [`delete`](ThrottledStore::delete).
/// Sleep duration for every call to [`delete`], or every element in [`delete_stream`].
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
///
/// [`delete`]: crate::ObjectStoreExt::delete
/// [`delete_stream`]: ThrottledStore::delete_stream
pub wait_delete_per_call: Duration,

/// Sleep duration for every byte received during [`get_opts`](ThrottledStore::get_opts).
Expand Down Expand Up @@ -193,12 +196,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
sleep(self.config().wait_delete_per_call).await;

self.inner.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions tests/get_range_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ impl ObjectStore for MyStore {
self.0.get_opts(location, options).await
}

async fn delete(&self, _: &Path) -> Result<()> {
todo!()
}

fn delete_stream(
&self,
_: BoxStream<'static, Result<Path>>,
Expand Down