Skip to content

Commit

Permalink
Add store lock tests and remove non-default store lock constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Feb 10, 2024
1 parent e246751 commit 9a9255f
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Implement `From<ChunkShape>` for `Vec<NonZeroU64>`
- Add `TestUnbounded` codec for internal testing
- Additional sharding tests
- Add store lock tests

### Changed
- Dependency bumps
Expand All @@ -30,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed
- **Breaking**: remove `InvalidArraySubsetError` and `ArrayExtractElementsError`
- Remove non-default store lock constructors

## [0.11.6] - 2024-02-06

Expand Down
36 changes: 35 additions & 1 deletion src/storage/store_lock/store_lock_async/default_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl AsyncStoreKeyMutexTraits for AsyncDefaultStoreMutex {
}

/// Default store locks.
#[derive(Debug, Default, derive_more::Constructor)]
#[derive(Debug, Default)]
pub struct AsyncDefaultStoreLocks(Mutex<HashMap<StoreKey, Arc<Mutex<()>>>>);

#[async_trait::async_trait]
Expand All @@ -42,3 +42,37 @@ impl AsyncStoreLocksTraits for AsyncDefaultStoreLocks {
))
}
}

#[cfg(test)]
mod tests {
use std::{sync::atomic::AtomicUsize, time::Duration};

use crate::storage::{store::AsyncObjectStore, AsyncReadableWritableStorageTraits};

use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn store_default_lock_async() {
let store = Arc::new(AsyncObjectStore::new_with_locks(
object_store::memory::InMemory::default(),
Arc::new(AsyncDefaultStoreLocks::default()),
));
let locks_held = Arc::new(AtomicUsize::new(0));
let futures = (0..20).into_iter().map(|_| {
let key = StoreKey::new("key").unwrap();
let store = store.clone();
let locks_held = locks_held.clone();
tokio::task::spawn(async move {
let mutex = store.mutex(&key).await.unwrap();
let _lock = mutex.lock().await;
locks_held.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(10));
let locks_held = locks_held.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
locks_held == 1
})
});
let result = futures::future::try_join_all(futures).await.unwrap();
println!("{result:?}");
assert!(result.iter().all(|b| *b));
}
}
39 changes: 38 additions & 1 deletion src/storage/store_lock/store_lock_async/disabled_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl AsyncStoreKeyMutexTraits for AsyncDisabledStoreMutex {
}

/// Disabled store locks.
#[derive(Debug, Default, derive_more::Constructor)]
#[derive(Debug, Default)]
pub struct AsyncDisabledStoreLocks;

#[async_trait::async_trait]
Expand All @@ -34,3 +34,40 @@ impl AsyncStoreLocksTraits for AsyncDisabledStoreLocks {
Box::new(AsyncDisabledStoreMutex)
}
}

#[cfg(test)]
mod tests {
use std::{
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

use crate::storage::{store::AsyncObjectStore, AsyncReadableWritableStorageTraits};

use super::*;

#[tokio::test(flavor = "multi_thread")]
async fn store_disabled_lock_async() {
let store = Arc::new(AsyncObjectStore::new_with_locks(
object_store::memory::InMemory::default(),
Arc::new(AsyncDisabledStoreLocks::default()),
));
let locks_held = Arc::new(AtomicUsize::new(0));
let futures = (0..20).into_iter().map(|_| {
let key = StoreKey::new("key").unwrap();
let store = store.clone();
let locks_held = locks_held.clone();
tokio::task::spawn(async move {
let mutex = store.mutex(&key).await.unwrap();
let _lock = mutex.lock().await;
locks_held.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(10));
let locks_held = locks_held.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
locks_held > 1
})
});
let result = futures::future::try_join_all(futures).await.unwrap();
println!("{result:?}");
assert!(result.iter().any(|b| *b));
}
}
28 changes: 27 additions & 1 deletion src/storage/store_lock/store_lock_sync/default_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl StoreKeyMutexTraits for DefaultStoreMutex {
}

/// Default store locks.
#[derive(Debug, Default, derive_more::Constructor)]
#[derive(Debug, Default)]
pub struct DefaultStoreLocks(Mutex<HashMap<StoreKey, Arc<Mutex<()>>>>);

impl StoreLocksTraits for DefaultStoreLocks {
Expand All @@ -40,3 +40,29 @@ impl StoreLocksTraits for DefaultStoreLocks {
))
}
}

#[cfg(test)]
mod tests {
use std::{sync::atomic::AtomicUsize, time::Duration};

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::storage::{store::MemoryStore, ReadableWritableStorageTraits};

use super::*;

#[test]
fn store_default_lock_sync() {
let store = MemoryStore::new_with_locks(Arc::new(DefaultStoreLocks::default()));
let key = StoreKey::new("key").unwrap();
let locks_held = AtomicUsize::new(0);
(0..20).into_par_iter().for_each(|_| {
let mutex = store.mutex(&key).unwrap();
let _lock = mutex.lock();
locks_held.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(10));
let locks_held = locks_held.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(locks_held, 1);
});
}
}
31 changes: 30 additions & 1 deletion src/storage/store_lock/store_lock_sync/disabled_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,40 @@ impl StoreKeyMutexTraits for DisabledStoreMutex {
}

/// Disabled store locks.
#[derive(Debug, Default, derive_more::Constructor)]
#[derive(Debug, Default)]
pub struct DisabledStoreLocks;

impl StoreLocksTraits for DisabledStoreLocks {
fn mutex(&self, _key: &StoreKey) -> StoreKeyMutex {
Box::new(DisabledStoreMutex)
}
}

#[cfg(test)]
mod tests {
use std::{
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::storage::{store::MemoryStore, ReadableWritableStorageTraits};

use super::*;

#[test]
fn store_disable_lock_sync() {
let store = MemoryStore::new_with_locks(Arc::new(DisabledStoreLocks::default()));
let key = StoreKey::new("key").unwrap();
let locks_held = AtomicUsize::new(0);
assert!((0..20).into_par_iter().any(|_| {
let mutex = store.mutex(&key).unwrap();
let _lock = mutex.lock();
locks_held.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(10));
let locks_held = locks_held.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
locks_held > 1
}));
}
}

0 comments on commit 9a9255f

Please sign in to comment.