Skip to content

Commit

Permalink
Add support for storage key locking
Browse files Browse the repository at this point in the history
Chunk locking in `Array` now utilises the underlying storage locking mechanism.
  • Loading branch information
LDeakin committed Dec 24, 2023
1 parent 7bee687 commit f22e671
Show file tree
Hide file tree
Showing 28 changed files with 570 additions and 116 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `ReadableWritableStorage` and `ReadableWritableStore` and async variants
- Added `{async_}store_set_partial_values`
- **Breaking** Added `create_readable_writable_transformer` to `StorageTransformerExtension` trait
- Added `storage::store_lock` module
- Adds generic `StoreLocks`, `StoreKeyMutex`, and `StoreKeyMutexGuard` with associated traits and async variants
- Includes `DefaultStoreLocks` and `DisabledStoreLocks` implementations
- Readable and writable stores include a `new_with_locks` method to choose the store lock implementation

### Changed
- **Breaking** `ReadableStorageTraits` is no longer a supertrait of `WritableStorageTraits`
Expand All @@ -19,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use new `{async_}store_set_partial_values` utility functions instead
- Add `#[must_use]` to `Array::builder`, `Array::chunk_grid_shape`, and `ArrayBuilder::from_array`
- **Breaking** Remove `http` and `zip` from default features
- Locking functionality for arrays is moved into stores

## [0.7.3] - 2023-12-22

Expand Down
70 changes: 31 additions & 39 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ mod fill_value_metadata;
mod nan_representations;
mod unsafe_cell_slice;

use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

pub use self::{
array_builder::ArrayBuilder,
Expand Down Expand Up @@ -164,11 +164,6 @@ pub struct Array<TStorage: ?Sized> {
additional_fields: AdditionalFields,
/// If true, codecs run with multithreading (where supported)
parallel_codecs: bool,
/// Chunk locks.
chunk_locks: parking_lot::Mutex<HashMap<Vec<u64>, Arc<parking_lot::Mutex<()>>>>,
#[cfg(feature = "async")]
/// Asynchronous chunk locks.
async_chunk_locks: async_lock::Mutex<HashMap<Vec<u64>, Arc<async_lock::Mutex<()>>>>,
/// Zarrs metadata.
include_zarrs_metadata: bool,
}
Expand Down Expand Up @@ -242,9 +237,6 @@ impl<TStorage: ?Sized> Array<TStorage> {
storage_transformers,
dimension_names: metadata.dimension_names,
parallel_codecs: true,
chunk_locks: parking_lot::Mutex::default(),
#[cfg(feature = "async")]
async_chunk_locks: async_lock::Mutex::default(),
include_zarrs_metadata: true,
})
}
Expand Down Expand Up @@ -483,31 +475,6 @@ impl<TStorage: ?Sized> Array<TStorage> {
},
)
}

#[must_use]
fn chunk_mutex(&self, chunk_indices: &[u64]) -> Arc<parking_lot::Mutex<()>> {
let mut chunk_locks = self.chunk_locks.lock();
// TODO: Cleanup old locks
let chunk_mutex = chunk_locks
.entry(chunk_indices.to_vec())
.or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
.clone();
drop(chunk_locks);
chunk_mutex
}

#[cfg(feature = "async")]
#[must_use]
async fn async_chunk_mutex(&self, chunk_indices: &[u64]) -> Arc<async_lock::Mutex<()>> {
let mut chunk_locks = self.async_chunk_locks.lock().await;
// TODO: Cleanup old locks
let chunk_mutex = chunk_locks
.entry(chunk_indices.to_vec())
.or_insert_with(|| Arc::new(async_lock::Mutex::new(())))
.clone();
drop(chunk_locks);
chunk_mutex
}
}

// Safe transmute, avoiding an allocation where possible
Expand Down Expand Up @@ -617,7 +584,10 @@ mod tests {
use itertools::Itertools;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};

