Skip to content

Commit

Permalink
Revise storage traits
Browse files Browse the repository at this point in the history
- WritableStorageTraits no longer requires ReadableStorageTraits
- Removed default implementation for WritableStorageTraits::set_partial_values().
- Adds utility functions {async_}store_set_partial_values for stores implementing ReadableWritableStorageTraits
- Readd ReadableWritableStorage
  • Loading branch information
LDeakin committed Dec 22, 2023
1 parent a940ea5 commit 51d9419
Show file tree
Hide file tree
Showing 22 changed files with 313 additions and 161 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Added `ReadableWritableStorage`
- Added `{async_}store_set_partial_values`
- **Breaking** Added `create_readable_writable_transformer` to `StorageTransformerExtension` trait

### Changed
- **Breaking** `ReadableStorageTraits` is no longer a supertrait of `WritableStorageTraits`
- `WritableStorage` is no longer implicitly readable. Use `ReadableWritableStorage`
- **Breaking**: `{Async}WritableStorageTraits::set_partial_values()` no longer include default implementations
- Use new `{async_}store_set_partial_values` utility functions instead
- Add `#[must_use]` to `Array::builder`, `Array::chunk_grid_shape`, and `ArrayBuilder::from_array`

## [0.7.3] - 2023-12-22

### Added
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zarrs"
version = "0.7.3"
version = "0.8.0"
authors = ["Lachlan Deakin <[email protected]>"]
edition = "2021"
rust-version = "1.70"
Expand Down
2 changes: 1 addition & 1 deletion examples/sharded_array_write_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
chrono::Utc::now().format("[%T%.3f] ").to_string()
});
let store_readable_listable = usage_log.create_readable_listable_transformer(store.clone());
let store = usage_log.create_writable_transformer(store);
let store = usage_log.create_readable_writable_transformer(store);

// Create a group and write metadata to filesystem
let group_path = "/group";
Expand Down
4 changes: 2 additions & 2 deletions examples/zip_array_write_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use std::{
use zarrs::{
array::{Array, ZARR_NAN_F32},
array_subset::ArraySubset,
storage::{ReadableStorageTraits, WritableStorageTraits},
storage::{ReadableStorageTraits, ReadableWritableStorageTraits},
};

// const ARRAY_PATH: &'static str = "/array";
const ARRAY_PATH: &str = "/";

fn write_array_to_storage<TStorage: WritableStorageTraits>(
fn write_array_to_storage<TStorage: ReadableWritableStorageTraits>(
storage: Arc<TStorage>,
) -> Result<Array<TStorage>, Box<dyn std::error::Error>> {
use zarrs::array::{chunk_grid::ChunkGridTraits, codec, DataType, FillValue};
Expand Down
6 changes: 3 additions & 3 deletions src/array/array_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
array_subset::ArraySubset,
node::NodePath,
storage::{
data_key, meta_key, AsyncReadableStorageTraits, AsyncWritableStorageTraits, StorageError,
StorageHandle,
data_key, meta_key, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits,
AsyncWritableStorageTraits, StorageError, StorageHandle,
},
};

Expand Down Expand Up @@ -886,7 +886,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> Array<TStorage> {
}
}

impl<TStorage: ?Sized + AsyncReadableStorageTraits + AsyncWritableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
#[allow(clippy::too_many_lines)]
async fn _async_store_array_subset(
&self,
Expand Down
3 changes: 3 additions & 0 deletions src/array/array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ impl ArrayBuilder {
additional_fields: self.additional_fields.clone(),
parallel_codecs: self.parallel_codecs,
include_zarrs_metadata: true,
chunk_locks: parking_lot::Mutex::default(),
#[cfg(feature = "async")]
async_chunk_locks: async_lock::Mutex::default(),
})
}
}
6 changes: 3 additions & 3 deletions src/array/array_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
array_subset::ArraySubset,
node::NodePath,
storage::{
data_key, meta_key, ReadableStorageTraits, StorageError, StorageHandle,
WritableStorageTraits,
data_key, meta_key, ReadableStorageTraits, ReadableWritableStorageTraits, StorageError,
StorageHandle, WritableStorageTraits,
},
};

Expand Down Expand Up @@ -777,7 +777,7 @@ impl<TStorage: ?Sized + WritableStorageTraits> Array<TStorage> {
}
}

