diff --git a/src/prefix.rs b/src/prefix.rs index cecf03f..67456fd 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -20,26 +20,27 @@ use bytes::Bytes; use futures::{StreamExt, TryStreamExt, stream::BoxStream}; use std::ops::Range; +use crate::multipart::{MultipartStore, PartId}; use crate::path::Path; use crate::{ - CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result, + CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result, }; /// Store wrapper that applies a constant prefix to all paths handled by the store. #[derive(Debug, Clone)] -pub struct PrefixStore { +pub struct PrefixStore { prefix: Path, inner: T, } -impl std::fmt::Display for PrefixStore { +impl std::fmt::Display for PrefixStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "PrefixObjectStore({})", self.prefix.as_ref()) } } -impl PrefixStore { +impl PrefixStore { /// Create a new instance of [`PrefixStore`] pub fn new(store: T, prefix: impl Into) -> Self { Self { @@ -190,6 +191,40 @@ impl ObjectStore for PrefixStore { } } +#[async_trait::async_trait] +impl MultipartStore for PrefixStore { + async fn create_multipart(&self, path: &Path) -> Result { + let full_path = self.full_path(path); + self.inner.create_multipart(&full_path).await + } + + async fn put_part( + &self, + path: &Path, + id: &MultipartId, + part_idx: usize, + data: PutPayload, + ) -> Result { + let full_path = self.full_path(path); + self.inner.put_part(&full_path, id, part_idx, data).await + } + + async fn complete_multipart( + &self, + path: &Path, + id: &MultipartId, + parts: Vec, + ) -> Result { + let full_path = self.full_path(path); + self.inner.complete_multipart(&full_path, id, parts).await + } + + async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { + let full_path = self.full_path(path); + self.inner.abort_multipart(&full_path, id).await + } +} + #[cfg(not(target_arch = "wasm32"))] #[cfg(test)] mod tests { @@ -197,6 +232,7 @@ mod tests { use super::*; use crate::local::LocalFileSystem; + use crate::memory::InMemory; use crate::{ObjectStoreExt, integration::*}; use tempfile::TempDir; @@ -262,4 +298,13 @@ mod tests { let read_data = local.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(&*read_data, data) } + + #[tokio::test] + async fn prefix_multipart() { + let store = PrefixStore::new(InMemory::new(), "prefix"); + + multipart(&store, &store).await; + multipart_out_of_order(&store).await; + multipart_race_condition(&store, true).await; + } }