Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@ use bytes::Bytes;
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use std::ops::Range;

use crate::multipart::{MultipartStore, PartId};
use crate::path::Path;
use crate::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
};

/// Store wrapper that applies a constant prefix to all paths handled by the store.
#[derive(Debug, Clone)]
pub struct PrefixStore<T: ObjectStore> {
pub struct PrefixStore<T> {
prefix: Path,
inner: T,
}

impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
impl<T> std::fmt::Display for PrefixStore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
}
}

impl<T: ObjectStore> PrefixStore<T> {
impl<T> PrefixStore<T> {
/// Create a new instance of [`PrefixStore`]
pub fn new(store: T, prefix: impl Into<Path>) -> Self {
Self {
Expand Down Expand Up @@ -190,13 +191,48 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
}
}

#[async_trait::async_trait]
impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
let full_path = self.full_path(path);
self.inner.create_multipart(&full_path).await
}

async fn put_part(
&self,
path: &Path,
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
let full_path = self.full_path(path);
self.inner.put_part(&full_path, id, part_idx, data).await
}

async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
let full_path = self.full_path(path);
self.inner.complete_multipart(&full_path, id, parts).await
}

async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
let full_path = self.full_path(path);
self.inner.abort_multipart(&full_path, id).await
}
}

#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
use std::slice;

use super::*;
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::{ObjectStoreExt, integration::*};

use tempfile::TempDir;
Expand Down Expand Up @@ -262,4 +298,13 @@ mod tests {
let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, data)
}

#[tokio::test]
async fn prefix_multipart() {
let store = PrefixStore::new(InMemory::new(), "prefix");

multipart(&store, &store).await;
multipart_out_of_order(&store).await;
multipart_race_condition(&store, true).await;
}
}