impl<TStorage: ?Sized + ReadableStorageTraits + WritableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + ReadableWritableStorageTraits> Array<TStorage> {
fn _store_array_subset(
&self,
array_subset: &ArraySubset,
Expand Down
15 changes: 10 additions & 5 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ pub use self::storage_async::{
async_create_array, async_create_group, async_discover_children, async_discover_nodes,
async_erase_chunk, async_erase_node, async_get_child_nodes, async_node_exists,
async_node_exists_listable, async_retrieve_chunk, async_retrieve_partial_values,
async_store_chunk, AsyncListableStorageTraits, AsyncReadableListableStorageTraits,
AsyncReadableStorageTraits, AsyncWritableStorageTraits,
async_store_chunk, async_store_set_partial_values, AsyncListableStorageTraits,
AsyncReadableListableStorageTraits, AsyncReadableStorageTraits,
AsyncReadableWritableStorageTraits, AsyncWritableStorageTraits,
};

pub use self::storage_sync::{
create_array, create_group, discover_children, discover_nodes, erase_chunk, erase_node,
get_child_nodes, node_exists, node_exists_listable, retrieve_chunk, retrieve_partial_values,
store_chunk, ListableStorageTraits, ReadableListableStorageTraits, ReadableStorageTraits,
WritableStorageTraits,
store_chunk, store_set_partial_values, ListableStorageTraits, ReadableListableStorageTraits,
ReadableStorageTraits, ReadableWritableStorageTraits, WritableStorageTraits,
};
pub use self::storage_transformer::StorageTransformerChain;

Expand All @@ -63,6 +64,9 @@ pub type ReadableStorage<'a> = Arc<dyn ReadableStorageTraits + 'a>;
/// [`Arc`] wrapped writable storage.
pub type WritableStorage<'a> = Arc<dyn WritableStorageTraits + 'a>;

/// [`Arc`] wrapped readable and writable storage.
pub type ReadableWritableStorage<'a> = Arc<dyn ReadableWritableStorageTraits + 'a>;

/// [`Arc`] wrapped listable storage.
pub type ListableStorage<'a> = Arc<dyn ListableStorageTraits + 'a>;

Expand Down Expand Up @@ -277,7 +281,8 @@ mod tests {
// storage_transformer_usage_log.clone(),
storage_transformer_performance_metrics.clone(),
]);
let transformer = storage_transformer_chain.create_writable_transformer(store.clone());
let transformer =
storage_transformer_chain.create_readable_writable_transformer(store.clone());
let transformer_listable = storage_transformer_chain.create_listable_transformer(store);

(0..10).into_par_iter().for_each(|_| {
Expand Down
107 changes: 62 additions & 45 deletions src/storage/storage_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,71 +159,82 @@ pub trait AsyncListableStorageTraits: Send + Sync {
async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError>;
}

/// Set partial values for an asynchronous store.
///
/// # Errors
/// Returns a [`StorageError`] if an underlying store operation fails.
///
/// # Panics
/// Panics if a key ends beyond `usize::MAX`.
pub async fn async_store_set_partial_values<T: AsyncReadableWritableStorageTraits>(
store: &T,
key_start_values: &[StoreKeyStartValue<'_>],
) -> Result<(), StorageError> {
// Group by key
let group_by_key = key_start_values
.iter()
.group_by(|key_start_value| &key_start_value.key)
.into_iter()
.map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::<Vec<_>>()))
.collect::<Vec<_>>();

// Read keys
let mut futures = group_by_key
.into_iter()
.map(|(key, group)| async move {
// TODO: Lock

// Read the store key
let mut bytes = store.get(&key.clone()).await?.unwrap_or_else(Vec::default);

// Expand the store key if needed
let end_max =
usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap()).unwrap();
if bytes.len() < end_max {
bytes.resize_with(end_max, Default::default);
}

// Update the store key
for key_start_value in group {
let start: usize = key_start_value.start.try_into().unwrap();
let end: usize = key_start_value.end().try_into().unwrap();
bytes[start..end].copy_from_slice(key_start_value.value);
}

// Write the store key
store.set(&key, &bytes).await
})
.collect::<FuturesUnordered<_>>();
while let Some(item) = futures.next().await {
item?;
}

Ok(())
}

/// Async writable storage traits.
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncWritableStorageTraits: Send + Sync + AsyncReadableStorageTraits {
pub trait AsyncWritableStorageTraits: Send + Sync {
/// Store bytes at a [`StoreKey`].
///
/// # Errors
///
/// Returns a [`StorageError`] on failure to store.
async fn set(&self, key: &StoreKey, value: &[u8]) -> Result<(), StorageError>;

/// Store bytes according to a list of [`StoreKeyStartValue`].
///
/// # Errors
///
/// Returns a [`StorageError`] on failure to store.
async fn set_partial_values(
&self,
key_start_values: &[StoreKeyStartValue],
) -> Result<(), StorageError> {
// Group by key
let group_by_key = key_start_values
.iter()
.group_by(|key_start_value| &key_start_value.key)
.into_iter()
.map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::<Vec<_>>()))
.collect::<Vec<_>>();

// Read keys
let mut futures = group_by_key
.into_iter()
.map(|(key, group)| async move {
let mut bytes = self.get(&key.clone()).await?.unwrap_or_else(Vec::default);
let end_max =
usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap())
.unwrap();

// Expand the store key if needed
if bytes.len() < end_max {
bytes.resize_with(end_max, Default::default);
}

// Update the store key
for key_start_value in group {
let start: usize = key_start_value.start.try_into().unwrap();
let end: usize = key_start_value.end().try_into().unwrap();
bytes[start..end].copy_from_slice(key_start_value.value);
}

// Write the store key
self.set(&key, &bytes).await
})
.collect::<FuturesUnordered<_>>();
while let Some(item) = futures.next().await {
item?;
}

Ok(())
}
) -> Result<(), StorageError>;

