Skip to content

Commit

Permalink
Make various Array methods with parallel parameter pub
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Dec 26, 2023
1 parent 72bcbbb commit 6a3618d
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 168 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Locking functionality for arrays is moved into stores
- Improved `Array` documentation
- Add store testing utility functions for unified store testing
- Make various `Array` methods with `parallel` parameter `pub`

### Fixed
- Fixed `MemoryStore::get_partial_values_key` if given an invalid byte range, now returns `InvalidByteRangeError` instead of panicking
Expand Down
186 changes: 101 additions & 85 deletions src/array/array_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,21 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
Ok(())
}

/// Read and decode the `array_subset` of array into its bytes.
///
/// Out-of-bounds elements will have the fill value.
/// If `parallel` is true, chunks intersecting the array subset are retrieved in parallel.
///
/// # Errors
/// Returns an [`ArrayError`] if:
/// - the `array_subset` dimensionality does not match the chunk grid dimensionality,
/// - there is a codec decoding error, or
/// - an underlying store error.
///
/// # Panics
/// Panics if attempting to reference a byte beyond `usize::MAX`.
#[allow(clippy::too_many_lines)]
async fn _async_retrieve_array_subset(
pub async fn async_retrieve_array_subset_opt(
&self,
array_subset: &ArraySubset,
parallel: bool,
Expand Down Expand Up @@ -398,36 +411,38 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
}
}

/// Read and decode the `array_subset` of array into its bytes.
///
/// Out-of-bounds elements will have the fill value.
/// If `parallel` is true, chunks intersecting the array subset are retrieved in parallel.
///
/// # Errors
/// Returns an [`ArrayError`] if:
/// - the `array_subset` dimensionality does not match the chunk grid dimensionality,
/// - there is a codec decoding error, or
/// - an underlying store error.
///
/// # Panics
/// Panics if attempting to reference a byte beyond `usize::MAX`.
/// Serial version of [`Array::retrieve_array_subset_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_retrieve_array_subset(
&self,
array_subset: &ArraySubset,
) -> Result<Box<[u8]>, ArrayError> {
self._async_retrieve_array_subset(array_subset, false).await
self.async_retrieve_array_subset_opt(array_subset, false)
.await
}

/// Parallel version of [`Array::retrieve_array_subset`].
/// Parallel version of [`Array::retrieve_array_subset_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_par_retrieve_array_subset(
&self,
array_subset: &ArraySubset,
) -> Result<Box<[u8]>, ArrayError> {
self._async_retrieve_array_subset(array_subset, true).await
self.async_retrieve_array_subset_opt(array_subset, true)
.await
}

async fn _async_retrieve_array_subset_elements<T: TriviallyTransmutable + Send + Sync>(
/// Read and decode the `array_subset` of array into a vector of its elements.
///
/// # Errors
/// Returns an [`ArrayError`] if:
/// - the size of `T` does not match the data type size,
/// - the decoded bytes cannot be transmuted,
/// - an array subset is invalid or out of bounds of the array,
/// - there is a codec decoding error, or
/// - an underlying store error.
pub async fn async_retrieve_array_subset_elements_opt<
T: TriviallyTransmutable + Send + Sync,
>(
&self,
array_subset: &ArraySubset,
parallel: bool,
Expand All @@ -440,7 +455,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
}

let bytes = self
._async_retrieve_array_subset(array_subset, parallel)
.async_retrieve_array_subset_opt(array_subset, parallel)
.await?;
if safe_transmute::align::check_alignment::<_, T>(&bytes).is_ok() {
// no-copy path
Expand All @@ -462,37 +477,40 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
}
}

/// Read and decode the `array_subset` of array into a vector of its elements.
///
/// # Errors
/// Returns an [`ArrayError`] if:
/// - the size of `T` does not match the data type size,
/// - the decoded bytes cannot be transmuted,
/// - an array subset is invalid or out of bounds of the array,
/// - there is a codec decoding error, or
/// - an underlying store error.
/// Serial version of [`Array::retrieve_array_subset_elements_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_retrieve_array_subset_elements<T: TriviallyTransmutable + Send + Sync>(
&self,
array_subset: &ArraySubset,
) -> Result<Box<[T]>, ArrayError> {
self._async_retrieve_array_subset_elements(array_subset, false)
self.async_retrieve_array_subset_elements_opt(array_subset, false)
.await
}

/// Parallel version of [`Array::retrieve_array_subset_elements`].
/// Parallel version of [`Array::retrieve_array_subset_elements_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_par_retrieve_array_subset_elements<
T: TriviallyTransmutable + Send + Sync,
>(
&self,
array_subset: &ArraySubset,
) -> Result<Box<[T]>, ArrayError> {
self._async_retrieve_array_subset_elements(array_subset, true)
self.async_retrieve_array_subset_elements_opt(array_subset, true)
.await
}