use crate::storage::store::MemoryStore;
use crate::storage::{
store::MemoryStore,
store_lock::{DefaultStoreLocks, StoreLocks},
};

use super::*;

Expand Down Expand Up @@ -724,9 +694,8 @@ mod tests {
);
}

#[test]
fn array_subset_locking() {
let store = Arc::new(MemoryStore::new());
fn array_subset_locking(locks: StoreLocks, expect_equal: bool) {
let store = Arc::new(MemoryStore::new_with_locks(locks));

let array_path = "/array";
let array = ArrayBuilder::new(
Expand All @@ -738,6 +707,7 @@ mod tests {
.build(store, array_path)
.unwrap();

let mut any_not_equal = false;
for j in 1..10 {
(0..100).into_par_iter().for_each(|i| {
let subset = ArraySubset::new_with_start_shape(vec![i, 0], vec![1, 4]).unwrap();
Expand All @@ -746,7 +716,29 @@ mod tests {
let subset_all =
ArraySubset::new_with_start_shape(vec![0, 0], array.shape().to_vec()).unwrap();
let data_all = array.retrieve_array_subset(&subset_all).unwrap();
assert_eq!(data_all.iter().all_equal_value(), Ok(&j));
let all_equal = data_all.iter().all_equal_value() == Ok(&j);
if expect_equal {
assert!(all_equal);
} else {
any_not_equal |= !all_equal;
}
}
if !expect_equal {
assert!(any_not_equal);
}
}

#[test]
fn array_subset_locking_default() {
array_subset_locking(Arc::new(DefaultStoreLocks::default()), true);
}

// // Due to the nature of this test, it can fail sometimes. It was used for development but is now disabled.
// #[test]
// fn array_subset_locking_disabled() {
// array_subset_locking(
// Arc::new(crate::storage::store_lock::DisabledStoreLocks::default()),
// false,
// );
// }
}
5 changes: 3 additions & 2 deletions src/array/array_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,9 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
.await
} else {
// Lock the chunk
let chunk_mutex = self.async_chunk_mutex(chunk_indices).await;
let _lock = chunk_mutex.lock();
let key = data_key(self.path(), chunk_indices, self.chunk_key_encoding());
let mutex = self.storage.mutex(&key).await?;
let _lock = mutex.lock();

// Decode the entire chunk
let mut chunk_bytes = self.async_retrieve_chunk(chunk_indices).await?;
Expand Down
3 changes: 0 additions & 3 deletions src/array/array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,6 @@ impl ArrayBuilder {
additional_fields: self.additional_fields.clone(),
parallel_codecs: self.parallel_codecs,
include_zarrs_metadata: true,
chunk_locks: parking_lot::Mutex::default(),
#[cfg(feature = "async")]
async_chunk_locks: async_lock::Mutex::default(),
})
}
}
5 changes: 3 additions & 2 deletions src/array/array_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1065,8 +1065,9 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits> Array<TStorage> {
self.store_chunk(chunk_indices, chunk_subset_bytes)
} else {
// Lock the chunk
let chunk_mutex = self.chunk_mutex(chunk_indices);
let _lock = chunk_mutex.lock();
let key = data_key(self.path(), chunk_indices, self.chunk_key_encoding());
let mutex = self.storage.mutex(&key)?;
let _lock = mutex.lock();

// Decode the entire chunk
let mut chunk_bytes = self.retrieve_chunk(chunk_indices)?;
Expand Down
1 change: 1 addition & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod storage_transformer;
mod storage_value_io;
pub mod store;
mod store_key;
pub mod store_lock;
mod store_prefix;

#[cfg(feature = "async")]
Expand Down
13 changes: 10 additions & 3 deletions src/storage/storage_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::{
};

use super::{
data_key, meta_key, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys,
StoreKeysPrefixes, StorePrefix, StorePrefixes,
data_key, meta_key, store_lock::AsyncStoreKeyMutex, StorageError, StoreKey, StoreKeyRange,
StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, StorePrefix, StorePrefixes,
};

/// Async readable storage traits.
Expand Down Expand Up @@ -182,7 +182,9 @@ pub async fn async_store_set_partial_values<T: AsyncReadableWritableStorageTrait
let mut futures = group_by_key
.into_iter()
.map(|(key, group)| async move {
// TODO: Lock
// Lock the store key
let mutex = store.mutex(&key).await?;
let _lock = mutex.lock().await;

// Read the store key
let mut bytes = store.get(&key.clone()).await?.unwrap_or_else(Vec::default);
Expand Down Expand Up @@ -268,6 +270,11 @@ pub trait AsyncWritableStorageTraits: Send + Sync {
pub trait AsyncReadableWritableStorageTraits:
AsyncReadableStorageTraits + AsyncWritableStorageTraits
{
/// Returns the mutex for the store value at `key`.
///
/// # Errors
/// Returns a [`StorageError`] if the mutex cannot be retrieved.
async fn mutex(&self, key: &StoreKey) -> Result<AsyncStoreKeyMutex, StorageError>;
}

/// A supertrait of [`AsyncReadableStorageTraits`] and [`AsyncListableStorageTraits`].
Expand Down
14 changes: 10 additions & 4 deletions src/storage/storage_handle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{array::MaybeBytes, byte_range::ByteRange};

use super::{
ListableStorageTraits, ReadableStorageTraits, ReadableWritableStorageTraits, StorageError,
StoreKey, StorePrefix, WritableStorageTraits,
store_lock::StoreKeyMutex, ListableStorageTraits, ReadableStorageTraits,
ReadableWritableStorageTraits, StorageError, StoreKey, StorePrefix, WritableStorageTraits,
};

#[cfg(feature = "async")]
use super::{
AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits,
AsyncWritableStorageTraits,
store_lock::AsyncStoreKeyMutex, AsyncListableStorageTraits, AsyncReadableStorageTraits,
AsyncReadableWritableStorageTraits, AsyncWritableStorageTraits,
};

/// A storage handle.
Expand Down Expand Up @@ -111,6 +111,9 @@ impl<TStorage: ?Sized + WritableStorageTraits> WritableStorageTraits
impl<TStorage: ?Sized + ReadableWritableStorageTraits> ReadableWritableStorageTraits
for StorageHandle<'_, TStorage>
{
fn mutex(&self, key: &StoreKey) -> Result<StoreKeyMutex, StorageError> {
self.0.mutex(key)
}
}

#[cfg(feature = "async")]
Expand Down Expand Up @@ -208,4 +211,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> AsyncWritableStorageTraits
impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> AsyncReadableWritableStorageTraits
for StorageHandle<'_, TStorage>
{
async fn mutex(&self, key: &StoreKey) -> Result<AsyncStoreKeyMutex, StorageError> {
self.0.mutex(key).await
}
}
16 changes: 12 additions & 4 deletions src/storage/storage_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
};

use super::{
data_key, meta_key, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys,
StoreKeysPrefixes, StorePrefix, StorePrefixes,
data_key, meta_key, store_lock::StoreKeyMutex, StorageError, StoreKey, StoreKeyRange,
StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, StorePrefix, StorePrefixes,
};

/// Readable storage traits.
Expand Down Expand Up @@ -173,7 +173,9 @@ pub fn store_set_partial_values<T: ReadableWritableStorageTraits>(

// Group by store key
group_by_key.into_iter().try_for_each(|(key, group)| {
// TODO: Lock
// Lock the store key
let mutex = store.mutex(&key)?;
let _lock = mutex.lock();

// Read the store key
let mut bytes = store.get(&key)?.unwrap_or_default();
Expand Down Expand Up @@ -251,7 +253,13 @@ pub trait WritableStorageTraits: Send + Sync {
}

/// A supertrait of [`ReadableStorageTraits`] and [`WritableStorageTraits`].
pub trait ReadableWritableStorageTraits: ReadableStorageTraits + WritableStorageTraits {}
pub trait ReadableWritableStorageTraits: ReadableStorageTraits + WritableStorageTraits {
/// Returns the mutex for the store value at `key`.
///
/// # Errors
/// Returns a [`StorageError`] if the mutex cannot be retrieved.
fn mutex(&self, key: &StoreKey) -> Result<StoreKeyMutex, StorageError>;
}

/// A supertrait of [`ReadableStorageTraits`] and [`ListableStorageTraits`].
pub trait ReadableListableStorageTraits: ReadableStorageTraits + ListableStorageTraits {}
Expand Down
28 changes: 21 additions & 7 deletions src/storage/storage_transformer/performance_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ use crate::{
array::MaybeBytes,
metadata::Metadata,
storage::{
ListableStorage, ListableStorageTraits, ReadableListableStorage, ReadableStorage,
ReadableStorageTraits, ReadableWritableStorage, ReadableWritableStorageTraits,
StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes,
StorePrefix, WritableStorage, WritableStorageTraits,
store_lock::StoreKeyMutex, ListableStorage, ListableStorageTraits, ReadableListableStorage,
ReadableStorage, ReadableStorageTraits, ReadableWritableStorage,
ReadableWritableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue,
StoreKeys, StoreKeysPrefixes, StorePrefix, WritableStorage, WritableStorageTraits,
},
};

#[cfg(feature = "async")]
use crate::storage::{
AsyncListableStorage, AsyncListableStorageTraits, AsyncReadableListableStorage,
AsyncReadableStorage, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits,
AsyncWritableStorage, AsyncWritableStorageTraits,
store_lock::AsyncStoreKeyMutex, AsyncListableStorage, AsyncListableStorageTraits,
AsyncReadableListableStorage, AsyncReadableStorage, AsyncReadableStorageTraits,
AsyncReadableWritableStorageTraits, AsyncWritableStorage, AsyncWritableStorageTraits,
};

use std::sync::{
Expand All @@ -32,6 +32,7 @@ pub struct PerformanceMetricsStorageTransformer {
bytes_written: AtomicUsize,
reads: AtomicUsize,
writes: AtomicUsize,
locks: AtomicUsize,
}

impl PerformanceMetricsStorageTransformer {
Expand Down Expand Up @@ -61,6 +62,11 @@ impl PerformanceMetricsStorageTransformer {
self.writes.load(Ordering::Relaxed)
}

/// Returns the number of lock requests.
pub fn locks(&self) -> usize {
self.locks.load(Ordering::Relaxed)
}

fn create_transformer<TStorage: ?Sized>(
&self,
storage: Arc<TStorage>,
Expand Down Expand Up @@ -279,6 +285,10 @@ impl<TStorage: ?Sized + WritableStorageTraits> WritableStorageTraits
impl<TStorage: ?Sized + ReadableWritableStorageTraits> ReadableWritableStorageTraits
for PerformanceMetricsStorageTransformerImpl<'_, TStorage>
{
fn mutex(&self, key: &StoreKey) -> Result<StoreKeyMutex, StorageError> {
self.transformer.locks.fetch_add(1, Ordering::Relaxed);
self.storage.mutex(key)
}
}

#[cfg(feature = "async")]
Expand Down Expand Up @@ -416,4 +426,8 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> AsyncWritableStorageTraits
impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> AsyncReadableWritableStorageTraits
for PerformanceMetricsStorageTransformerImpl<'_, TStorage>
{
async fn mutex(&self, key: &StoreKey) -> Result<AsyncStoreKeyMutex, StorageError> {
self.transformer.locks.fetch_add(1, Ordering::Relaxed);
self.storage.mutex(key).await
}
}
Loading

0 comments on commit f22e671

Please sign in to comment.