/// Erase a [`StoreKey`].
///
/// Returns true if the key exists and was erased, or false if the key does not exist.
///
/// # Errors
///
/// Returns a [`StorageError`] if there is an underlying storage error.
async fn erase(&self, key: &StoreKey) -> Result<bool, StorageError>;

Expand All @@ -232,7 +243,6 @@ pub trait AsyncWritableStorageTraits: Send + Sync + AsyncReadableStorageTraits {
/// Returns true if all keys existed and were erased, or false if any key does not exist.
///
/// # Errors
///
/// Returns a [`StorageError`] if there is an underlying storage error.
async fn erase_values(&self, keys: &[StoreKey]) -> Result<bool, StorageError> {
let futures_erase = keys.iter().map(|key| self.erase(key));
Expand All @@ -253,6 +263,13 @@ pub trait AsyncWritableStorageTraits: Send + Sync + AsyncReadableStorageTraits {
async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<bool, StorageError>;
}

/// A supertrait of [`AsyncReadableStorageTraits`] and [`AsyncWritableStorageTraits`].
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncReadableWritableStorageTraits:
AsyncReadableStorageTraits + AsyncWritableStorageTraits
{
}

/// A supertrait of [`AsyncReadableStorageTraits`] and [`AsyncListableStorageTraits`].
pub trait AsyncReadableListableStorageTraits:
AsyncReadableStorageTraits + AsyncListableStorageTraits
Expand Down
21 changes: 18 additions & 3 deletions src/storage/storage_handle.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::{array::MaybeBytes, byte_range::ByteRange};

use super::{
ListableStorageTraits, ReadableStorageTraits, StorageError, StoreKey, StorePrefix,
WritableStorageTraits,
ListableStorageTraits, ReadableStorageTraits, ReadableWritableStorageTraits, StorageError,
StoreKey, StorePrefix, WritableStorageTraits,
};

#[cfg(feature = "async")]
use super::{AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncWritableStorageTraits};
use super::{
AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits,
AsyncWritableStorageTraits,
};

/// A storage handle.
///
Expand Down Expand Up @@ -105,6 +108,11 @@ impl<TStorage: ?Sized + WritableStorageTraits> WritableStorageTraits
}
}

impl<TStorage: ?Sized + ReadableWritableStorageTraits> ReadableWritableStorageTraits
for StorageHandle<'_, TStorage>
{
}

#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
impl<TStorage: ?Sized + AsyncReadableStorageTraits> AsyncReadableStorageTraits
Expand Down Expand Up @@ -194,3 +202,10 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> AsyncWritableStorageTraits
self.0.erase_prefix(prefix).await
}
}

#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> AsyncReadableWritableStorageTraits
for StorageHandle<'_, TStorage>
{
}
Loading

0 comments on commit 51d9419

Please sign in to comment.