#[cfg(feature = "ndarray")]
async fn _async_retrieve_array_subset_ndarray<
/// Read and decode the `array_subset` of array into an [`ndarray::ArrayD`].
///
/// # Errors
/// Returns an [`ArrayError`] if:
/// - an array subset is invalid or out of bounds of the array,
/// - there is a codec decoding error, or
/// - an underlying store error.
///
/// # Panics
/// Will panic if any dimension in `chunk_subset` is `usize::MAX` or larger.
pub async fn async_retrieve_array_subset_ndarray_opt<
T: safe_transmute::TriviallyTransmutable + Send + Sync,
>(
&self,
Expand All @@ -509,7 +527,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
}

let elements = self
._async_retrieve_array_subset_elements(array_subset, parallel)
.async_retrieve_array_subset_elements_opt(array_subset, parallel)
.await?;
let length = elements.len();
ndarray::ArrayD::<T>::from_shape_vec(
Expand All @@ -525,36 +543,28 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
}

#[cfg(feature = "ndarray")]
/// Read and decode the `array_subset` of array into an [`ndarray::ArrayD`].
///
/// # Errors
/// Returns an [`ArrayError`] if:
/// - an array subset is invalid or out of bounds of the array,
/// - there is a codec decoding error, or
/// - an underlying store error.
///
/// # Panics
/// Will panic if any dimension in `chunk_subset` is `usize::MAX` or larger.
/// Serial version of [`Array::retrieve_array_subset_ndarray_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_retrieve_array_subset_ndarray<
T: safe_transmute::TriviallyTransmutable + Send + Sync,
>(
&self,
array_subset: &ArraySubset,
) -> Result<ndarray::ArrayD<T>, ArrayError> {
self._async_retrieve_array_subset_ndarray(array_subset, false)
self.async_retrieve_array_subset_ndarray_opt(array_subset, false)
.await
}

#[cfg(feature = "ndarray")]
/// Parallel version of [`Array::retrieve_array_subset_ndarray`].
/// Parallel version of [`Array::retrieve_array_subset_ndarray_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_par_retrieve_array_subset_ndarray<
T: safe_transmute::TriviallyTransmutable + Send + Sync,
>(
&self,
array_subset: &ArraySubset,
) -> Result<ndarray::ArrayD<T>, ArrayError> {
self._async_retrieve_array_subset_ndarray(array_subset, true)
self.async_retrieve_array_subset_ndarray_opt(array_subset, true)
.await
}

Expand Down Expand Up @@ -887,8 +897,19 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> Array<TStorage> {
}

impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
#[allow(clippy::too_many_lines)]
async fn _async_store_array_subset(
/// Encode `subset_bytes` and store in `array_subset`.
///
/// If `parallel` is true, chunks intersecting the array subset are retrieved in parallel.
/// Prefer to use [`store_chunk`](Array<WritableStorageTraits>::store_chunk) since this will decode and encode each chunk intersecting `array_subset`.
///
/// # Errors
/// Returns an [`ArrayError`] if
/// - the dimensionality of `array_subset` does not match the chunk grid dimensionality
/// - the length of `subset_bytes` does not match the expected length governed by the shape of the array subset and the data type size,
/// - there is a codec encoding error, or
/// - an underlying store error.
#[allow(clippy::missing_panics_doc, clippy::too_many_lines)]
pub async fn async_store_array_subset_opt(
&self,
array_subset: &ArraySubset,
subset_bytes: Vec<u8>,
Expand Down Expand Up @@ -1029,38 +1050,37 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
Ok(())
}

/// Encode `subset_bytes` and store in `array_subset`.
///
/// If `parallel` is true, chunks intersecting the array subset are retrieved in parallel.
/// Prefer to use [`store_chunk`](Array<WritableStorageTraits>::store_chunk) since this will decode and encode each chunk intersecting `array_subset`.
///
/// # Errors
/// Returns an [`ArrayError`] if
/// - the dimensionality of `array_subset` does not match the chunk grid dimensionality
/// - the length of `subset_bytes` does not match the expected length governed by the shape of the array subset and the data type size,
/// - there is a codec encoding error, or
/// - an underlying store error.
/// Serial version of [`Array::store_array_subset_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_store_array_subset(
&self,
array_subset: &ArraySubset,
subset_bytes: Vec<u8>,
) -> Result<(), ArrayError> {
self._async_store_array_subset(array_subset, subset_bytes, false)
self.async_store_array_subset_opt(array_subset, subset_bytes, false)
.await
}

/// Parallel version of [`Array::store_array_subset`].
/// Parallel version of [`Array::store_array_subset_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_par_store_array_subset(
&self,
array_subset: &ArraySubset,
subset_bytes: Vec<u8>,
) -> Result<(), ArrayError> {
self._async_store_array_subset(array_subset, subset_bytes, true)
self.async_store_array_subset_opt(array_subset, subset_bytes, true)
.await
}

