From 21beff363c052be7474cdf875f76ec144d49246a Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Mon, 25 Dec 2023 11:16:28 +1100 Subject: [PATCH] Revise `object_store` support --- CHANGELOG.md | 5 + Cargo.toml | 12 +- examples/async_array_write_read.rs | 4 +- examples/async_http_array_read.rs | 6 +- src/group.rs | 4 +- src/lib.rs | 2 +- src/storage/store.rs | 17 +- src/storage/store/store_async.rs | 12 +- .../store/store_async/amazon_s3_store.rs | 37 --- .../store/store_async/filesystem_store.rs | 166 ------------- .../store/store_async/google_cloud_store.rs | 37 --- src/storage/store/store_async/http_store.rs | 151 ------------ src/storage/store/store_async/memory_store.rs | 112 --------- .../store_async/microsoft_azure_store.rs | 37 --- src/storage/store/store_async/object_store.rs | 232 +++++------------- 15 files changed, 93 insertions(+), 741 deletions(-) delete mode 100644 src/storage/store/store_async/amazon_s3_store.rs delete mode 100644 src/storage/store/store_async/filesystem_store.rs delete mode 100644 src/storage/store/store_async/google_cloud_store.rs delete mode 100644 src/storage/store/store_async/http_store.rs delete mode 100644 src/storage/store/store_async/memory_store.rs delete mode 100644 src/storage/store/store_async/microsoft_azure_store.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 82e5c392..074bfa6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Breaking** Remove `http` and `zip` from default features - Locking functionality for arrays is moved into stores - Improved `Array` documentation + - Add `object_store` feature and revise `object_store` support + - Add an `ObjectStore` store, which wraps any `object_store`-based store implementing `object_store::ObjectStore`. + - **Breaking** Removes the explicit `object_store`-based stores (e.g. 
`AsyncAmazonS3Store`, `AsyncHTTPStore`) + - **Breaking** Removes the `object_store_impl` macro + - **Breaking** Removes the `s3`, `gcp`, and `azure` crate features ## [0.7.3] - 2023-12-22 diff --git a/Cargo.toml b/Cargo.toml index aae4b0ff..da1f48c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,13 +22,11 @@ sharding = [] # Enable the sharding codec transpose = ["dep:ndarray"] # Enable the transpose codec zfp = ["dep:zfp-sys"] # Enable the experimental zfp codec zstd = ["dep:zstd"] # Enable the zstd codec -http = ["dep:reqwest", "dep:url", "object_store/http"] # Enable the HTTP store (including the async variant if `async` is enabled) -s3 = ["async", "dep:object_store", "object_store/aws"] # Enable the Amazon S3 store -gcp = ["async", "dep:object_store", "object_store/gcp"] # Enable the Google Cloud Storage store -azure = ["async", "dep:object_store", "object_store/azure"] # Enable the Microsoft Azure Blob Storage store +http = ["dep:reqwest", "dep:url"] # Enable the sync HTTP store zip = ["dep:zip"] # Enable the zip storage adapter ndarray = ["dep:ndarray"] # Adds ndarray utility functions to Array -async = ["dep:object_store", "dep:async-trait", "dep:async-recursion", "dep:async-lock", "dep:futures"] # Enable experimental async API (async stores and array/group operations) +async = ["dep:async-trait", "dep:async-recursion", "dep:async-lock", "dep:futures"] # Enable experimental async API +object_store = ["dep:object_store"] # Enable object_store asynchronous stores support [package.metadata.docs.rs] all-features = true @@ -80,11 +78,11 @@ required-features = ["ndarray"] [[example]] name = "async_array_write_read" -required-features = ["ndarray", "async"] +required-features = ["ndarray", "async", "object_store"] [[example]] name = "async_http_array_read" -required-features = ["ndarray", "async", "http"] +required-features = ["ndarray", "async", "object_store/http"] [[example]] name = "http_array_read" diff --git a/examples/async_array_write_read.rs b/examples/async_array_write_read.rs index 5a650bdf..1658f755 100644 --- a/examples/async_array_write_read.rs +++ b/examples/async_array_write_read.rs @@ -14,7 +14,9 @@ async fn async_array_write_read() -> Result<(), Box> { // let store = Arc::new(store::AsyncFilesystemStore::new( // "tests/data/array_write_read.zarr", // )?); - let store = Arc::new(store::AsyncMemoryStore::default()); + let store = Arc::new(store::AsyncObjectStore::new( + object_store::memory::InMemory::new(), + )); // Create a group and write metadata to filesystem let group_path = "/group"; diff --git a/examples/async_http_array_read.rs b/examples/async_http_array_read.rs index d8f2e54e..9564a01f 100644 --- a/examples/async_http_array_read.rs +++ b/examples/async_http_array_read.rs @@ -14,7 +14,11 @@ async fn http_array_read() -> Result<(), Box> { const ARRAY_PATH: &str = "/group/array"; // Create a HTTP store - let store = Arc::new(store::AsyncHTTPStore::new(HTTP_URL)?); + let store = Arc::new(store::AsyncObjectStore::new( + object_store::http::HttpBuilder::new() + .with_url(HTTP_URL) + .build()?, + )); let log_writer = Arc::new(std::sync::Mutex::new( // std::io::BufWriter::new( std::io::stdout(), diff --git a/src/group.rs b/src/group.rs index 7ae4e3f2..17e531a6 100644 --- a/src/group.rs +++ b/src/group.rs @@ -314,7 +314,9 @@ mod tests { #[cfg(feature = "async")] #[tokio::test] async fn group_metadata_write_read_async() { - let store = std::sync::Arc::new(crate::storage::store::AsyncMemoryStore::new()); + let store = 
std::sync::Arc::new(crate::storage::store::AsyncObjectStore::new( + object_store::memory::InMemory::new(), + )); let group_path = "/group"; let group = GroupBuilder::new() .build(store.clone(), group_path) diff --git a/src/lib.rs b/src/lib.rs index 6af7c84a..ecb0dbaf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ //! - [x] [ZEP0003 - Variable chunking](https://zarr.dev/zeps/draft/ZEP0003.html) ([draft](https://github.com/orgs/zarr-developers/discussions/52)). //! - [x] Stores: //! - Sync: [filesystem](crate::storage::store::FilesystemStore), [in memory](crate::storage::store::MemoryStore), [HTTP](crate::storage::store::HTTPStore), [ZIP](crate::storage::storage_adapter::ZipStorageAdapter). -//! - Async: [filesystem](crate::storage::store::AsyncFilesystemStore), [in memory](crate::storage::store::AsyncMemoryStore), [HTTP](crate::storage::store::AsyncHTTPStore), [Google Cloud Storage](crate::storage::store::AsyncGoogleCloudStore), [Amazon S3](crate::storage::store::AsyncAmazonS3Store), [Microsoft Azure Storage](crate::storage::store::AsyncMicrosoftAzureStore). +//! - Async: [`AsyncObjectStore`](crate::storage::store::AsyncObjectStore) (supports all [`object_store::ObjectStore`] stores, e.g. in-memory, HTTP, Google Cloud Storage, Amazon S3, Microsoft Azure Storage, etc.). //! - [x] Data types: [core data types](crate::array::data_type::DataType), [raw bits](crate::array::data_type::DataType::RawBits), [float16](crate::array::data_type::DataType::Float16), [bfloat16](crate::array::data_type::DataType::BFloat16) [(spec issue)](https://github.com/zarr-developers/zarr-specs/issues/130). //! - [x] Chunk grids: [regular](crate::array::chunk_grid::RegularChunkGrid), [rectangular](crate::array::chunk_grid::RectangularChunkGrid) ([draft](https://github.com/orgs/zarr-developers/discussions/52)). //! - [x] Chunk key encoding: [default](crate::array::chunk_key_encoding::DefaultChunkKeyEncoding), [v2](crate::array::chunk_key_encoding::V2ChunkKeyEncoding). diff --git a/src/storage/store.rs b/src/storage/store.rs index 574fe0f0..ba1e95e0 100644 --- a/src/storage/store.rs +++ b/src/storage/store.rs @@ -10,27 +10,14 @@ mod store_async; mod store_sync; // mod store_plugin; -#[cfg(feature = "async")] -pub use store_async::filesystem_store::AsyncFilesystemStore; -#[cfg(feature = "async")] -pub use store_async::memory_store::AsyncMemoryStore; - pub use store_sync::filesystem_store::{FilesystemStore, FilesystemStoreCreateError}; pub use store_sync::memory_store::MemoryStore; -#[cfg(all(feature = "async", feature = "http"))] -pub use store_async::http_store::AsyncHTTPStore; #[cfg(feature = "http")] pub use store_sync::http_store::{HTTPStore, HTTPStoreCreateError}; -#[cfg(all(feature = "async", feature = "s3"))] -pub use store_async::amazon_s3_store::AsyncAmazonS3Store; - -#[cfg(all(feature = "async", feature = "gcp"))] -pub use store_async::google_cloud_store::AsyncGoogleCloudStore; - -#[cfg(all(feature = "async", feature = "azure"))] -pub use store_async::microsoft_azure_store::AsyncMicrosoftAzureStore; +#[cfg(all(feature = "async", feature = "object_store"))] +pub use store_async::object_store::AsyncObjectStore; // pub use store_plugin::{StorePlugin, StorePluginCreateError}; // Currently disabled. 
diff --git a/src/storage/store/store_async.rs b/src/storage/store/store_async.rs index a4c239be..df76dfbb 100644 --- a/src/storage/store/store_async.rs +++ b/src/storage/store/store_async.rs @@ -1,12 +1,2 @@ -pub mod filesystem_store; -pub mod memory_store; +#[cfg(feature = "object_store")] pub mod object_store; - -#[cfg(feature = "s3")] -pub mod amazon_s3_store; -#[cfg(feature = "gcp")] -pub mod google_cloud_store; -#[cfg(feature = "http")] -pub mod http_store; -#[cfg(feature = "azure")] -pub mod microsoft_azure_store; diff --git a/src/storage/store/store_async/amazon_s3_store.rs b/src/storage/store/store_async/amazon_s3_store.rs deleted file mode 100644 index cfcbe048..00000000 --- a/src/storage/store/store_async/amazon_s3_store.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! Amazon S3 stores. - -use std::sync::Arc; - -use crate::{ - object_store_impl, - storage::store_lock::{AsyncDefaultStoreLocks, AsyncStoreLocks}, -}; - -/// An Amazon S3 store. -#[derive(Debug)] -pub struct AsyncAmazonS3Store { - object_store: object_store::aws::AmazonS3, - locks: AsyncStoreLocks, -} - -impl AsyncAmazonS3Store { - /// Create a new amazon S3 store. - #[must_use] - pub fn new(object_store: object_store::aws::AmazonS3) -> Self { - Self::new_with_locks(object_store, Arc::new(AsyncDefaultStoreLocks::default())) - } - - /// Create a new amazon S3 store with non-default store locks. - #[must_use] - pub fn new_with_locks( - object_store: object_store::aws::AmazonS3, - store_locks: AsyncStoreLocks, - ) -> Self { - Self { - object_store, - locks: store_locks, - } - } -} - -object_store_impl!(AsyncAmazonS3Store, object_store, locks); diff --git a/src/storage/store/store_async/filesystem_store.rs b/src/storage/store/store_async/filesystem_store.rs deleted file mode 100644 index a4d5879f..00000000 --- a/src/storage/store/store_async/filesystem_store.rs +++ /dev/null @@ -1,166 +0,0 @@ -//! A filesystem store. -//! -//! See . - -use crate::{ - object_store_impl, - storage::{ - store_lock::{AsyncDefaultStoreLocks, AsyncStoreLocks}, - StorageError, - }, -}; - -use std::{path::Path, sync::Arc}; - -// // Register the store. -// inventory::submit! { -// ReadableStorePlugin::new("file", |uri| Ok(Arc::new(create_store_filesystem(uri)?))) -// } -// inventory::submit! { -// WritableStorePlugin::new("file", |uri| Ok(Arc::new(create_store_filesystem(uri)?))) -// } -// inventory::submit! { -// ListableStorePlugin::new("file", |uri| Ok(Arc::new(create_store_filesystem(uri)?))) -// } -// inventory::submit! { -// ReadableWritableStorePlugin::new("file", |uri| Ok(Arc::new(create_store_filesystem(uri)?))) -// } - -// #[allow(clippy::similar_names)] -// fn create_store_filesystem(uri: &str) -> Result { -// let url = url::Url::parse(uri)?; -// let path = std::path::PathBuf::from(url.path()); -// AsyncFilesystemStore::new(path).map_err(|e| StorePluginCreateError::Other(e.to_string())) -// } - -/// A file system store. -/// -/// See . -#[derive(Debug)] -pub struct AsyncFilesystemStore { - object_store: object_store::local::LocalFileSystem, - locks: AsyncStoreLocks, - // sort: bool, -} - -impl AsyncFilesystemStore { - /// Create a new file system store at a given `base_path`. - /// - /// # Errors - /// Returns a [`StorageError`] if `base_directory`: - /// - is not valid, or - /// - it points to an existing file rather than a directory. - pub fn new>(base_path: P) -> Result { - Self::new_with_locks(base_path, Arc::new(AsyncDefaultStoreLocks::default())) - } - - /// Create a new file system store at a given `base_path` with non-default store locks. 
- /// - /// # Errors - /// Returns a [`StorageError`] if `base_directory`: - /// - is not valid, or - /// - it points to an existing file rather than a directory. - pub fn new_with_locks>( - base_path: P, - store_locks: AsyncStoreLocks, - ) -> Result { - let base_path = base_path.as_ref().to_path_buf(); - if base_path.to_str().is_none() { - return Err(StorageError::from(format!( - "invalid base path {base_path:?}" - ))); - } - - if !base_path.exists() { - // the path does not exist, so try and create it - std::fs::create_dir_all(&base_path).map_err(StorageError::IOError)?; - }; - - let object_store = object_store::local::LocalFileSystem::new_with_prefix(base_path)?; - Ok(Self { - object_store, - locks: store_locks, - }) - } -} - -object_store_impl!(AsyncFilesystemStore, object_store, locks); - -#[cfg(test)] -mod tests { - use crate::storage::{ - AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncWritableStorageTraits, - StoreKeyStartValue, StorePrefix, - }; - - use super::*; - use std::error::Error; - - #[tokio::test] - async fn filesystem_set() -> Result<(), Box> { - let path = tempfile::TempDir::new()?; - let store = AsyncFilesystemStore::new(path.path())?; - let key = "a/b".try_into()?; - store.set(&key, &[0, 1, 2]).await?; - assert_eq!(store.get(&key).await?.unwrap(), &[0, 1, 2]); - store - .set_partial_values(&[StoreKeyStartValue::new(key.clone(), 1, &[3, 4])]) - .await?; - assert_eq!(store.get(&key).await?.unwrap(), &[0, 3, 4]); - Ok(()) - } - - #[tokio::test] - async fn filesystem_list() -> Result<(), Box> { - let path = tempfile::TempDir::new()?; - let store = AsyncFilesystemStore::new(path.path())?; - - store.set(&"a/b".try_into()?, &[]).await?; - store.set(&"a/c".try_into()?, &[]).await?; - store.set(&"a/d/e".try_into()?, &[]).await?; - store.set(&"a/d/f".try_into()?, &[]).await?; - store.erase(&"a/d/e".try_into()?).await?; - assert_eq!( - store.list().await?, - &["a/b".try_into()?, "a/c".try_into()?, "a/d/f".try_into()?] - ); - assert_eq!( - store.list_prefix(&"a/".try_into()?).await?, - &["a/b".try_into()?, "a/c".try_into()?, "a/d/f".try_into()?] - ); - assert_eq!( - store.list_prefix(&"a/d/".try_into()?).await?, - &["a/d/f".try_into()?] - ); - assert_eq!( - store.list_prefix(&"".try_into()?).await?, - &["a/b".try_into()?, "a/c".try_into()?, "a/d/f".try_into()?] - ); - - // assert!(crate::storage::node_exists(&store, &"/a/b".try_into()?).await?); - // assert!(crate::storage::node_exists_listable(&store, &"/a/b".try_into()?).await?); - - Ok(()) - } - - #[tokio::test] - async fn filesystem_list_dir() -> Result<(), Box> { - let path = tempfile::TempDir::new()?; - let store = AsyncFilesystemStore::new(path.path())?; - store.set(&"a/b".try_into()?, &[]).await?; - store.set(&"a/c".try_into()?, &[]).await?; - store.set(&"a/d/e".try_into()?, &[]).await?; - store.set(&"a/f/g".try_into()?, &[]).await?; - store.set(&"a/f/h".try_into()?, &[]).await?; - store.set(&"b/c/d".try_into()?, &[]).await?; - - let list_dir = store.list_dir(&StorePrefix::new("a/")?).await?; - - assert_eq!(list_dir.keys(), &["a/b".try_into()?, "a/c".try_into()?,]); - assert_eq!( - list_dir.prefixes(), - &["a/d/".try_into()?, "a/f/".try_into()?,] - ); - Ok(()) - } -} diff --git a/src/storage/store/store_async/google_cloud_store.rs b/src/storage/store/store_async/google_cloud_store.rs deleted file mode 100644 index 57ff21c0..00000000 --- a/src/storage/store/store_async/google_cloud_store.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! Google Cloud stores. 
- -use std::sync::Arc; - -use crate::{ - object_store_impl, - storage::store_lock::{AsyncDefaultStoreLocks, AsyncStoreLocks}, -}; - -/// A Google Cloud Storage store. -#[derive(Debug)] -pub struct AsyncGoogleCloudStore { - object_store: object_store::gcp::GoogleCloudStorage, - locks: AsyncStoreLocks, -} - -impl AsyncGoogleCloudStore { - /// Create a new amazon S3 store. - #[must_use] - pub fn new(object_store: object_store::gcp::GoogleCloudStorage) -> Self { - Self::new_with_locks(object_store, Arc::new(AsyncDefaultStoreLocks::default())) - } - - /// Create a new amazon S3 store with non-default store locks. - #[must_use] - pub fn new_with_locks( - object_store: object_store::gcp::GoogleCloudStorage, - store_locks: AsyncStoreLocks, - ) -> Self { - Self { - object_store, - locks: store_locks, - } - } -} - -object_store_impl!(AsyncGoogleCloudStore, object_store, locks); diff --git a/src/storage/store/store_async/http_store.rs b/src/storage/store/store_async/http_store.rs deleted file mode 100644 index 81cca782..00000000 --- a/src/storage/store/store_async/http_store.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! A HTTP store. - -use std::sync::Arc; - -use crate::{ - object_store_impl, - storage::{ - store_lock::{AsyncDefaultStoreLocks, AsyncStoreLocks}, - StorageError, - }, -}; - -use object_store::http::{HttpBuilder, HttpStore}; - -/// A HTTP store. -#[derive(Debug)] -pub struct AsyncHTTPStore { - object_store: HttpStore, - locks: AsyncStoreLocks, -} - -impl AsyncHTTPStore { - /// Create a new HTTP store at a given `base_url`. - /// - /// # Errors - /// - /// Returns a [`StorageError`] if `base_url` is not valid. - pub fn new(base_url: &str) -> Result { - Self::new_with_locks(base_url, Arc::new(AsyncDefaultStoreLocks::default())) - } - - /// Create a new HTTP store at a given `base_url` with non-default store locks. - /// - /// # Errors - /// - /// Returns a [`StorageError`] if `base_url` is not valid. 
- pub fn new_with_locks( - base_url: &str, - store_locks: AsyncStoreLocks, - ) -> Result { - let object_store = HttpBuilder::new().with_url(base_url).build()?; - Ok(Self { - object_store, - locks: store_locks, - }) - } -} - -object_store_impl!(AsyncHTTPStore, object_store, locks); - -#[cfg(test)] -mod tests { - use crate::{ - array::{Array, DataType}, - node::NodePath, - storage::{meta_key, AsyncReadableStorageTraits}, - }; - - use super::*; - - const HTTP_TEST_PATH_REF: &str = - "https://raw.githubusercontent.com/LDeakin/zarrs/main/tests/data/hierarchy.zarr"; - const ARRAY_PATH_REF: &str = "/a/baz"; - - #[tokio::test] - async fn http_store_size() { - let store = AsyncHTTPStore::new(HTTP_TEST_PATH_REF).unwrap(); - let len = store - .size_key(&meta_key(&NodePath::new(ARRAY_PATH_REF).unwrap())) - .await - .unwrap(); - assert_eq!(len.unwrap(), 691); - } - - #[tokio::test] - async fn http_store_get() { - let store = AsyncHTTPStore::new(HTTP_TEST_PATH_REF).unwrap(); - let metadata = store - .get(&meta_key(&NodePath::new(ARRAY_PATH_REF).unwrap())) - .await - .unwrap() - .unwrap(); - let metadata: crate::array::ArrayMetadataV3 = serde_json::from_slice(&metadata).unwrap(); - assert_eq!(metadata.data_type.name(), "float64"); - } - - #[tokio::test] - async fn http_store_array() { - let store = AsyncHTTPStore::new(HTTP_TEST_PATH_REF).unwrap(); - let array = Array::async_new(store.into(), ARRAY_PATH_REF) - .await - .unwrap(); - assert_eq!(array.data_type(), &DataType::Float64); - } - - #[cfg(feature = "gzip")] - #[tokio::test] - async fn http_store_array_get() { - const HTTP_TEST_PATH: &str = - "https://raw.githubusercontent.com/LDeakin/zarrs/main/tests/data/array_write_read.zarr"; - const ARRAY_PATH: &str = "/group/array"; - - let store = AsyncHTTPStore::new(HTTP_TEST_PATH).unwrap(); - let array = Array::async_new(store.into(), ARRAY_PATH).await.unwrap(); - assert_eq!(array.data_type(), &DataType::Float32); - - // Read the central 4x2 subset of the array - let subset_4x2 = crate::array_subset::ArraySubset::new_with_ranges(&[2..6, 3..5]); // the center 4x2 region - let data_4x2 = array - .async_retrieve_array_subset_elements::(&subset_4x2) - .await - .unwrap(); - // assert_eq!(data_4x2, &[0.0, f32::NAN, 0.1, f32::NAN, 0.4, 0.5, 0.7, 0.8]); - assert_eq!(data_4x2[0], 0.0); - assert!(data_4x2[1].is_nan()); - assert_eq!(data_4x2[2], 0.1); - assert!(data_4x2[3].is_nan()); - assert_eq!(data_4x2[4], 0.4); - assert_eq!(data_4x2[5], 0.5); - assert_eq!(data_4x2[6], 0.7); - assert_eq!(data_4x2[7], 0.8); - - // let data = array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_shape(array.shape().to_vec())).unwrap(); - // println!("{data:?}"); - } - - #[cfg(all(feature = "sharding", feature = "gzip", feature = "crc32c"))] - #[tokio::test] - async fn http_store_sharded_array_get() { - const HTTP_TEST_PATH_SHARDED: &str = - "https://raw.githubusercontent.com/LDeakin/zarrs/main/tests/data/sharded_array_write_read.zarr"; - const ARRAY_PATH_SHARDED: &str = "/group/array"; - - let store = AsyncHTTPStore::new(HTTP_TEST_PATH_SHARDED).unwrap(); - let array = Array::async_new(store.into(), ARRAY_PATH_SHARDED) - .await - .unwrap(); - assert_eq!(array.data_type(), &DataType::UInt16); - - // Read the central 4x2 subset of the array - let subset_4x2 = crate::array_subset::ArraySubset::new_with_ranges(&[2..6, 3..5]); // the center 4x2 region - let data_4x2 = array - .async_retrieve_array_subset_elements::(&subset_4x2) - .await - .unwrap(); - assert_eq!(data_4x2, [19, 20, 27, 28, 35, 36, 43, 44].into()); - - // let data = 
array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_shape(array.shape().to_vec())).unwrap(); - // println!("{data:?}"); - } -} diff --git a/src/storage/store/store_async/memory_store.rs b/src/storage/store/store_async/memory_store.rs deleted file mode 100644 index 2d34d3b3..00000000 --- a/src/storage/store/store_async/memory_store.rs +++ /dev/null @@ -1,112 +0,0 @@ -//! An in-memory store. - -use std::sync::Arc; - -use crate::{ - object_store_impl, - storage::store_lock::{AsyncDefaultStoreLocks, AsyncStoreLocks}, -}; - -/// An in-memory store. -#[derive(Debug)] -pub struct AsyncMemoryStore { - object_store: object_store::memory::InMemory, - locks: AsyncStoreLocks, -} - -impl AsyncMemoryStore { - /// Create a new memory store at a given `base_directory`. - #[must_use] - pub fn new() -> Self { - Self::new_with_locks(Arc::new(AsyncDefaultStoreLocks::default())) - } - - /// Create a new memory store at a given `base_directory`. - #[must_use] - pub fn new_with_locks(store_locks: AsyncStoreLocks) -> Self { - Self { - object_store: object_store::memory::InMemory::new(), - locks: store_locks, - } - } -} - -impl Default for AsyncMemoryStore { - fn default() -> Self { - Self::new() - } -} - -object_store_impl!(AsyncMemoryStore, object_store, locks); - -#[cfg(test)] -mod tests { - use crate::storage::{ - AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncWritableStorageTraits, - StoreKeyStartValue, - }; - - use super::*; - use std::error::Error; - - #[tokio::test] - async fn memory_set() -> Result<(), Box> { - let store = AsyncMemoryStore::new(); - let key = "a/b".try_into()?; - store.set(&key, &[0, 1, 2]).await?; - assert_eq!(store.get(&key).await?.unwrap(), &[0, 1, 2]); - store - .set_partial_values(&[StoreKeyStartValue::new(key.clone(), 1, &[3, 4])]) - .await?; - assert_eq!(store.get(&key).await?.unwrap(), &[0, 3, 4]); - Ok(()) - } - - #[tokio::test] - async fn memory_list() -> Result<(), Box> { - let store = AsyncMemoryStore::new(); - - store.set(&"a/b".try_into()?, &[]).await?; - store.set(&"a/c".try_into()?, &[]).await?; - store.set(&"a/d/e".try_into()?, &[]).await?; - store.set(&"a/d/f".try_into()?, &[]).await?; - store.erase(&"a/d/e".try_into()?).await?; - assert_eq!( - store.list().await?, - &["a/b".try_into()?, "a/c".try_into()?, "a/d/f".try_into()?] - ); - assert_eq!( - store.list_prefix(&"a/".try_into()?).await?, - &["a/b".try_into()?, "a/c".try_into()?, "a/d/f".try_into()?] - ); - assert_eq!( - store.list_prefix(&"a/d/".try_into()?).await?, - &["a/d/f".try_into()?] - ); - assert_eq!( - store.list_prefix(&"".try_into()?).await?, - &["a/b".try_into()?, "a/c".try_into()?, "a/d/f".try_into()?] 
- ); - Ok(()) - } - - #[tokio::test] - async fn memory_list_dir() -> Result<(), Box> { - let store = AsyncMemoryStore::new(); - store.set(&"a/b".try_into()?, &[]).await?; - store.set(&"a/c".try_into()?, &[]).await?; - store.set(&"a/d/e".try_into()?, &[]).await?; - store.set(&"a/f/g".try_into()?, &[]).await?; - store.set(&"a/f/h".try_into()?, &[]).await?; - store.set(&"b/c/d".try_into()?, &[]).await?; - - let list_dir = store.list_dir(&"a/".try_into()?).await?; - - assert_eq!(list_dir.keys(), &["a/b".try_into()?, "a/c".try_into()?,]); - assert_eq!( - list_dir.prefixes(), - &["a/d/".try_into()?, "a/f/".try_into()?,] - ); - Ok(()) - } -} diff --git a/src/storage/store/store_async/microsoft_azure_store.rs b/src/storage/store/store_async/microsoft_azure_store.rs deleted file mode 100644 index 61d73df2..00000000 --- a/src/storage/store/store_async/microsoft_azure_store.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! Azure blob storage stores. - -use std::sync::Arc; - -use crate::{ - object_store_impl, - storage::store_lock::{AsyncDefaultStoreLocks, AsyncStoreLocks}, -}; - -/// A Microsoft Azure store. -#[derive(Debug)] -pub struct AsyncMicrosoftAzureStore { - object_store: object_store::azure::MicrosoftAzure, - locks: AsyncStoreLocks, -} - -impl AsyncMicrosoftAzureStore { - /// Create a new amazon S3 store. - #[must_use] - pub fn new(object_store: object_store::azure::MicrosoftAzure) -> Self { - Self::new_with_locks(object_store, Arc::new(AsyncDefaultStoreLocks::default())) - } - - /// Create a new amazon S3 store. - #[must_use] - pub fn new_with_locks( - object_store: object_store::azure::MicrosoftAzure, - store_locks: AsyncStoreLocks, - ) -> Self { - Self { - object_store, - locks: store_locks, - } - } -} - -object_store_impl!(AsyncMicrosoftAzureStore, object_store, locks); diff --git a/src/storage/store/store_async/object_store.rs b/src/storage/store/store_async/object_store.rs index d46ad1a6..a75be0ad 100644 --- a/src/storage/store/store_async/object_store.rs +++ b/src/storage/store/store_async/object_store.rs @@ -1,13 +1,16 @@ +use std::sync::Arc; + use futures::{StreamExt, TryStreamExt}; -use object_store::{path::Path, ObjectStore}; +use object_store::path::Path; use crate::{ array::MaybeBytes, byte_range::ByteRange, storage::{ - AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncWritableStorageTraits, - StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, - StorePrefix, + store_lock::{AsyncDefaultStoreLocks, AsyncStoreKeyMutex, AsyncStoreLocks}, + AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits, + AsyncWritableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, + StoreKeys, StoreKeysPrefixes, StorePrefix, }, }; @@ -36,10 +39,33 @@ fn handle_result(result: Result) -> Result, } } -#[cfg_attr(feature = "async", async_trait::async_trait)] -impl AsyncReadableStorageTraits for T { +/// An asynchronous store backed by an [`object_store::ObjectStore`]. +pub struct AsyncObjectStore { + object_store: T, + locks: AsyncStoreLocks, +} + +impl AsyncObjectStore { + /// Create a new [`AsyncObjectStore`]. + #[must_use] + pub fn new(object_store: T) -> Self { + Self::new_with_locks(object_store, Arc::new(AsyncDefaultStoreLocks::default())) + } + + /// Create a new [`AsyncObjectStore`] with non-default store locks. 
+ #[must_use] + pub fn new_with_locks(object_store: T, store_locks: AsyncStoreLocks) -> Self { + Self { + object_store, + locks: store_locks, + } + } +} + +#[async_trait::async_trait] +impl AsyncReadableStorageTraits for AsyncObjectStore { async fn get(&self, key: &StoreKey) -> Result { - let get = handle_result(ObjectStore::get(self, &key_to_path(key)).await)?; + let get = handle_result(self.object_store.get(&key_to_path(key)).await)?; if let Some(get) = get { let bytes = get.bytes().await?; Ok(Some(bytes.to_vec())) @@ -60,7 +86,10 @@ impl AsyncReadableStorageTraits for T { .iter() .map(|byte_range| byte_range.to_range_usize(size)) .collect::>(); - let get_ranges = self.get_ranges(&key_to_path(key), &ranges).await; + let get_ranges = self + .object_store + .get_ranges(&key_to_path(key), &ranges) + .await; match get_ranges { Ok(get_ranges) => Ok(Some( get_ranges.iter().map(|bytes| bytes.to_vec()).collect(), @@ -84,7 +113,7 @@ impl AsyncReadableStorageTraits for T { async fn size_prefix(&self, prefix: &StorePrefix) -> Result { let prefix: object_store::path::Path = prefix.as_str().into(); - let mut locations = ObjectStore::list(self, Some(&prefix)); + let mut locations = self.object_store.list(Some(&prefix)); let mut size = 0; while let Some(item) = locations.next().await { let meta = item?; @@ -94,11 +123,14 @@ impl AsyncReadableStorageTraits for T { } async fn size_key(&self, key: &StoreKey) -> Result, StorageError> { - Ok(handle_result(self.head(&key_to_path(key)).await)?.map(|meta| meta.size as u64)) + Ok( + handle_result(self.object_store.head(&key_to_path(key)).await)? + .map(|meta| meta.size as u64), + ) } async fn size(&self) -> Result { - let mut locations = ObjectStore::list(self, None); + let mut locations = self.object_store.list(None); let mut size = 0; while let Some(item) = locations.next().await { let meta = item?; @@ -108,12 +140,12 @@ impl AsyncReadableStorageTraits for T { } } -#[cfg_attr(feature = "async", async_trait::async_trait)] -impl AsyncWritableStorageTraits for T { +#[async_trait::async_trait] +impl AsyncWritableStorageTraits for AsyncObjectStore { async fn set(&self, key: &StoreKey, value: &[u8]) -> Result<(), StorageError> { // FIXME: Can this copy be avoided? 
let bytes = bytes::Bytes::copy_from_slice(value); - ObjectStore::put(self, &key_to_path(key), bytes).await?; + self.object_store.put(&key_to_path(key), bytes).await?; Ok(()) } @@ -126,25 +158,37 @@ impl AsyncWritableStorageTraits for T { } async fn erase(&self, key: &StoreKey) -> Result { - Ok(handle_result(ObjectStore::delete(self, &key_to_path(key)).await)?.is_some()) + Ok(handle_result(self.object_store.delete(&key_to_path(key)).await)?.is_some()) } async fn erase_prefix(&self, prefix: &StorePrefix) -> Result { let prefix: object_store::path::Path = prefix.as_str().into(); - let locations = ObjectStore::list(self, Some(&prefix)) + let locations = self + .object_store + .list(Some(&prefix)) .map_ok(|m| m.location) .boxed(); - ObjectStore::delete_stream(self, locations) + self.object_store + .delete_stream(locations) .try_collect::>() .await?; Ok(true) } } -#[cfg_attr(feature = "async", async_trait::async_trait)] -impl AsyncListableStorageTraits for T { +#[async_trait::async_trait] +impl AsyncReadableWritableStorageTraits for AsyncObjectStore { + async fn mutex(&self, key: &StoreKey) -> Result { + Ok(self.locks.mutex(key).await) + } +} + +#[async_trait::async_trait] +impl AsyncListableStorageTraits for AsyncObjectStore { async fn list(&self) -> Result { - let mut list = ObjectStore::list(self, None) + let mut list = self + .object_store + .list(None) .collect::>() .await .into_iter() @@ -162,7 +206,9 @@ impl AsyncListableStorageTraits for T { async fn list_prefix(&self, prefix: &StorePrefix) -> Result { // TODO: Check if this is outputting everything under prefix, or just one level under let path: object_store::path::Path = prefix.as_str().into(); - let mut list = ObjectStore::list(self, Some(&path)) + let mut list = self + .object_store + .list(Some(&path)) .collect::>() .await .into_iter() @@ -179,7 +225,7 @@ impl AsyncListableStorageTraits for T { async fn list_dir(&self, prefix: &StorePrefix) -> Result { let path: object_store::path::Path = prefix.as_str().into(); - let list_result = ObjectStore::list_with_delimiter(self, Some(&path)).await?; + let list_result = self.object_store.list_with_delimiter(Some(&path)).await?; let mut prefixes = list_result .common_prefixes .iter() @@ -201,145 +247,3 @@ impl AsyncListableStorageTraits for T { Ok(StoreKeysPrefixes { keys, prefixes }) } } - -/// Implement the storage traits for an object store -#[macro_export] -macro_rules! 
object_store_impl { - ($store:ty, $object_store:ident, $locks:ident) => { - #[cfg_attr(feature = "async", async_trait::async_trait)] - impl $crate::storage::AsyncReadableStorageTraits for $store { - async fn get( - &self, - key: &$crate::storage::StoreKey, - ) -> Result<$crate::array::MaybeBytes, $crate::storage::StorageError> { - $crate::storage::AsyncReadableStorageTraits::get(&self.$object_store, key).await - } - - async fn get_partial_values_key( - &self, - key: &$crate::storage::StoreKey, - byte_ranges: &[$crate::storage::ByteRange], - ) -> Result>>, $crate::storage::StorageError> { - $crate::storage::AsyncReadableStorageTraits::get_partial_values_key( - &self.$object_store, - key, - byte_ranges, - ) - .await - } - - async fn get_partial_values( - &self, - key_ranges: &[$crate::storage::StoreKeyRange], - ) -> Result, $crate::storage::StorageError> { - $crate::storage::AsyncReadableStorageTraits::get_partial_values( - &self.object_store, - key_ranges, - ) - .await - } - - async fn size_prefix( - &self, - prefix: &$crate::storage::StorePrefix, - ) -> Result { - $crate::storage::AsyncReadableStorageTraits::size_prefix( - &self.$object_store, - prefix, - ) - .await - } - - async fn size_key( - &self, - key: &$crate::storage::StoreKey, - ) -> Result, $crate::storage::StorageError> { - $crate::storage::AsyncReadableStorageTraits::size_key(&self.$object_store, key) - .await - } - - async fn size(&self) -> Result { - $crate::storage::AsyncReadableStorageTraits::size(&self.$object_store).await - } - } - - #[cfg_attr(feature = "async", async_trait::async_trait)] - impl $crate::storage::AsyncWritableStorageTraits for $store { - async fn set( - &self, - key: &$crate::storage::StoreKey, - value: &[u8], - ) -> Result<(), $crate::storage::StorageError> { - $crate::storage::AsyncWritableStorageTraits::set(&self.$object_store, key, value) - .await - } - - async fn set_partial_values( - &self, - key_start_values: &[$crate::storage::StoreKeyStartValue], - ) -> Result<(), $crate::storage::StorageError> { - $crate::storage::async_store_set_partial_values(self, key_start_values).await - } - - async fn erase( - &self, - key: &$crate::storage::StoreKey, - ) -> Result { - $crate::storage::AsyncWritableStorageTraits::erase(&self.$object_store, key).await - } - - async fn erase_prefix( - &self, - prefix: &$crate::storage::StorePrefix, - ) -> Result { - $crate::storage::AsyncWritableStorageTraits::erase_prefix( - &self.$object_store, - prefix, - ) - .await - } - } - - #[cfg_attr(feature = "async", async_trait::async_trait)] - impl $crate::storage::AsyncReadableWritableStorageTraits for $store { - async fn mutex( - &self, - key: &$crate::storage::StoreKey, - ) -> Result< - $crate::storage::store_lock::AsyncStoreKeyMutex, - $crate::storage::StorageError, - > { - let mutex = self.$locks.mutex(key).await; - Ok(mutex) - } - } - - #[cfg_attr(feature = "async", async_trait::async_trait)] - impl $crate::storage::AsyncListableStorageTraits for $store { - async fn list( - &self, - ) -> Result<$crate::storage::StoreKeys, $crate::storage::StorageError> { - $crate::storage::AsyncListableStorageTraits::list(&self.$object_store).await - } - - async fn list_prefix( - &self, - prefix: &$crate::storage::StorePrefix, - ) -> Result<$crate::storage::StoreKeys, $crate::storage::StorageError> { - $crate::storage::AsyncListableStorageTraits::list_prefix( - &self.$object_store, - prefix, - ) - .await - } - - async fn list_dir( - &self, - prefix: &$crate::storage::StorePrefix, - ) -> Result<$crate::storage::StoreKeysPrefixes, 
$crate::storage::StorageError> { - $crate::storage::AsyncListableStorageTraits::list_dir(&self.$object_store, prefix) - .await - } - } - }; -}
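
Usage sketch: after this change, any backend implementing `object_store::ObjectStore` is wrapped in the single `AsyncObjectStore` type instead of a per-backend store. The snippet below is illustrative only and is not part of the patch — the crate feature set (`async` + `object_store` in zarrs, `http` in the `object_store` crate), the URL, and the `tokio` runtime setup are assumptions.

    use std::sync::Arc;

    use zarrs::storage::store::AsyncObjectStore;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // In-memory backend (replaces the removed `AsyncMemoryStore`).
        let memory_store = Arc::new(AsyncObjectStore::new(
            object_store::memory::InMemory::new(),
        ));

        // HTTP backend (replaces the removed `AsyncHTTPStore`).
        // Requires the `http` feature of the `object_store` crate.
        let http_store = Arc::new(AsyncObjectStore::new(
            object_store::http::HttpBuilder::new()
                .with_url("https://example.com/hierarchy.zarr") // illustrative URL
                .build()?,
        ));

        // `AsyncObjectStore::new_with_locks` accepts non-default store locks,
        // mirroring the constructors of the removed per-backend stores.

        let _ = (memory_store, http_store);
        Ok(())
    }

S3, Google Cloud Storage, and Azure backends are constructed the same way through their respective `object_store` builders and crate features, which is why the dedicated `s3`, `gcp`, and `azure` features are removed.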