From 51d941998a039603288b4d09e07efffd4c0f76cb Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Fri, 22 Dec 2023 16:39:53 +1100 Subject: [PATCH] Revise storage traits - WritableStorageTraits no longer requires ReadableStorageTraits - Removed default implementation for WritableStorageTraits::set_partial_values(). - Adds utility functions {async_}store_set_partial_values for stores implementing ReadableWritableStorageTraits - Readd ReadableWritableStorage --- CHANGELOG.md | 12 ++ Cargo.toml | 2 +- examples/sharded_array_write_read.rs | 2 +- examples/zip_array_write_read.rs | 4 +- src/array/array_async.rs | 6 +- src/array/array_builder.rs | 3 + src/array/array_sync.rs | 6 +- src/storage.rs | 15 ++- src/storage/storage_async.rs | 107 ++++++++++-------- src/storage/storage_handle.rs | 21 +++- src/storage/storage_sync.rs | 75 +++++++----- src/storage/storage_transformer.rs | 11 +- .../performance_metrics.rs | 28 ++++- .../storage_transformer_chain.rs | 18 ++- src/storage/storage_transformer/usage_log.rs | 28 ++++- .../store/store_async/amazon_s3_store.rs | 2 +- .../store/store_async/google_cloud_store.rs | 2 +- src/storage/store/store_async/memory_store.rs | 2 +- .../store_async/microsoft_azure_store.rs | 2 +- src/storage/store/store_async/object_store.rs | 90 ++++++++++----- .../store/store_sync/filesystem_store.rs | 20 ++-- src/storage/store/store_sync/memory_store.rs | 18 +-- 22 files changed, 313 insertions(+), 161 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f00947fd..2a8f8f35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + - Added `ReadableWritableStorage` + - Added `{async_}store_set_partial_values` + - **Breaking** Added `create_readable_writable_transformer` to `StorageTransformerExtension` trait + +### Changed + - **Breaking** `ReadableStorageTraits` is no longer a supertrait of `WritableStorageTraits` + - `WritableStorage` is no longer implicitly readable. Use `ReadableWritableStorage` + - **Breaking**: `{Async}WritableStorageTraits::set_partial_values()` no longer include default implementations + - Use new `{async_}store_set_partial_values` utility functions instead + - Add `#[must_use]` to `Array::builder`, `Array::chunk_grid_shape`, and `ArrayBuilder::from_array` + ## [0.7.3] - 2023-12-22 ### Added diff --git a/Cargo.toml b/Cargo.toml index 3a451304..fe8f11cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zarrs" -version = "0.7.3" +version = "0.8.0" authors = ["Lachlan Deakin "] edition = "2021" rust-version = "1.70" diff --git a/examples/sharded_array_write_read.rs b/examples/sharded_array_write_read.rs index b12edd12..206c617b 100644 --- a/examples/sharded_array_write_read.rs +++ b/examples/sharded_array_write_read.rs @@ -29,7 +29,7 @@ fn sharded_array_write_read() -> Result<(), Box> { chrono::Utc::now().format("[%T%.3f] ").to_string() }); let store_readable_listable = usage_log.create_readable_listable_transformer(store.clone()); - let store = usage_log.create_writable_transformer(store); + let store = usage_log.create_readable_writable_transformer(store); // Create a group and write metadata to filesystem let group_path = "/group"; diff --git a/examples/zip_array_write_read.rs b/examples/zip_array_write_read.rs index f89ac300..2dea68db 100644 --- a/examples/zip_array_write_read.rs +++ b/examples/zip_array_write_read.rs @@ -8,13 +8,13 @@ use std::{ use zarrs::{ array::{Array, ZARR_NAN_F32}, array_subset::ArraySubset, - storage::{ReadableStorageTraits, WritableStorageTraits}, + storage::{ReadableStorageTraits, ReadableWritableStorageTraits}, }; // const ARRAY_PATH: &'static str = "/array"; const ARRAY_PATH: &str = "/"; -fn write_array_to_storage( +fn write_array_to_storage( storage: Arc, ) -> Result, Box> { use zarrs::array::{chunk_grid::ChunkGridTraits, codec, DataType, FillValue}; diff --git a/src/array/array_async.rs b/src/array/array_async.rs index 30429d97..b1dd7e9c 100644 --- a/src/array/array_async.rs +++ b/src/array/array_async.rs @@ -7,8 +7,8 @@ use crate::{ array_subset::ArraySubset, node::NodePath, storage::{ - data_key, meta_key, AsyncReadableStorageTraits, AsyncWritableStorageTraits, StorageError, - StorageHandle, + data_key, meta_key, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits, + AsyncWritableStorageTraits, StorageError, StorageHandle, }, }; @@ -886,7 +886,7 @@ impl Array { } } -impl Array { +impl Array { #[allow(clippy::too_many_lines)] async fn _async_store_array_subset( &self, diff --git a/src/array/array_builder.rs b/src/array/array_builder.rs index e95c6572..dccc841c 100644 --- a/src/array/array_builder.rs +++ b/src/array/array_builder.rs @@ -309,6 +309,9 @@ impl ArrayBuilder { additional_fields: self.additional_fields.clone(), parallel_codecs: self.parallel_codecs, include_zarrs_metadata: true, + chunk_locks: parking_lot::Mutex::default(), + #[cfg(feature = "async")] + async_chunk_locks: async_lock::Mutex::default(), }) } } diff --git a/src/array/array_sync.rs b/src/array/array_sync.rs index e262e253..78204d55 100644 --- a/src/array/array_sync.rs +++ b/src/array/array_sync.rs @@ -7,8 +7,8 @@ use crate::{ array_subset::ArraySubset, node::NodePath, storage::{ - data_key, meta_key, ReadableStorageTraits, StorageError, StorageHandle, - WritableStorageTraits, + data_key, meta_key, ReadableStorageTraits, ReadableWritableStorageTraits, StorageError, + StorageHandle, WritableStorageTraits, }, }; @@ -777,7 +777,7 @@ impl Array { } } -impl Array { +impl Array { fn _store_array_subset( &self, array_subset: &ArraySubset, diff --git a/src/storage.rs b/src/storage.rs index da86015b..372c323b 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -41,15 +41,16 @@ pub use self::storage_async::{ async_create_array, async_create_group, async_discover_children, async_discover_nodes, async_erase_chunk, async_erase_node, async_get_child_nodes, async_node_exists, async_node_exists_listable, async_retrieve_chunk, async_retrieve_partial_values, - async_store_chunk, AsyncListableStorageTraits, AsyncReadableListableStorageTraits, - AsyncReadableStorageTraits, AsyncWritableStorageTraits, + async_store_chunk, async_store_set_partial_values, AsyncListableStorageTraits, + AsyncReadableListableStorageTraits, AsyncReadableStorageTraits, + AsyncReadableWritableStorageTraits, AsyncWritableStorageTraits, }; pub use self::storage_sync::{ create_array, create_group, discover_children, discover_nodes, erase_chunk, erase_node, get_child_nodes, node_exists, node_exists_listable, retrieve_chunk, retrieve_partial_values, - store_chunk, ListableStorageTraits, ReadableListableStorageTraits, ReadableStorageTraits, - WritableStorageTraits, + store_chunk, store_set_partial_values, ListableStorageTraits, ReadableListableStorageTraits, + ReadableStorageTraits, ReadableWritableStorageTraits, WritableStorageTraits, }; pub use self::storage_transformer::StorageTransformerChain; @@ -63,6 +64,9 @@ pub type ReadableStorage<'a> = Arc; /// [`Arc`] wrapped writable storage. pub type WritableStorage<'a> = Arc; +/// [`Arc`] wrapped readable and writable storage. +pub type ReadableWritableStorage<'a> = Arc; + /// [`Arc`] wrapped listable storage. pub type ListableStorage<'a> = Arc; @@ -277,7 +281,8 @@ mod tests { // storage_transformer_usage_log.clone(), storage_transformer_performance_metrics.clone(), ]); - let transformer = storage_transformer_chain.create_writable_transformer(store.clone()); + let transformer = + storage_transformer_chain.create_readable_writable_transformer(store.clone()); let transformer_listable = storage_transformer_chain.create_listable_transformer(store); (0..10).into_par_iter().for_each(|_| { diff --git a/src/storage/storage_async.rs b/src/storage/storage_async.rs index d7b232fd..af47c8b7 100644 --- a/src/storage/storage_async.rs +++ b/src/storage/storage_async.rs @@ -159,71 +159,82 @@ pub trait AsyncListableStorageTraits: Send + Sync { async fn list_dir(&self, prefix: &StorePrefix) -> Result; } +/// Set partial values for an asynchronous store. +/// +/// # Errors +/// Returns a [`StorageError`] if an underlying store operation fails. +/// +/// # Panics +/// Panics if a key ends beyond `usize::MAX`. +pub async fn async_store_set_partial_values( + store: &T, + key_start_values: &[StoreKeyStartValue<'_>], +) -> Result<(), StorageError> { + // Group by key + let group_by_key = key_start_values + .iter() + .group_by(|key_start_value| &key_start_value.key) + .into_iter() + .map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::>())) + .collect::>(); + + // Read keys + let mut futures = group_by_key + .into_iter() + .map(|(key, group)| async move { + // TODO: Lock + + // Read the store key + let mut bytes = store.get(&key.clone()).await?.unwrap_or_else(Vec::default); + + // Expand the store key if needed + let end_max = + usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap()).unwrap(); + if bytes.len() < end_max { + bytes.resize_with(end_max, Default::default); + } + + // Update the store key + for key_start_value in group { + let start: usize = key_start_value.start.try_into().unwrap(); + let end: usize = key_start_value.end().try_into().unwrap(); + bytes[start..end].copy_from_slice(key_start_value.value); + } + + // Write the store key + store.set(&key, &bytes).await + }) + .collect::>(); + while let Some(item) = futures.next().await { + item?; + } + + Ok(()) +} + /// Async writable storage traits. #[cfg_attr(feature = "async", async_trait::async_trait)] -pub trait AsyncWritableStorageTraits: Send + Sync + AsyncReadableStorageTraits { +pub trait AsyncWritableStorageTraits: Send + Sync { /// Store bytes at a [`StoreKey`]. /// /// # Errors - /// /// Returns a [`StorageError`] on failure to store. async fn set(&self, key: &StoreKey, value: &[u8]) -> Result<(), StorageError>; /// Store bytes according to a list of [`StoreKeyStartValue`]. /// /// # Errors - /// /// Returns a [`StorageError`] on failure to store. async fn set_partial_values( &self, key_start_values: &[StoreKeyStartValue], - ) -> Result<(), StorageError> { - // Group by key - let group_by_key = key_start_values - .iter() - .group_by(|key_start_value| &key_start_value.key) - .into_iter() - .map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::>())) - .collect::>(); - - // Read keys - let mut futures = group_by_key - .into_iter() - .map(|(key, group)| async move { - let mut bytes = self.get(&key.clone()).await?.unwrap_or_else(Vec::default); - let end_max = - usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap()) - .unwrap(); - - // Expand the store key if needed - if bytes.len() < end_max { - bytes.resize_with(end_max, Default::default); - } - - // Update the store key - for key_start_value in group { - let start: usize = key_start_value.start.try_into().unwrap(); - let end: usize = key_start_value.end().try_into().unwrap(); - bytes[start..end].copy_from_slice(key_start_value.value); - } - - // Write the store key - self.set(&key, &bytes).await - }) - .collect::>(); - while let Some(item) = futures.next().await { - item?; - } - - Ok(()) - } + ) -> Result<(), StorageError>; /// Erase a [`StoreKey`]. /// /// Returns true if the key exists and was erased, or false if the key does not exist. /// /// # Errors - /// /// Returns a [`StorageError`] if there is an underlying storage error. async fn erase(&self, key: &StoreKey) -> Result; @@ -232,7 +243,6 @@ pub trait AsyncWritableStorageTraits: Send + Sync + AsyncReadableStorageTraits { /// Returns true if all keys existed and were erased, or false if any key does not exist. /// /// # Errors - /// /// Returns a [`StorageError`] if there is an underlying storage error. async fn erase_values(&self, keys: &[StoreKey]) -> Result { let futures_erase = keys.iter().map(|key| self.erase(key)); @@ -253,6 +263,13 @@ pub trait AsyncWritableStorageTraits: Send + Sync + AsyncReadableStorageTraits { async fn erase_prefix(&self, prefix: &StorePrefix) -> Result; } +/// A supertrait of [`AsyncReadableStorageTraits`] and [`AsyncWritableStorageTraits`]. +#[cfg_attr(feature = "async", async_trait::async_trait)] +pub trait AsyncReadableWritableStorageTraits: + AsyncReadableStorageTraits + AsyncWritableStorageTraits +{ +} + /// A supertrait of [`AsyncReadableStorageTraits`] and [`AsyncListableStorageTraits`]. pub trait AsyncReadableListableStorageTraits: AsyncReadableStorageTraits + AsyncListableStorageTraits diff --git a/src/storage/storage_handle.rs b/src/storage/storage_handle.rs index 35b9985d..1d4e63ca 100644 --- a/src/storage/storage_handle.rs +++ b/src/storage/storage_handle.rs @@ -1,12 +1,15 @@ use crate::{array::MaybeBytes, byte_range::ByteRange}; use super::{ - ListableStorageTraits, ReadableStorageTraits, StorageError, StoreKey, StorePrefix, - WritableStorageTraits, + ListableStorageTraits, ReadableStorageTraits, ReadableWritableStorageTraits, StorageError, + StoreKey, StorePrefix, WritableStorageTraits, }; #[cfg(feature = "async")] -use super::{AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncWritableStorageTraits}; +use super::{ + AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits, + AsyncWritableStorageTraits, +}; /// A storage handle. /// @@ -105,6 +108,11 @@ impl WritableStorageTraits } } +impl ReadableWritableStorageTraits + for StorageHandle<'_, TStorage> +{ +} + #[cfg(feature = "async")] #[cfg_attr(feature = "async", async_trait::async_trait)] impl AsyncReadableStorageTraits @@ -194,3 +202,10 @@ impl AsyncWritableStorageTraits self.0.erase_prefix(prefix).await } } + +#[cfg(feature = "async")] +#[cfg_attr(feature = "async", async_trait::async_trait)] +impl AsyncReadableWritableStorageTraits + for StorageHandle<'_, TStorage> +{ +} diff --git a/src/storage/storage_sync.rs b/src/storage/storage_sync.rs index 3a498a98..eb3deafe 100644 --- a/src/storage/storage_sync.rs +++ b/src/storage/storage_sync.rs @@ -152,8 +152,54 @@ pub trait ListableStorageTraits: Send + Sync { fn list_dir(&self, prefix: &StorePrefix) -> Result; } +/// Set partial values for a store. +/// +/// # Errors +/// Returns a [`StorageError`] if an underlying store operation fails. +/// +/// # Panics +/// Panics if a key ends beyond `usize::MAX`. +pub fn store_set_partial_values( + store: &T, + key_start_values: &[StoreKeyStartValue], +) -> Result<(), StorageError> { + // Group by key + let group_by_key = key_start_values + .iter() + .group_by(|key_start_value| &key_start_value.key) + .into_iter() + .map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::>())) + .collect::>(); + + // Group by store key + group_by_key.into_iter().try_for_each(|(key, group)| { + // TODO: Lock + + // Read the store key + let mut bytes = store.get(&key)?.unwrap_or_default(); + + // Expand the store key if needed + let end_max = + usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap()).unwrap(); + if bytes.len() < end_max { + bytes.resize_with(end_max, Default::default); + } + + // Update the store key + for key_start_value in group { + let start: usize = key_start_value.start.try_into().unwrap(); + let end: usize = key_start_value.end().try_into().unwrap(); + bytes[start..end].copy_from_slice(key_start_value.value); + } + + // Write the store key + store.set(&key, &bytes) + })?; + Ok(()) +} + /// Writable storage traits. -pub trait WritableStorageTraits: Send + Sync + ReadableStorageTraits { +pub trait WritableStorageTraits: Send + Sync { /// Store bytes at a [`StoreKey`]. /// /// # Errors @@ -169,30 +215,7 @@ pub trait WritableStorageTraits: Send + Sync + ReadableStorageTraits { fn set_partial_values( &self, key_start_values: &[StoreKeyStartValue], - ) -> Result<(), StorageError> { - // Group by store key - for (key, group) in &key_start_values - .iter() - .group_by(|key_start_value| &key_start_value.key) - { - // Read the store key - let mut bytes = self.get(key)?.unwrap_or_default(); - - // Update the store key - for key_start_value in group { - let start: usize = key_start_value.start.try_into().unwrap(); - let end: usize = key_start_value.end().try_into().unwrap(); - if bytes.len() < end { - bytes.resize(end, 0); - } - bytes[start..end].copy_from_slice(key_start_value.value); - } - - // Write the store key - self.set(key, &bytes)?; - } - Ok(()) - } + ) -> Result<(), StorageError>; /// Erase a [`StoreKey`]. /// @@ -230,8 +253,6 @@ pub trait WritableStorageTraits: Send + Sync + ReadableStorageTraits { /// A supertrait of [`ReadableStorageTraits`] and [`WritableStorageTraits`]. pub trait ReadableWritableStorageTraits: ReadableStorageTraits + WritableStorageTraits {} -impl ReadableWritableStorageTraits for T where T: ReadableStorageTraits + WritableStorageTraits {} - /// A supertrait of [`ReadableStorageTraits`] and [`ListableStorageTraits`]. pub trait ReadableListableStorageTraits: ReadableStorageTraits + ListableStorageTraits {} diff --git a/src/storage/storage_transformer.rs b/src/storage/storage_transformer.rs index e66ba9e9..c09c4117 100644 --- a/src/storage/storage_transformer.rs +++ b/src/storage/storage_transformer.rs @@ -17,7 +17,10 @@ use crate::{ plugin::{Plugin, PluginCreateError}, }; -use super::{ListableStorage, ReadableListableStorage, ReadableStorage, WritableStorage}; +use super::{ + ListableStorage, ReadableListableStorage, ReadableStorage, ReadableWritableStorage, + WritableStorage, +}; #[cfg(feature = "async")] use super::{ @@ -66,6 +69,12 @@ pub trait StorageTransformerExtension: core::fmt::Debug + Send + Sync { storage: WritableStorage<'a>, ) -> WritableStorage<'a>; + /// Create a readable and writable transformer. + fn create_readable_writable_transformer<'a>( + &'a self, + storage: ReadableWritableStorage<'a>, + ) -> ReadableWritableStorage<'a>; + /// Create a listable transformer. fn create_listable_transformer<'a>( &'a self, diff --git a/src/storage/storage_transformer/performance_metrics.rs b/src/storage/storage_transformer/performance_metrics.rs index 78f91da6..548aed3a 100644 --- a/src/storage/storage_transformer/performance_metrics.rs +++ b/src/storage/storage_transformer/performance_metrics.rs @@ -5,16 +5,17 @@ use crate::{ metadata::Metadata, storage::{ ListableStorage, ListableStorageTraits, ReadableListableStorage, ReadableStorage, - ReadableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, - StoreKeys, StoreKeysPrefixes, StorePrefix, WritableStorage, WritableStorageTraits, + ReadableStorageTraits, ReadableWritableStorage, ReadableWritableStorageTraits, + StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, + StorePrefix, WritableStorage, WritableStorageTraits, }, }; #[cfg(feature = "async")] use crate::storage::{ AsyncListableStorage, AsyncListableStorageTraits, AsyncReadableListableStorage, - AsyncReadableStorage, AsyncReadableStorageTraits, AsyncWritableStorage, - AsyncWritableStorageTraits, + AsyncReadableStorage, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits, + AsyncWritableStorage, AsyncWritableStorageTraits, }; use std::sync::{ @@ -90,6 +91,13 @@ impl StorageTransformerExtension for PerformanceMetricsStorageTransformer { self.create_transformer(storage) } + fn create_readable_writable_transformer<'a>( + &'a self, + storage: ReadableWritableStorage<'a>, + ) -> ReadableWritableStorage<'a> { + self.create_transformer(storage) + } + fn create_listable_transformer<'a>( &'a self, storage: ListableStorage<'a>, @@ -268,6 +276,11 @@ impl WritableStorageTraits } } +impl ReadableWritableStorageTraits + for PerformanceMetricsStorageTransformerImpl<'_, TStorage> +{ +} + #[cfg(feature = "async")] #[cfg_attr(feature = "async", async_trait::async_trait)] impl AsyncReadableStorageTraits @@ -397,3 +410,10 @@ impl AsyncWritableStorageTraits self.storage.erase_prefix(prefix).await } } + +#[cfg(feature = "async")] +#[cfg_attr(feature = "async", async_trait::async_trait)] +impl AsyncReadableWritableStorageTraits + for PerformanceMetricsStorageTransformerImpl<'_, TStorage> +{ +} diff --git a/src/storage/storage_transformer/storage_transformer_chain.rs b/src/storage/storage_transformer/storage_transformer_chain.rs index 5b7f74a5..436d6930 100644 --- a/src/storage/storage_transformer/storage_transformer_chain.rs +++ b/src/storage/storage_transformer/storage_transformer_chain.rs @@ -5,7 +5,10 @@ use derive_more::From; use crate::{ metadata::Metadata, plugin::PluginCreateError, - storage::{ListableStorage, ReadableListableStorage, ReadableStorage, WritableStorage}, + storage::{ + ListableStorage, ReadableListableStorage, ReadableStorage, ReadableWritableStorage, + WritableStorage, + }, }; #[cfg(feature = "async")] @@ -73,6 +76,17 @@ impl StorageTransformerChain { storage } + /// Create a readable and writable storage transformer. + pub fn create_readable_writable_transformer<'a>( + &'a self, + mut storage: ReadableWritableStorage<'a>, + ) -> ReadableWritableStorage<'a> { + for transformer in &self.0 { + storage = transformer.create_readable_writable_transformer(storage); + } + storage + } + /// Create a listable storage transformer. pub fn create_listable_transformer<'a>( &'a self, @@ -84,7 +98,7 @@ impl StorageTransformerChain { storage } - /// Create a listable storage transformer. + /// Create a readable and listable storage transformer. pub fn create_readable_listable_transformer<'a>( &'a self, mut storage: ReadableListableStorage<'a>, diff --git a/src/storage/storage_transformer/usage_log.rs b/src/storage/storage_transformer/usage_log.rs index 983c6c68..91f16316 100644 --- a/src/storage/storage_transformer/usage_log.rs +++ b/src/storage/storage_transformer/usage_log.rs @@ -11,16 +11,17 @@ use crate::{ metadata::Metadata, storage::{ ListableStorage, ListableStorageTraits, ReadableListableStorage, ReadableStorage, - ReadableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, - StoreKeys, StoreKeysPrefixes, StorePrefix, WritableStorage, WritableStorageTraits, + ReadableStorageTraits, ReadableWritableStorage, ReadableWritableStorageTraits, + StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, + StorePrefix, WritableStorage, WritableStorageTraits, }, }; #[cfg(feature = "async")] use crate::storage::{ AsyncListableStorage, AsyncListableStorageTraits, AsyncReadableListableStorage, - AsyncReadableStorage, AsyncReadableStorageTraits, AsyncWritableStorage, - AsyncWritableStorageTraits, + AsyncReadableStorage, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits, + AsyncWritableStorage, AsyncWritableStorageTraits, }; use super::StorageTransformerExtension; @@ -67,6 +68,13 @@ impl StorageTransformerExtension for UsageLogStorageTransformer { self.create_transformer(storage) } + fn create_readable_writable_transformer<'a>( + &'a self, + storage: ReadableWritableStorage<'a>, + ) -> ReadableWritableStorage<'a> { + self.create_transformer(storage) + } + fn create_writable_transformer<'a>(&self, storage: WritableStorage<'a>) -> WritableStorage<'a> { self.create_transformer(storage) } @@ -281,6 +289,11 @@ impl WritableStorageTraits } } +impl ReadableWritableStorageTraits + for UsageLogStorageTransformerImpl +{ +} + #[cfg(feature = "async")] #[cfg_attr(feature = "async", async_trait::async_trait)] impl AsyncReadableStorageTraits @@ -442,3 +455,10 @@ impl AsyncWritableStorageTraits self.storage.erase_prefix(prefix).await } } + +#[cfg(feature = "async")] +#[cfg_attr(feature = "async", async_trait::async_trait)] +impl AsyncReadableWritableStorageTraits + for UsageLogStorageTransformerImpl +{ +} diff --git a/src/storage/store/store_async/amazon_s3_store.rs b/src/storage/store/store_async/amazon_s3_store.rs index 5e9d7d26..5851cbbc 100644 --- a/src/storage/store/store_async/amazon_s3_store.rs +++ b/src/storage/store/store_async/amazon_s3_store.rs @@ -1,6 +1,6 @@ //! Amazon S3 stores. -use crate::{object_store_impl, storage::StorageError}; +use crate::object_store_impl; /// An Amazon S3 store. #[derive(Debug)] diff --git a/src/storage/store/store_async/google_cloud_store.rs b/src/storage/store/store_async/google_cloud_store.rs index 09bdf9ce..076bd8ca 100644 --- a/src/storage/store/store_async/google_cloud_store.rs +++ b/src/storage/store/store_async/google_cloud_store.rs @@ -1,6 +1,6 @@ //! Google Cloud stores. -use crate::{object_store_impl, storage::StorageError}; +use crate::object_store_impl; /// A Google Cloud Storage store. #[derive(Debug)] diff --git a/src/storage/store/store_async/memory_store.rs b/src/storage/store/store_async/memory_store.rs index a3c0ea50..f36ea277 100644 --- a/src/storage/store/store_async/memory_store.rs +++ b/src/storage/store/store_async/memory_store.rs @@ -1,6 +1,6 @@ //! An in-memory store. -use crate::{object_store_impl, storage::StorageError}; +use crate::object_store_impl; /// An in-memory store. #[derive(Debug)] diff --git a/src/storage/store/store_async/microsoft_azure_store.rs b/src/storage/store/store_async/microsoft_azure_store.rs index b24d4642..baefce17 100644 --- a/src/storage/store/store_async/microsoft_azure_store.rs +++ b/src/storage/store/store_async/microsoft_azure_store.rs @@ -1,6 +1,6 @@ //! Azure blob storage stores. -use crate::{object_store_impl, storage::StorageError}; +use crate::object_store_impl; /// A Microsoft Azure store. #[derive(Debug)] diff --git a/src/storage/store/store_async/object_store.rs b/src/storage/store/store_async/object_store.rs index 1c35bba5..73b35ea9 100644 --- a/src/storage/store/store_async/object_store.rs +++ b/src/storage/store/store_async/object_store.rs @@ -6,7 +6,8 @@ use crate::{ byte_range::ByteRange, storage::{ AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncWritableStorageTraits, - StorageError, StoreKey, StoreKeyRange, StoreKeys, StoreKeysPrefixes, StorePrefix, + StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, + StorePrefix, }, }; @@ -116,6 +117,14 @@ impl AsyncWritableStorageTraits for T { Ok(()) } + async fn set_partial_values( + &self, + _key_start_values: &[StoreKeyStartValue], + ) -> Result<(), StorageError> { + // This is implemented in the parent + unreachable!() + } + async fn erase(&self, key: &StoreKey) -> Result { Ok(handle_result(ObjectStore::delete(self, &key_to_path(key)).await)?.is_some()) } @@ -196,23 +205,23 @@ impl AsyncListableStorageTraits for T { /// Implement the storage traits for an object store #[macro_export] macro_rules! object_store_impl { - ($store:ty, $var:ident) => { + ($store:ty, $object_store:ident) => { #[cfg_attr(feature = "async", async_trait::async_trait)] impl $crate::storage::AsyncReadableStorageTraits for $store { async fn get( &self, key: &$crate::storage::StoreKey, - ) -> Result<$crate::array::MaybeBytes, StorageError> { - $crate::storage::AsyncReadableStorageTraits::get(&self.$var, key).await + ) -> Result<$crate::array::MaybeBytes, $crate::storage::StorageError> { + $crate::storage::AsyncReadableStorageTraits::get(&self.$object_store, key).await } async fn get_partial_values_key( &self, key: &$crate::storage::StoreKey, byte_ranges: &[$crate::storage::ByteRange], - ) -> Result>>, StorageError> { + ) -> Result>>, $crate::storage::StorageError> { $crate::storage::AsyncReadableStorageTraits::get_partial_values_key( - &self.$var, + &self.$object_store, key, byte_ranges, ) @@ -222,7 +231,7 @@ macro_rules! object_store_impl { async fn get_partial_values( &self, key_ranges: &[$crate::storage::StoreKeyRange], - ) -> Result, StorageError> { + ) -> Result, $crate::storage::StorageError> { $crate::storage::AsyncReadableStorageTraits::get_partial_values( &self.object_store, key_ranges, @@ -233,19 +242,24 @@ macro_rules! object_store_impl { async fn size_prefix( &self, prefix: &$crate::storage::StorePrefix, - ) -> Result { - $crate::storage::AsyncReadableStorageTraits::size_prefix(&self.$var, prefix).await + ) -> Result { + $crate::storage::AsyncReadableStorageTraits::size_prefix( + &self.$object_store, + prefix, + ) + .await } async fn size_key( &self, key: &$crate::storage::StoreKey, - ) -> Result, StorageError> { - $crate::storage::AsyncReadableStorageTraits::size_key(&self.$var, key).await + ) -> Result, $crate::storage::StorageError> { + $crate::storage::AsyncReadableStorageTraits::size_key(&self.$object_store, key) + .await } - async fn size(&self) -> Result { - $crate::storage::AsyncReadableStorageTraits::size(&self.$var).await + async fn size(&self) -> Result { + $crate::storage::AsyncReadableStorageTraits::size(&self.$object_store).await } } @@ -255,51 +269,65 @@ macro_rules! object_store_impl { &self, key: &$crate::storage::StoreKey, value: &[u8], - ) -> Result<(), StorageError> { - $crate::storage::AsyncWritableStorageTraits::set(&self.$var, key, value).await + ) -> Result<(), $crate::storage::StorageError> { + $crate::storage::AsyncWritableStorageTraits::set(&self.$object_store, key, value) + .await } async fn set_partial_values( &self, key_start_values: &[$crate::storage::StoreKeyStartValue], - ) -> Result<(), StorageError> { - $crate::storage::AsyncWritableStorageTraits::set_partial_values( - &self.$var, - key_start_values, - ) - .await + ) -> Result<(), $crate::storage::StorageError> { + $crate::storage::async_store_set_partial_values(self, key_start_values).await } - async fn erase(&self, key: &$crate::storage::StoreKey) -> Result { - $crate::storage::AsyncWritableStorageTraits::erase(&self.$var, key).await + async fn erase( + &self, + key: &$crate::storage::StoreKey, + ) -> Result { + $crate::storage::AsyncWritableStorageTraits::erase(&self.$object_store, key).await } async fn erase_prefix( &self, prefix: &$crate::storage::StorePrefix, - ) -> Result { - $crate::storage::AsyncWritableStorageTraits::erase_prefix(&self.$var, prefix).await + ) -> Result { + $crate::storage::AsyncWritableStorageTraits::erase_prefix( + &self.$object_store, + prefix, + ) + .await } } + #[cfg_attr(feature = "async", async_trait::async_trait)] + impl $crate::storage::AsyncReadableWritableStorageTraits for $store {} + #[cfg_attr(feature = "async", async_trait::async_trait)] impl $crate::storage::AsyncListableStorageTraits for $store { - async fn list(&self) -> Result<$crate::storage::StoreKeys, StorageError> { - $crate::storage::AsyncListableStorageTraits::list(&self.$var).await + async fn list( + &self, + ) -> Result<$crate::storage::StoreKeys, $crate::storage::StorageError> { + $crate::storage::AsyncListableStorageTraits::list(&self.$object_store).await } async fn list_prefix( &self, prefix: &$crate::storage::StorePrefix, - ) -> Result<$crate::storage::StoreKeys, StorageError> { - $crate::storage::AsyncListableStorageTraits::list_prefix(&self.$var, prefix).await + ) -> Result<$crate::storage::StoreKeys, $crate::storage::StorageError> { + $crate::storage::AsyncListableStorageTraits::list_prefix( + &self.$object_store, + prefix, + ) + .await } async fn list_dir( &self, prefix: &$crate::storage::StorePrefix, - ) -> Result<$crate::storage::StoreKeysPrefixes, StorageError> { - $crate::storage::AsyncListableStorageTraits::list_dir(&self.$var, prefix).await + ) -> Result<$crate::storage::StoreKeysPrefixes, $crate::storage::StorageError> { + $crate::storage::AsyncListableStorageTraits::list_dir(&self.$object_store, prefix) + .await } } }; diff --git a/src/storage/store/store_sync/filesystem_store.rs b/src/storage/store/store_sync/filesystem_store.rs index bb6af0bd..3715dbb6 100644 --- a/src/storage/store/store_sync/filesystem_store.rs +++ b/src/storage/store/store_sync/filesystem_store.rs @@ -6,9 +6,10 @@ use crate::{ array::MaybeBytes, byte_range::{ByteOffset, ByteRange}, storage::{ - ListableStorageTraits, ReadableStorageTraits, StorageError, StoreKey, StoreKeyError, - StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, StorePrefix, - StorePrefixes, WritableStorageTraits, + store_set_partial_values, ListableStorageTraits, ReadableStorageTraits, + ReadableWritableStorageTraits, StorageError, StoreKey, StoreKeyError, StoreKeyRange, + StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, StorePrefix, StorePrefixes, + WritableStorageTraits, }, }; @@ -280,16 +281,7 @@ impl WritableStorageTraits for FilesystemStore { return Err(StorageError::ReadOnly); } - for key_start_value in key_start_values { - Self::set_impl( - self, - &key_start_value.key, - key_start_value.value, - Some(key_start_value.start), - false, - )?; - } - Ok(()) + store_set_partial_values(self, key_start_values) } fn erase(&self, key: &StoreKey) -> Result { @@ -324,6 +316,8 @@ impl WritableStorageTraits for FilesystemStore { } } +impl ReadableWritableStorageTraits for FilesystemStore {} + impl ListableStorageTraits for FilesystemStore { fn list(&self) -> Result { Ok(WalkDir::new(&self.base_path) diff --git a/src/storage/store/store_sync/memory_store.rs b/src/storage/store/store_sync/memory_store.rs index 65a5b53f..8f00863e 100644 --- a/src/storage/store/store_sync/memory_store.rs +++ b/src/storage/store/store_sync/memory_store.rs @@ -7,8 +7,9 @@ use crate::{ array::MaybeBytes, byte_range::{ByteOffset, ByteRange}, storage::{ - ListableStorageTraits, ReadableStorageTraits, StorageError, StoreKey, StoreKeyRange, - StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, StorePrefix, WritableStorageTraits, + store_set_partial_values, ListableStorageTraits, ReadableStorageTraits, + ReadableWritableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, + StoreKeys, StoreKeysPrefixes, StorePrefix, WritableStorageTraits, }, }; @@ -137,16 +138,7 @@ impl WritableStorageTraits for MemoryStore { &self, key_start_values: &[StoreKeyStartValue], ) -> Result<(), StorageError> { - for key_start_value in key_start_values { - Self::set_impl( - self, - &key_start_value.key, - key_start_value.value, - Some(key_start_value.start), - false, - ); - } - Ok(()) + store_set_partial_values(self, key_start_values) } fn erase(&self, key: &StoreKey) -> Result { @@ -168,6 +160,8 @@ impl WritableStorageTraits for MemoryStore { } } +impl ReadableWritableStorageTraits for MemoryStore {} + impl ListableStorageTraits for MemoryStore { fn list(&self) -> Result { let data_map = self.data_map.lock().unwrap();