Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub(crate) enum Error {
#[error("Error performing DeleteObjects request: {}", source)]
DeleteObjectsRequest {
source: crate::client::retry::RetryError,
paths: Vec<String>,
},

#[error(
Expand Down Expand Up @@ -127,6 +128,7 @@ impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
Error::CompleteMultipartRequest { source, path } => source.error(STORE, path),
Error::DeleteObjectsRequest { source, paths } => source.error(STORE, paths.join(",")),
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -551,7 +553,10 @@ impl S3Client {
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.send_retry(&self.config.retry_config)
.await
.map_err(|source| Error::DeleteObjectsRequest { source })?
.map_err(|source| Error::DeleteObjectsRequest {
source,
paths: paths.iter().map(|p| p.to_string()).collect(),
})?
.into_body()
.bytes()
.await
Expand Down
5 changes: 0 additions & 5 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,6 @@ impl ObjectStore for AmazonS3 {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.request(Method::DELETE, location).send().await?;
Ok(())
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
42 changes: 3 additions & 39 deletions src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ pub(crate) enum Error {
path: String,
},

#[error("Error performing delete request {}: {}", path, source)]
DeleteRequest {
source: crate::client::retry::RetryError,
path: String,
},

#[error("Error performing bulk delete request: {}", source)]
BulkDeleteRequest {
source: crate::client::retry::RetryError,
Expand Down Expand Up @@ -150,9 +144,9 @@ pub(crate) enum Error {
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::PutRequest { source, path } => source.error(STORE, path),
Error::GetRequest { source, path } | Error::PutRequest { source, path } => {
source.error(STORE, path)
}
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -627,36 +621,6 @@ impl AzureClient {
.map_err(|source| Error::Metadata { source })?)
}

/// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
pub(crate) async fn delete_request<T: Serialize + ?Sized + Sync>(
&self,
path: &Path,
query: &T,
) -> Result<()> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);

let sensitive = credential
.as_deref()
.map(|c| c.sensitive_request())
.unwrap_or_default();
self.client
.delete(url.as_str())
.query(query)
.header(&DELETE_SNAPSHOTS, "include")
.with_azure_authorization(&credential, &self.config.account)
.retryable(&self.config.retry_config)
.sensitive(sensitive)
.send()
.await
.map_err(|source| {
let path = path.as_ref().into();
Error::DeleteRequest { source, path }
})?;

Ok(())
}

fn build_bulk_delete_body(
&self,
boundary: &str,
Expand Down
1 change: 0 additions & 1 deletion src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use url::Url;
static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type");
pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots");
pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
static PARTNER_TOKEN: HeaderName = HeaderName::from_static("x-ms-partner-token");
Expand Down
5 changes: 1 addition & 4 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,10 @@ impl ObjectStore for MicrosoftAzure {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location, &()).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ impl ObjectStore for ChunkedStore {
self.inner.get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,6 @@ impl ObjectStore for GoogleCloudStorage {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ impl ObjectStore for HttpStore {
self.client.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
32 changes: 21 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
.await
}

/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;

/// Delete all the objects at the specified locations
///
/// When supported, this method will use bulk operations that delete more
Expand Down Expand Up @@ -958,10 +955,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// # todo!()
/// # }
/// #
/// # async fn delete(&self, _: &Path) -> Result<()> {
/// # todo!()
/// # }
/// #
/// fn delete_stream(
/// &self,
/// locations: BoxStream<'static, Result<Path>>,
Expand Down Expand Up @@ -1107,10 +1100,6 @@ macro_rules! as_ref_impl {
self.as_ref().get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.as_ref().delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down Expand Up @@ -1250,6 +1239,9 @@ pub trait ObjectStoreExt: ObjectStore {
/// Return the metadata for the specified location
fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;

/// Delete the object at the specified location.
fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
Expand Down Expand Up @@ -1306,6 +1298,24 @@ where
Ok(self.get_opts(location, options).await?.meta)
}

async fn delete(&self, location: &Path) -> Result<()> {
let location = location.clone();
let mut stream =
self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed());
let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
store: "ext",
source: "`delete_stream` with one location should yield once but didn't".into(),
})?;
if stream.next().await.is_some() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Err(Error::Generic {
store: "ext",
source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
})
} else {
Ok(())
}
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
self.copy_opts(from, to, options).await
Expand Down
14 changes: 8 additions & 6 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,18 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
self.inner.delete_stream(locations)
let inner = Arc::clone(&self.inner);
let fut = Arc::clone(&self.semaphore)
.acquire_owned()
.map(move |permit| {
let s = inner.delete_stream(locations);
PermitWrapper::new(s, permit.unwrap())
});
fut.into_stream().flatten().boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
Expand Down
10 changes: 1 addition & 9 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};

/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -444,14 +444,6 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn delete(&self, location: &Path) -> Result<()> {
let config = Arc::clone(&self.config);
let automatic_cleanup = self.automatic_cleanup;
let location = location.clone();
maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location))
.await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
5 changes: 0 additions & 5 deletions src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,6 @@ impl ObjectStore for InMemory {
.collect()
}

async fn delete(&self, location: &Path) -> Result<()> {
self.storage.write().map.remove(location);
Ok(())
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
5 changes: 0 additions & 5 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.get_ranges(&full_path, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
let full_path = self.full_path(location);
self.inner.delete(&full_path).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
11 changes: 4 additions & 7 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ use std::time::Duration;
/// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
/// Sleep duration for every call to [`delete`](ThrottledStore::delete).
/// Sleep duration for every call to [`delete`], or every element in [`delete_stream`].
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
///
/// [`delete`]: crate::ObjectStoreExt::delete
/// [`delete_stream`]: ThrottledStore::delete_stream
pub wait_delete_per_call: Duration,

/// Sleep duration for every byte received during [`get_opts`](ThrottledStore::get_opts).
Expand Down Expand Up @@ -193,12 +196,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.get_ranges(location, ranges).await
}

async fn delete(&self, location: &Path) -> Result<()> {
sleep(self.config().wait_delete_per_call).await;

self.inner.delete(location).await
}

fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
Expand Down
4 changes: 0 additions & 4 deletions tests/get_range_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ impl ObjectStore for MyStore {
self.0.get_opts(location, options).await
}

async fn delete(&self, _: &Path) -> Result<()> {
todo!()
}

fn delete_stream(
&self,
_: BoxStream<'static, Result<Path>>,
Expand Down