async fn _async_store_array_subset_elements<T: TriviallyTransmutable + Send>(
/// Encode `subset_elements` and store in `array_subset`.
///
/// Prefer to use [`store_chunk`](Array<WritableStorageTraits>::store_chunk) since this will decode and encode each chunk intersecting `array_subset`.
///
/// # Errors
/// Returns an [`ArrayError`] if
/// - the size of `T` does not match the data type size, or
/// - a [`store_array_subset`](Array::store_array_subset) error condition is met.
pub async fn async_store_array_subset_elements_opt<T: TriviallyTransmutable + Send>(
&self,
array_subset: &ArraySubset,
subset_elements: Vec<T>,
Expand All @@ -1074,40 +1094,39 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
}

let subset_bytes = safe_transmute_to_bytes_vec(subset_elements);
self._async_store_array_subset(array_subset, subset_bytes, parallel)
self.async_store_array_subset_opt(array_subset, subset_bytes, parallel)
.await
}

/// Encode `subset_elements` and store in `array_subset`.
///
/// Prefer to use [`store_chunk`](Array<WritableStorageTraits>::store_chunk) since this will decode and encode each chunk intersecting `array_subset`.
///
/// # Errors
/// Returns an [`ArrayError`] if
/// - the size of `T` does not match the data type size, or
/// - a [`store_array_subset`](Array::store_array_subset) error condition is met.
/// Serial version of [`Array::store_array_subset_elements_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_store_array_subset_elements<T: TriviallyTransmutable + Send>(
&self,
array_subset: &ArraySubset,
subset_elements: Vec<T>,
) -> Result<(), ArrayError> {
self._async_store_array_subset_elements(array_subset, subset_elements, false)
self.async_store_array_subset_elements_opt(array_subset, subset_elements, false)
.await
}

/// Parallel version of [`Array::store_array_subset_elements`].
/// Parallel version of [`Array::store_array_subset_elements_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_par_store_array_subset_elements<T: TriviallyTransmutable + Send>(
&self,
array_subset: &ArraySubset,
subset_elements: Vec<T>,
) -> Result<(), ArrayError> {
self._async_store_array_subset_elements(array_subset, subset_elements, true)
self.async_store_array_subset_elements_opt(array_subset, subset_elements, true)
.await
}

#[cfg(feature = "ndarray")]
async fn _async_store_array_subset_ndarray<
/// Encode `subset_array` and store in the array subset starting at `subset_start`.
///
/// # Errors
/// Returns an [`ArrayError`] if a [`store_array_subset_elements`](Array::store_array_subset_elements) error condition is met.
#[allow(clippy::missing_panics_doc)]
pub async fn async_store_array_subset_ndarray_opt<
T: safe_transmute::TriviallyTransmutable + Send + Sync,
>(
&self,
Expand Down Expand Up @@ -1137,29 +1156,26 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
};
let array_standard = subset_array.as_standard_layout();
let elements = array_standard.into_owned().into_raw_vec();
self._async_store_array_subset_elements(&subset, elements, parallel)
self.async_store_array_subset_elements_opt(&subset, elements, parallel)
.await
}

#[cfg(feature = "ndarray")]
/// Encode `subset_array` and store in the array subset starting at `subset_start`.
///
/// # Errors
/// Returns an [`ArrayError`] if a [`store_array_subset_elements`](Array::store_array_subset_elements) error condition is met.
#[allow(clippy::missing_panics_doc)]
/// Serial version of [`Array::store_array_subset_ndarray_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_store_array_subset_ndarray<
T: safe_transmute::TriviallyTransmutable + Send + Sync,
>(
&self,
subset_start: &[u64],
subset_array: &ndarray::ArrayViewD<'_, T>,
) -> Result<(), ArrayError> {
self._async_store_array_subset_ndarray(subset_start, subset_array, false)
self.async_store_array_subset_ndarray_opt(subset_start, subset_array, false)
.await
}

#[cfg(feature = "ndarray")]
/// Parallel version of [`Array::store_array_subset_ndarray`].
/// Parallel version of [`Array::store_array_subset_ndarray_opt`].
#[allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
pub async fn async_par_store_array_subset_ndarray<
T: safe_transmute::TriviallyTransmutable + Send + Sync,
Expand All @@ -1168,7 +1184,7 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
subset_start: &[u64],
subset_array: &ndarray::ArrayViewD<'_, T>,
) -> Result<(), ArrayError> {
self._async_store_array_subset_ndarray(subset_start, subset_array, true)
self.async_store_array_subset_ndarray_opt(subset_start, subset_array, true)
.await
}

Expand Down
Loading

0 comments on commit 6a3618d

Please sign in to comment.