diff --git a/CHANGELOG.md b/CHANGELOG.md index 06101639..3ab8643d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options` - Added new `Array::opt` methods which can use new encode/decode options - **Breaking** Existing `Array` `_opt` use new encode/decode options insted of `parallel: bool` + - Implement `DoubleEndedIterator` for `{Indices,LinearisedIndices,ContiguousIndices,ContiguousLinearisedIndicesIterator}Iterator` + - Add `ParIndicesIterator` and `ParChunksIterator` ### Changed - Dependency bumps @@ -43,6 +45,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Breaking**: `_opt` variants use new `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options` instead of `parallel: bool` - variants without prefix/suffix are no longer serial variants but parallel - TODO: Remove these? + - **Major breaking**: refactor array subset iterators: + - `ArraySubset::iter_` methods no longer have an `iter_` prefix and return structures implementing `IntoIterator` including + - `Indices`, `LinearisedIndices`, `ContiguousIndices`, `ContiguousLinearisedIndices`, `Chunks` + - `Indices` and `Chunks` implement `IntoParIter` + - **Breaking**: array subset iterators are moved into public `array_subset::iterators` and no longer in the `array_subset` namespace ### Removed - **Breaking**: remove `InvalidArraySubsetError` and `ArrayExtractElementsError` diff --git a/benches/array_subset.rs b/benches/array_subset.rs index 69668a12..59a5be8b 100644 --- a/benches/array_subset.rs +++ b/benches/array_subset.rs @@ -14,7 +14,7 @@ fn array_subset_indices_iterator(c: &mut Criterion) { group.throughput(Throughput::Elements(array_subset.num_elements())); group.bench_function(BenchmarkId::new("size", array_subset_size), |b| { b.iter(|| { - array_subset.iter_indices().for_each(|indices| { + array_subset.indices().into_iter().for_each(|indices| { black_box(indices.first().unwrap()); }) }); diff --git a/src/array/array_async_readable.rs b/src/array/array_async_readable.rs index 8326fac1..63353b57 100644 --- a/src/array/array_async_readable.rs +++ b/src/array/array_async_readable.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use futures::{stream::FuturesUnordered, StreamExt}; -use itertools::Itertools; use crate::{ array_subset::ArraySubset, @@ -301,7 +300,7 @@ impl Array { )); } - let array_subset = self.chunks_subset(chunks)?; + let array_subset = Arc::new(self.chunks_subset(chunks)?); // Retrieve chunk bytes let num_chunks = chunks.num_elements(); @@ -321,16 +320,20 @@ impl Array { std::slice::from_raw_parts_mut(output.as_mut_ptr().cast::(), size_output) }; let output_slice = UnsafeCellSlice::new(output_slice); - let indices = chunks.iter_indices().collect_vec(); - let mut futures = indices - .iter() + let mut futures = chunks + .indices() + .into_iter() .map(|chunk_indices| { - self._async_decode_chunk_into_array_subset( - chunk_indices, - &array_subset, - unsafe { output_slice.get() }, - options, - ) + let array_subset = array_subset.clone(); + async move { + self._async_decode_chunk_into_array_subset( + &chunk_indices, + &array_subset, + unsafe { output_slice.get() }, + options, + ) + .await + } }) .collect::>(); while let Some(item) = futures.next().await { @@ -437,12 +440,14 @@ impl Array { let chunk_subset_in_array_subset = unsafe { overlap.relative_to_unchecked(array_subset.start()) }; let mut decoded_offset = 0; - for (array_subset_element_index, num_elements) in unsafe { + let contiguous_indices = unsafe { chunk_subset_in_array_subset - .iter_contiguous_linearised_indices_unchecked(array_subset.shape()) - } { + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = + usize::try_from(contiguous_indices.contiguous_elements() * element_size).unwrap(); + for (array_subset_element_index, _num_elements) in &contiguous_indices { let output_offset = usize::try_from(array_subset_element_index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); debug_assert!((output_offset + length) <= output.len()); debug_assert!((decoded_offset + length) <= decoded_bytes.len()); output[output_offset..output_offset + length] @@ -535,7 +540,8 @@ impl Array { }; let output_slice = UnsafeCellSlice::new(output_slice); let mut futures = chunks - .iter_indices() + .indices() + .into_iter() .map(|chunk_indices| { async move { // Get the subset of the array corresponding to the chunk @@ -589,16 +595,18 @@ impl Array { let chunk_subset_in_array_subset = unsafe { overlap.relative_to_unchecked(array_subset.start()) }; let mut decoded_offset = 0; - for (array_subset_element_index, num_elements) in unsafe { + let contiguous_indices = unsafe { chunk_subset_in_array_subset - .iter_contiguous_linearised_indices_unchecked( - array_subset.shape(), - ) - } { + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = usize::try_from( + contiguous_indices.contiguous_elements() * element_size, + ) + .unwrap(); + for (array_subset_element_index, _num_elements) in &contiguous_indices { let output_offset = usize::try_from(array_subset_element_index * element_size) .unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); debug_assert!((output_offset + length) <= size_output); debug_assert!((decoded_offset + length) <= decoded_bytes.len()); unsafe { diff --git a/src/array/array_async_readable_writable.rs b/src/array/array_async_readable_writable.rs index d715ac2b..f2859cee 100644 --- a/src/array/array_async_readable_writable.rs +++ b/src/array/array_async_readable_writable.rs @@ -93,7 +93,8 @@ impl Array Array Array { let element_size = self.data_type().size(); - let chunks_to_update = chunks.iter_indices().collect::>(); - let mut futures = chunks_to_update - .iter() + let mut futures = chunks + .indices() + .into_iter() .map(|chunk_indices| { let chunk_subset_in_array = unsafe { self.chunk_grid() - .subset_unchecked(chunk_indices, self.shape()) + .subset_unchecked(&chunk_indices, self.shape()) .unwrap() }; let overlap = unsafe { array_subset.overlap_unchecked(&chunk_subset_in_array) }; @@ -226,7 +226,10 @@ impl Array { chunk_subset_in_array_subset.num_elements() ); - self.async_store_chunk_opt(chunk_indices, chunk_bytes, options) + async move { + self.async_store_chunk_opt(&chunk_indices, chunk_bytes, options) + .await + } }) .collect::>(); while let Some(item) = futures.next().await { @@ -335,16 +338,20 @@ impl Array { let storage_transformer = self .storage_transformers() .create_async_writable_transformer(storage_handle); - let chunks = chunks.iter_indices().collect::>(); let mut futures = chunks - .iter() + .indices() + .into_iter() .map(|chunk_indices| { - crate::storage::async_erase_chunk( - &*storage_transformer, - self.path(), - chunk_indices, - self.chunk_key_encoding(), - ) + let storage_transformer = storage_transformer.clone(); + async move { + crate::storage::async_erase_chunk( + &*storage_transformer, + self.path(), + &chunk_indices, + self.chunk_key_encoding(), + ) + .await + } }) .collect::>(); while let Some(item) = futures.next().await { diff --git a/src/array/array_sync_readable.rs b/src/array/array_sync_readable.rs index 584d3d6a..e6d8813e 100644 --- a/src/array/array_sync_readable.rs +++ b/src/array/array_sync_readable.rs @@ -13,7 +13,7 @@ use super::{ ArrayCodecTraits, ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, DecodeOptions, PartialDecoderOptions, StoragePartialDecoder, }, - transmute_from_bytes_vec, unravel_index, + transmute_from_bytes_vec, unsafe_cell_slice::UnsafeCellSlice, validate_element_size, Array, ArrayCreateError, ArrayError, ArrayMetadata, }; @@ -307,19 +307,9 @@ impl Array { }; if options.is_parallel() { let output = UnsafeCellSlice::new(output_slice); - (0..chunks.shape().iter().product()) + chunks + .indices() .into_par_iter() - .map(|chunk_index| { - std::iter::zip( - unravel_index(chunk_index, chunks.shape()), - chunks.start(), - ) - .map(|(chunk_indices, chunks_start)| chunk_indices + chunks_start) - .collect::>() - }) - // chunks - // .iter_indices() - // .par_bridge() .try_for_each(|chunk_indices| { self._decode_chunk_into_array_subset( &chunk_indices, @@ -329,7 +319,7 @@ impl Array { ) })?; } else { - for chunk_indices in chunks.iter_indices() { + for chunk_indices in &chunks.indices() { self._decode_chunk_into_array_subset( &chunk_indices, &array_subset, @@ -449,12 +439,14 @@ impl Array { let chunk_subset_in_array_subset = unsafe { overlap.relative_to_unchecked(array_subset.start()) }; let mut decoded_offset = 0; - for (array_subset_element_index, num_elements) in unsafe { + let contiguous_indices = unsafe { chunk_subset_in_array_subset - .iter_contiguous_linearised_indices_unchecked(array_subset.shape()) - } { + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = + usize::try_from(contiguous_indices.contiguous_elements() * element_size).unwrap(); + for (array_subset_element_index, _num_elements) in &contiguous_indices { let output_offset = usize::try_from(array_subset_element_index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); debug_assert!((output_offset + length) <= output.len()); debug_assert!((decoded_offset + length) <= decoded_bytes.len()); output[output_offset..output_offset + length] @@ -547,16 +539,9 @@ impl Array { // FIXME: Constrain concurrency here based on parallelism internally vs externally let output = UnsafeCellSlice::new(output_slice); - (0..chunks.shape().iter().product()) + chunks + .indices() .into_par_iter() - .map(|chunk_index| { - std::iter::zip( - unravel_index(chunk_index, chunks.shape()), - chunks.start(), - ) - .map(|(chunk_indices, chunks_start)| chunk_indices + chunks_start) - .collect::>() - }) .try_for_each(|chunk_indices| { self._decode_chunk_into_array_subset( &chunk_indices, @@ -566,7 +551,7 @@ impl Array { ) })?; } else { - for chunk_indices in chunks.iter_indices() { + for chunk_indices in &chunks.indices() { self._decode_chunk_into_array_subset( &chunk_indices, array_subset, diff --git a/src/array/array_sync_readable_writable.rs b/src/array/array_sync_readable_writable.rs index 80a3a36c..c1f6be5b 100644 --- a/src/array/array_sync_readable_writable.rs +++ b/src/array/array_sync_readable_writable.rs @@ -7,7 +7,7 @@ use crate::{ use super::{ codec::{DecodeOptions, EncodeOptions}, - unravel_index, Array, ArrayError, + Array, ArrayError, }; impl Array { @@ -122,23 +122,13 @@ impl Array Ok(()) }; - if encode_options.is_parallel() { - (0..chunks.shape().iter().product()) - .into_par_iter() - .map(|chunk_index| { - std::iter::zip(unravel_index(chunk_index, chunks.shape()), chunks.start()) - .map(|(chunk_indices, chunks_start)| chunk_indices + chunks_start) - .collect::>() - }) - // chunks - // .iter_indices() - // .par_bridge() - .try_for_each(store_chunk)?; - } else { - for chunk_indices in chunks.iter_indices() { - store_chunk(chunk_indices)?; - } - } + let indices = chunks.indices(); + rayon_iter_concurrent_limit::iter_concurrent_limit!( + encode_options.concurrent_limit().get(), + indices.into_par_iter(), + try_for_each, + store_chunk + )?; } Ok(()) } @@ -301,11 +291,12 @@ impl Array // Update the intersecting subset of the chunk let element_size = self.data_type().size() as u64; let mut offset = 0; - for (chunk_element_index, num_elements) in - unsafe { chunk_subset.iter_contiguous_linearised_indices_unchecked(&chunk_shape) } - { + let contiguous_iterator = + unsafe { chunk_subset.contiguous_linearised_indices_unchecked(&chunk_shape) }; + let length = + usize::try_from(contiguous_iterator.contiguous_elements() * element_size).unwrap(); + for (chunk_element_index, _num_elements) in &contiguous_iterator { let chunk_offset = usize::try_from(chunk_element_index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); debug_assert!(chunk_offset + length <= chunk_bytes.len()); debug_assert!(offset + length <= chunk_subset_bytes.len()); chunk_bytes[chunk_offset..chunk_offset + length] diff --git a/src/array/array_sync_writable.rs b/src/array/array_sync_writable.rs index 8feac4ea..893e713c 100644 --- a/src/array/array_sync_writable.rs +++ b/src/array/array_sync_writable.rs @@ -9,7 +9,7 @@ use crate::{ use super::{ codec::{ArrayCodecTraits, EncodeOptions}, - unravel_index, Array, ArrayError, + Array, ArrayError, }; impl Array { @@ -210,16 +210,9 @@ impl Array { Ok(()) }; if options.is_parallel() { - (0..chunks.shape().iter().product()) - .into_par_iter() - .map(|chunk_index| { - std::iter::zip(unravel_index(chunk_index, chunks.shape()), chunks.start()) - .map(|(chunk_indices, chunks_start)| chunk_indices + chunks_start) - .collect::>() - }) - .try_for_each(store_chunk)?; + chunks.indices().into_par_iter().try_for_each(store_chunk)?; } else { - for chunk_indices in chunks.iter_indices() { + for chunk_indices in &chunks.indices() { store_chunk(chunk_indices)?; } } @@ -323,16 +316,9 @@ impl Array { let storage_transformer = self .storage_transformers() .create_writable_transformer(storage_handle); - (0..chunks.shape().iter().product()) - .into_par_iter() - .map(|chunk_index| { - std::iter::zip(unravel_index(chunk_index, chunks.shape()), chunks.start()) - .map(|(chunk_indices, chunks_start)| chunk_indices + chunks_start) - .collect::>() - }) - // chunks - // .iter_indices() - // .par_bridge() + chunks + .indices() + .into_par_iter() // FIXME .try_for_each(|chunk_indices| { crate::storage::erase_chunk( &*storage_transformer, diff --git a/src/array/codec.rs b/src/array/codec.rs index 262e506b..cc244614 100644 --- a/src/array/codec.rs +++ b/src/array/codec.rs @@ -1117,61 +1117,6 @@ pub fn extract_byte_ranges_read( mod tests { use super::*; - #[test] - fn test_array_subset_iterator1() { - let array_shape = vec![2, 2]; - let array_subset = ArraySubset::new_with_shape(vec![2, 1]); - let mut iter = array_subset.iter_contiguous_indices(&array_shape).unwrap(); - - assert_eq!(iter.next().unwrap(), (vec![0, 0], 1)); - assert_eq!(iter.next().unwrap(), (vec![1, 0], 1)); - assert!(iter.next().is_none()); - } - - #[test] - fn test_array_subset_iterator2() { - let array_shape = vec![2, 2]; - let array_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); - let mut iter = array_subset.iter_contiguous_indices(&array_shape).unwrap(); - - assert_eq!(iter.next().unwrap(), (vec![1, 0], 2)); - assert!(iter.next().is_none()); - } - - #[test] - fn test_array_subset_iterator3() { - let array_shape = vec![2, 2]; - let array_subset = ArraySubset::new_with_shape(vec![2, 2]); - let mut iter = array_subset.iter_contiguous_indices(&array_shape).unwrap(); - - assert_eq!(iter.next().unwrap(), (vec![0, 0], 4)); - assert!(iter.next().is_none()); - } - - #[test] - fn test_array_subset_iterator4() { - let array_shape = vec![2, 2, 2, 3]; - let array_subset = ArraySubset::new_with_shape(vec![2, 1, 2, 3]); - let mut iter = array_subset.iter_contiguous_indices(&array_shape).unwrap(); - - assert_eq!(iter.next().unwrap(), (vec![0, 0, 0, 0], 6)); - assert_eq!(iter.next().unwrap(), (vec![1, 0, 0, 0], 6)); - assert!(iter.next().is_none()); - } - - #[test] - fn test_array_subset_iterator5() { - let array_shape = vec![2, 2, 3]; - let array_subset = ArraySubset::new_with_ranges(&[0..2, 0..2, 1..3]); - let mut iter = array_subset.iter_contiguous_indices(&array_shape).unwrap(); - - assert_eq!(iter.next().unwrap(), (vec![0, 0, 1], 2)); - assert_eq!(iter.next().unwrap(), (vec![0, 1, 1], 2)); - assert_eq!(iter.next().unwrap(), (vec![1, 0, 1], 2)); - assert_eq!(iter.next().unwrap(), (vec![1, 1, 1], 2)); - assert!(iter.next().is_none()); - } - #[test] fn test_extract_byte_ranges_read() { let data: Vec = (0..10).collect(); diff --git a/src/array/codec/array_to_bytes/sharding/sharding_codec.rs b/src/array/codec/array_to_bytes/sharding/sharding_codec.rs index 4ee98ad2..8b15f315 100644 --- a/src/array/codec/array_to_bytes/sharding/sharding_codec.rs +++ b/src/array/codec/array_to_bytes/sharding/sharding_codec.rs @@ -819,8 +819,9 @@ impl ShardingCodec { let shard_shape = shard_representation.shape_u64(); for (chunk_index, (_chunk_indices, chunk_subset)) in unsafe { ArraySubset::new_with_shape(shard_shape.clone()) - .iter_chunks_unchecked(self.chunk_shape.as_slice()) + .chunks_unchecked(self.chunk_shape.as_slice()) } + .into_iter() .enumerate() { let bytes = unsafe { @@ -924,8 +925,9 @@ impl ShardingCodec { let shard_shape = shard_representation.shape_u64(); for (chunk_index, (_chunk_indices, chunk_subset)) in unsafe { ArraySubset::new_with_shape(shard_shape.clone()) - .iter_chunks_unchecked(self.chunk_shape.as_slice()) + .chunks_unchecked(self.chunk_shape.as_slice()) } + .into_iter() .enumerate() { let bytes = unsafe { @@ -1412,15 +1414,13 @@ impl ShardingCodec { // Copy to subset of shard let mut data_idx = 0; let contiguous_iterator = unsafe { - chunk_subset.iter_contiguous_linearised_indices_unchecked(&shard_shape) + chunk_subset.contiguous_linearised_indices_unchecked(&shard_shape) }; let length = usize::try_from( contiguous_iterator.contiguous_elements() * element_size, ) .unwrap(); - for (index, _) in unsafe { - chunk_subset.iter_contiguous_linearised_indices_unchecked(&shard_shape) - } { + for (index, _) in &contiguous_iterator { let shard_offset = usize::try_from(index * element_size).unwrap(); shard_slice[shard_offset..shard_offset + length] .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); @@ -1456,8 +1456,9 @@ impl ShardingCodec { let shard_shape = shard_representation.shape_u64(); for (chunk_index, (_chunk_indices, chunk_subset)) in unsafe { ArraySubset::new_with_shape(shard_shape.clone()) - .iter_chunks_unchecked(self.chunk_shape.as_slice()) + .chunks_unchecked(self.chunk_shape.as_slice()) } + .into_iter() .enumerate() { // Read the offset/size @@ -1481,11 +1482,13 @@ impl ShardingCodec { // Copy to subset of shard let mut data_idx = 0; - for (index, num_elements) in unsafe { - chunk_subset.iter_contiguous_linearised_indices_unchecked(&shard_shape) - } { + let contiguous_iterator = + unsafe { chunk_subset.contiguous_linearised_indices_unchecked(&shard_shape) }; + let length = + usize::try_from(contiguous_iterator.contiguous_elements() * element_size) + .unwrap(); + for (index, _num_elements) in &contiguous_iterator { let shard_offset = usize::try_from(index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); shard_slice[shard_offset..shard_offset + length] .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); data_idx += length; @@ -1586,11 +1589,13 @@ impl ShardingCodec { let mut data_idx = 0; let element_size = chunk_representation.element_size() as u64; let shard_slice = unsafe { shard_slice.get() }; - for (index, num_elements) in unsafe { - chunk_subset.iter_contiguous_linearised_indices_unchecked(&shard_shape) - } { + let contiguous_iterator = + unsafe { chunk_subset.contiguous_linearised_indices_unchecked(&shard_shape) }; + let length = + usize::try_from(contiguous_iterator.contiguous_elements() * element_size) + .unwrap(); + for (index, _num_elements) in &contiguous_iterator { let shard_offset = usize::try_from(index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); shard_slice[shard_offset..shard_offset + length] .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); data_idx += length; @@ -1620,11 +1625,14 @@ impl ShardingCodec { let mut data_idx = 0; let element_size = chunk_representation.element_size() as u64; let shard_slice = unsafe { shard_slice.get() }; - for (index, num_elements) in unsafe { - chunk_subset.iter_contiguous_linearised_indices_unchecked(&shard_shape) - } { + let contiguous_iterator = unsafe { + chunk_subset.contiguous_linearised_indices_unchecked(&shard_shape) + }; + let length = + usize::try_from(contiguous_iterator.contiguous_elements() * element_size) + .unwrap(); + for (index, _num_elements) in &contiguous_iterator { let shard_offset = usize::try_from(index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); shard_slice[shard_offset..shard_offset + length] .copy_from_slice(&filled_chunk[data_idx..data_idx + length]); data_idx += length; @@ -1635,7 +1643,8 @@ impl ShardingCodec { let element_size = chunk_representation.element_size() as u64; for (chunk_index, (_chunk_indices, chunk_subset)) in unsafe { ArraySubset::new_with_shape(shard_shape.clone()) - .iter_chunks_unchecked(self.chunk_shape.as_slice()) + .chunks_unchecked(self.chunk_shape.as_slice()) + .into_iter() } .enumerate() { @@ -1659,11 +1668,13 @@ impl ShardingCodec { // Copy to subset of shard let mut data_idx = 0; let shard_slice = unsafe { shard_slice.get() }; - for (index, num_elements) in unsafe { - chunk_subset.iter_contiguous_linearised_indices_unchecked(&shard_shape) - } { + let contiguous_iterator = + unsafe { chunk_subset.contiguous_linearised_indices_unchecked(&shard_shape) }; + let length = + usize::try_from(contiguous_iterator.contiguous_elements() * element_size) + .unwrap(); + for (index, _num_elements) in &contiguous_iterator { let shard_offset = usize::try_from(index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); shard_slice[shard_offset..shard_offset + length] .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); data_idx += length; diff --git a/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs b/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs index 23c74dba..bbe3029e 100644 --- a/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs +++ b/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs @@ -165,10 +165,13 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> { let out_array_subset_slice = UnsafeCellSlice::new(out_array_subset.as_mut_slice()); // Decode those chunks if required - // FIXME: Concurrent limit here - unsafe { array_subset.iter_chunks_unchecked(chunk_representation.shape()) } - .par_bridge() - .try_for_each(|(chunk_indices, chunk_subset)| { + let chunks = unsafe { array_subset.chunks_unchecked(chunk_representation.shape()) }; + + rayon_iter_concurrent_limit::iter_concurrent_limit!( + options.concurrent_limit().get(), + chunks.into_par_iter(), + try_for_each, + |(chunk_indices, chunk_subset)| { let out_array_subset_slice = unsafe { out_array_subset_slice.get() }; let shard_index_idx: usize = @@ -206,13 +209,16 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> { let chunk_subset_in_array_subset = unsafe { overlap.relative_to_unchecked(array_subset.start()) }; let mut decoded_offset = 0; - for (array_subset_element_index, num_elements) in unsafe { + let contiguous_iterator = unsafe { chunk_subset_in_array_subset - .iter_contiguous_linearised_indices_unchecked(array_subset.shape()) - } { + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = + usize::try_from(contiguous_iterator.contiguous_elements() * element_size) + .unwrap(); + for (array_subset_element_index, _num_elements) in &contiguous_iterator { let output_offset = usize::try_from(array_subset_element_index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); out_array_subset_slice[output_offset..output_offset + length] .copy_from_slice( &decoded_bytes[decoded_offset..decoded_offset + length], @@ -220,7 +226,8 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> { decoded_offset += length; } Ok::<_, CodecError>(()) - })?; + } + )?; out.push(out_array_subset); } Ok(out) @@ -453,6 +460,7 @@ impl AsyncArrayPartialDecoderTraits for AsyncShardingPartialDecoder<'_> { let element_size = self.decoded_representation.element_size(); let mut out = Vec::with_capacity(array_subsets.len()); + // FIXME: Could go parallel here for array_subset in array_subsets { // shard (subset) let mut shard = vec![ @@ -466,7 +474,8 @@ impl AsyncArrayPartialDecoderTraits for AsyncShardingPartialDecoder<'_> { // Find filled / non filled chunks let chunk_info = - unsafe { array_subset.iter_chunks_unchecked(self.chunk_grid.chunk_shape()) } + unsafe { array_subset.chunks_unchecked(self.chunk_grid.chunk_shape()) } + .into_iter() .map(|(chunk_indices, chunk_subset)| { let chunk_index = ravel_indices(&chunk_indices, &chunks_per_shard); let chunk_index = usize::try_from(chunk_index).unwrap(); @@ -485,7 +494,7 @@ impl AsyncArrayPartialDecoderTraits for AsyncShardingPartialDecoder<'_> { .collect::>(); // Decode unfilled chunks - futures::future::join_all( + let results = futures::future::join_all( chunk_info .iter() .filter_map(|(chunk_subset, offset_size)| { @@ -537,25 +546,38 @@ impl AsyncArrayPartialDecoderTraits for AsyncShardingPartialDecoder<'_> { Ok::<_, CodecError>((chunk_subset_in_array_subset, decoded_chunk)) }), ) - .await - .into_par_iter() - .try_for_each(|subset_and_decoded_chunk| { - let (chunk_subset_in_array_subset, decoded_chunk) = subset_and_decoded_chunk?; - let mut data_idx = 0; - let element_size = element_size as u64; - let shard_slice = unsafe { shard_slice.get() }; - for (index, num_elements) in unsafe { - chunk_subset_in_array_subset - .iter_contiguous_linearised_indices_unchecked(array_subset.shape()) - } { - let shard_offset = usize::try_from(index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); - shard_slice[shard_offset..shard_offset + length] - .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); - data_idx += length; - } - Ok::<_, CodecError>(()) - })?; + .await; + // FIXME: Concurrency limit for futures + + if !results.is_empty() { + rayon_iter_concurrent_limit::iter_concurrent_limit!( + options.concurrent_limit().get(), + results.into_par_iter(), + try_for_each, + |subset_and_decoded_chunk| { + let (chunk_subset_in_array_subset, decoded_chunk) = + subset_and_decoded_chunk?; + let mut data_idx = 0; + let element_size = element_size as u64; + let shard_slice = unsafe { shard_slice.get() }; + let contiguous_iterator = unsafe { + chunk_subset_in_array_subset + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = usize::try_from( + contiguous_iterator.contiguous_elements() * element_size, + ) + .unwrap(); + for (index, _num_elements) in &contiguous_iterator { + let shard_offset = usize::try_from(index * element_size).unwrap(); + shard_slice[shard_offset..shard_offset + length] + .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); + data_idx += length; + } + Ok::<_, CodecError>(()) + } + )?; + } // Write filled chunks let filled_chunks = chunk_info @@ -577,24 +599,33 @@ impl AsyncArrayPartialDecoderTraits for AsyncShardingPartialDecoder<'_> { .repeat(chunk_array_ss.num_elements_usize()); // Write filled chunks - filled_chunks.par_iter().for_each(|chunk_subset| { - let overlap = unsafe { array_subset.overlap_unchecked(chunk_subset) }; - let chunk_subset_in_array_subset = - unsafe { overlap.relative_to_unchecked(array_subset.start()) }; - let mut data_idx = 0; - let element_size = self.decoded_representation.element_size() as u64; - let shard_slice = unsafe { shard_slice.get() }; - for (index, num_elements) in unsafe { - chunk_subset_in_array_subset - .iter_contiguous_linearised_indices_unchecked(array_subset.shape()) - } { - let shard_offset = usize::try_from(index * element_size).unwrap(); - let length = usize::try_from(num_elements * element_size).unwrap(); - shard_slice[shard_offset..shard_offset + length] - .copy_from_slice(&filled_chunk[data_idx..data_idx + length]); - data_idx += length; + rayon_iter_concurrent_limit::iter_concurrent_limit!( + options.concurrent_limit().get(), + filled_chunks.into_par_iter(), + for_each, + |chunk_subset| { + let overlap = unsafe { array_subset.overlap_unchecked(chunk_subset) }; + let chunk_subset_in_array_subset = + unsafe { overlap.relative_to_unchecked(array_subset.start()) }; + let mut data_idx = 0; + let element_size = self.decoded_representation.element_size() as u64; + let shard_slice = unsafe { shard_slice.get() }; + let contiguous_iterator = unsafe { + chunk_subset_in_array_subset + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = usize::try_from( + contiguous_iterator.contiguous_elements() * element_size, + ) + .unwrap(); + for (index, _num_elements) in &contiguous_iterator { + let shard_offset = usize::try_from(index * element_size).unwrap(); + shard_slice[shard_offset..shard_offset + length] + .copy_from_slice(&filled_chunk[data_idx..data_idx + length]); + data_idx += length; + } } - }); + ); }; #[allow(clippy::transmute_undefined_repr)] diff --git a/src/array_subset.rs b/src/array_subset.rs index 3042e826..2378991b 100644 --- a/src/array_subset.rs +++ b/src/array_subset.rs @@ -3,19 +3,18 @@ //! An [`ArraySubset`] represents a subset of an array or chunk. //! //! Many [`Array`](crate::array::Array) store and retrieve methods have an [`ArraySubset`] parameter. -//! This module includes various iterators supporting iteration over the indices of an [`ArraySubset`] with respect to an array (or chunk). +//! This module includes various types of [`iterators`] over the elements represented by an [`ArraySubset`]. //! //! This module also provides convenience functions for: //! - computing the byte ranges of array subsets within an array, and //! - extracting the bytes within subsets of an array. -mod array_subset_iterators; +pub mod iterators; use std::{num::NonZeroU64, ops::Range}; -pub use array_subset_iterators::{ - ChunksIterator, ContiguousIndicesIterator, ContiguousLinearisedIndicesIterator, - IndicesIterator, LinearisedIndicesIterator, +use iterators::{ + Chunks, ContiguousIndices, ContiguousLinearisedIndices, Indices, LinearisedIndices, }; use derive_more::{Display, From}; @@ -263,7 +262,7 @@ impl ArraySubset { ) -> Result, IncompatibleArraySubsetAndShapeError> { let mut byte_ranges: Vec = Vec::new(); for (array_index, contiguous_elements) in - self.iter_contiguous_linearised_indices(array_shape)? + &self.contiguous_linearised_indices(array_shape)? { let byte_index = array_index * element_size as u64; let byte_length = contiguous_elements * element_size as u64; @@ -284,7 +283,7 @@ impl ArraySubset { ) -> Vec { let mut byte_ranges: Vec = Vec::new(); for (array_index, contiguous_elements) in - self.iter_contiguous_linearised_indices_unchecked(array_shape) + &self.contiguous_linearised_indices_unchecked(array_shape) { let byte_index = array_index * element_size as u64; let byte_length = contiguous_elements * element_size as u64; @@ -356,7 +355,7 @@ impl ArraySubset { }; let mut subset_offset = 0; for (array_index, contiguous_elements) in - self.iter_contiguous_linearised_indices_unchecked(array_shape) + &self.contiguous_linearised_indices_unchecked(array_shape) { let byte_offset = usize::try_from(array_index * element_size).unwrap(); let byte_length = usize::try_from(contiguous_elements * element_size).unwrap(); @@ -426,7 +425,7 @@ impl ArraySubset { }; let mut subset_offset = 0; for (array_index, contiguous_elements) in - self.iter_contiguous_linearised_indices_unchecked(array_shape) + &self.contiguous_linearised_indices_unchecked(array_shape) { let element_offset = usize::try_from(array_index).unwrap(); let element_length = usize::try_from(contiguous_elements).unwrap(); @@ -477,7 +476,7 @@ impl ArraySubset { } else { let mut offset = 0; for (array_index, contiguous_elements) in - self.iter_contiguous_linearised_indices(array_shape)? + &self.contiguous_linearised_indices(array_shape)? { let byte_index = usize::try_from(array_index * element_size_u64).unwrap(); let byte_length = usize::try_from(contiguous_elements * element_size_u64).unwrap(); @@ -520,7 +519,7 @@ impl ArraySubset { ); let mut offset = 0; for (array_index, contiguous_elements) in - self.iter_contiguous_linearised_indices_unchecked(array_shape) + &self.contiguous_linearised_indices_unchecked(array_shape) { let byte_index = usize::try_from(array_index * element_size_u64).unwrap(); let byte_length = usize::try_from(contiguous_elements * element_size_u64).unwrap(); @@ -534,8 +533,8 @@ impl ArraySubset { /// Returns an iterator over the indices of elements within the subset. #[must_use] - pub fn iter_indices(&self) -> IndicesIterator { - IndicesIterator::new(self.clone()) + pub fn indices(&self) -> Indices { + Indices::new(self.clone()) } /// Returns an iterator over the linearised indices of elements within the subset. @@ -543,11 +542,11 @@ impl ArraySubset { /// # Errors /// /// Returns [`IncompatibleArraySubsetAndShapeError`] if the `array_shape` does not encapsulate this array subset. - pub fn iter_linearised_indices<'a>( + pub fn linearised_indices( &self, - array_shape: &'a [u64], - ) -> Result, IncompatibleArraySubsetAndShapeError> { - LinearisedIndicesIterator::new(self.clone(), array_shape) + array_shape: &[u64], + ) -> Result { + LinearisedIndices::new(self.clone(), array_shape.to_vec()) } /// Returns an iterator over the indices of elements within the subset. @@ -555,11 +554,8 @@ impl ArraySubset { /// # Safety /// `array_shape` must match the dimensionality and encapsulate this array subset. #[must_use] - pub unsafe fn iter_linearised_indices_unchecked<'a>( - &'a self, - array_shape: &'a [u64], - ) -> LinearisedIndicesIterator<'a> { - LinearisedIndicesIterator::new_unchecked(self.clone(), array_shape) + pub unsafe fn linearised_indices_unchecked(&self, array_shape: &[u64]) -> LinearisedIndices { + LinearisedIndices::new_unchecked(self.clone(), array_shape.to_vec()) } /// Returns an iterator over the indices of contiguous elements within the subset. @@ -567,11 +563,11 @@ impl ArraySubset { /// # Errors /// /// Returns [`IncompatibleArraySubsetAndShapeError`] if the `array_shape` does not encapsulate this array subset. - pub fn iter_contiguous_indices<'a>( - &'a self, - array_shape: &'a [u64], - ) -> Result { - ContiguousIndicesIterator::new(self, array_shape) + pub fn contiguous_indices( + &self, + array_shape: &[u64], + ) -> Result { + ContiguousIndices::new(self, array_shape) } /// Returns an iterator over the indices of contiguous elements within the subset. @@ -579,11 +575,8 @@ impl ArraySubset { /// # Safety /// The length of `array_shape` must match the array subset dimensionality. #[must_use] - pub unsafe fn iter_contiguous_indices_unchecked<'a>( - &'a self, - array_shape: &'a [u64], - ) -> ContiguousIndicesIterator { - ContiguousIndicesIterator::new_unchecked(self, array_shape) + pub unsafe fn contiguous_indices_unchecked(&self, array_shape: &[u64]) -> ContiguousIndices { + ContiguousIndices::new_unchecked(self, array_shape) } /// Returns an iterator over the linearised indices of contiguous elements within the subset. @@ -591,11 +584,11 @@ impl ArraySubset { /// # Errors /// /// Returns [`IncompatibleArraySubsetAndShapeError`] if the `array_shape` does not encapsulate this array subset. - pub fn iter_contiguous_linearised_indices<'a>( - &'a self, - array_shape: &'a [u64], - ) -> Result { - ContiguousLinearisedIndicesIterator::new(self, array_shape) + pub fn contiguous_linearised_indices( + &self, + array_shape: &[u64], + ) -> Result { + ContiguousLinearisedIndices::new(self, array_shape.to_vec()) } /// Returns an iterator over the linearised indices of contiguous elements within the subset. @@ -603,29 +596,28 @@ impl ArraySubset { /// # Safety /// The length of `array_shape` must match the array subset dimensionality. #[must_use] - pub unsafe fn iter_contiguous_linearised_indices_unchecked<'a>( - &'a self, - array_shape: &'a [u64], - ) -> ContiguousLinearisedIndicesIterator { - ContiguousLinearisedIndicesIterator::new_unchecked(self, array_shape) + pub unsafe fn contiguous_linearised_indices_unchecked( + &self, + array_shape: &[u64], + ) -> ContiguousLinearisedIndices { + ContiguousLinearisedIndices::new_unchecked(self, array_shape.to_vec()) } - /// Returns an iterator over chunks with shape `chunk_shape` in the array subset. + /// Returns the [`Chunks`] with `chunk_shape` in the array subset which can be iterated over. /// /// All chunks overlapping the array subset are returned, and they all have the same shape `chunk_shape`. /// Thus, the subsets of the chunks may extend out over the subset. /// /// # Errors - /// /// Returns an error if `chunk_shape` does not match the array subset dimensionality. - pub fn iter_chunks<'a>( - &'a self, - chunk_shape: &'a [NonZeroU64], - ) -> Result { - ChunksIterator::new(self, chunk_shape) + pub fn chunks( + &self, + chunk_shape: &[NonZeroU64], + ) -> Result { + Chunks::new(self, chunk_shape) } - /// Returns an iterator over chunks with shape `chunk_shape` in the array subset. + /// Returns the [`Chunks`] with `chunk_shape` in the array subset which can be iterated over. /// /// All chunks overlapping the array subset are returned, and they all have the same shape `chunk_shape`. /// Thus, the subsets of the chunks may extend out over the subset. @@ -633,11 +625,8 @@ impl ArraySubset { /// # Safety /// The length of `chunk_shape` must match the array subset dimensionality. #[must_use] - pub unsafe fn iter_chunks_unchecked<'a>( - &'a self, - chunk_shape: &'a [NonZeroU64], - ) -> ChunksIterator { - ChunksIterator::new_unchecked(self, chunk_shape) + pub unsafe fn chunks_unchecked(&self, chunk_shape: &[NonZeroU64]) -> Chunks { + Chunks::new_unchecked(self, chunk_shape) } /// Return the overlapping subset between this array subset and `subset_other`. @@ -792,7 +781,9 @@ mod tests { let array_subset2 = ArraySubset::new_with_ranges(&[3..6, 4..7, 0..1]); assert!(array_subset0.overlap(&array_subset2).is_err()); assert_eq!( - unsafe { array_subset2.iter_linearised_indices_unchecked(&[6, 7, 1]) }.next(), + unsafe { array_subset2.linearised_indices_unchecked(&[6, 7, 1]) } + .into_iter() + .next(), Some(4 * 1 + 3 * 7 * 1) ) } diff --git a/src/array_subset/array_subset_iterators/chunks_iterator.rs b/src/array_subset/array_subset_iterators/chunks_iterator.rs deleted file mode 100644 index 3e93df5c..00000000 --- a/src/array_subset/array_subset_iterators/chunks_iterator.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::{iter::FusedIterator, num::NonZeroU64}; - -use crate::{ - array::{chunk_shape_to_array_shape, ArrayIndices}, - array_subset::{ArraySubset, IncompatibleDimensionalityError}, -}; - -use super::IndicesIterator; - -/// Iterates over the regular sized chunks overlapping this array subset. -/// All chunks have the same size, and may extend over the bounds of the array subset. -/// -/// The iterator item is a ([`ArrayIndices`], [`ArraySubset`]) tuple corresponding to the chunk indices and array subset. -pub struct ChunksIterator { - inner: IndicesIterator, - chunk_shape: Vec, -} - -impl ChunksIterator { - /// Create a new chunks iterator. - /// - /// # Errors - /// - /// Returns [`IncompatibleDimensionalityError`] if `chunk_shape` does not match the dimensionality of `subset`. - pub fn new( - subset: &ArraySubset, - chunk_shape: &[NonZeroU64], - ) -> Result { - if subset.dimensionality() == chunk_shape.len() { - Ok(unsafe { Self::new_unchecked(subset, chunk_shape) }) - } else { - Err(IncompatibleDimensionalityError( - chunk_shape.len(), - subset.dimensionality(), - )) - } - } - - /// Create a new chunks iterator. - /// - /// # Safety - /// - /// The dimensionality of `chunk_shape` must match the dimensionality of `subset`. - #[must_use] - pub unsafe fn new_unchecked(subset: &ArraySubset, chunk_shape: &[NonZeroU64]) -> Self { - debug_assert_eq!(subset.dimensionality(), chunk_shape.len()); - let chunk_shape = chunk_shape_to_array_shape(chunk_shape); - let chunk_start: ArrayIndices = std::iter::zip(subset.start(), &chunk_shape) - .map(|(s, c)| s / c) - .collect(); - let chunk_end_inc: ArrayIndices = std::iter::zip(subset.end_inc(), &chunk_shape) - .map(|(e, c)| e / c) - .collect(); - let subset_chunks = - unsafe { ArraySubset::new_with_start_end_inc_unchecked(chunk_start, chunk_end_inc) }; - let inner = IndicesIterator::new(subset_chunks); - Self { inner, chunk_shape } - } -} - -impl Iterator for ChunksIterator { - type Item = (ArrayIndices, ArraySubset); - - fn next(&mut self) -> Option { - self.inner.next().map(|chunk_indices| { - let start = std::iter::zip(&chunk_indices, &self.chunk_shape) - .map(|(i, c)| i * c) - .collect(); - let chunk_subset = unsafe { - ArraySubset::new_with_start_shape_unchecked(start, self.chunk_shape.clone()) - }; - (chunk_indices, chunk_subset) - }) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - -impl ExactSizeIterator for ChunksIterator {} - -impl FusedIterator for ChunksIterator {} diff --git a/src/array_subset/array_subset_iterators/contiguous_linearised_indices_iterator.rs b/src/array_subset/array_subset_iterators/contiguous_linearised_indices_iterator.rs deleted file mode 100644 index 33a32cbb..00000000 --- a/src/array_subset/array_subset_iterators/contiguous_linearised_indices_iterator.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::iter::FusedIterator; - -use crate::{ - array::ravel_indices, - array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError}, -}; - -use super::ContiguousIndicesIterator; - -/// Iterates over contiguous linearised element indices in an array subset. -/// -/// The iterator item is a tuple: (linearised index, # contiguous elements). -pub struct ContiguousLinearisedIndicesIterator<'a> { - inner: ContiguousIndicesIterator, - array_shape: &'a [u64], -} - -impl<'a> ContiguousLinearisedIndicesIterator<'a> { - /// Return a new contiguous linearised indices iterator. - /// - /// # Errors - /// - /// Returns [`IncompatibleArraySubsetAndShapeError`] if `array_shape` does not encapsulate `subset`. - pub fn new( - subset: &ArraySubset, - array_shape: &'a [u64], - ) -> Result { - let inner = subset.iter_contiguous_indices(array_shape)?; - Ok(Self { inner, array_shape }) - } - - /// Return a new contiguous linearised indices iterator. - /// - /// # Safety - /// - /// `array_shape` must encapsulate `subset`. - #[must_use] - pub unsafe fn new_unchecked(subset: &ArraySubset, array_shape: &'a [u64]) -> Self { - let inner = subset.iter_contiguous_indices_unchecked(array_shape); - Self { inner, array_shape } - } - - /// Return the number of contiguous elements (fixed on each iteration). - #[must_use] - pub fn contiguous_elements(&self) -> u64 { - self.inner.contiguous_elements() - } -} - -impl Iterator for ContiguousLinearisedIndicesIterator<'_> { - type Item = (u64, u64); - - fn next(&mut self) -> Option { - self.inner - .next() - .map(|(indices, elements)| (ravel_indices(&indices, self.array_shape), elements)) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - -impl ExactSizeIterator for ContiguousLinearisedIndicesIterator<'_> {} - -impl FusedIterator for ContiguousLinearisedIndicesIterator<'_> {} diff --git a/src/array_subset/array_subset_iterators/indices_iterator.rs b/src/array_subset/array_subset_iterators/indices_iterator.rs deleted file mode 100644 index 671ca003..00000000 --- a/src/array_subset/array_subset_iterators/indices_iterator.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::iter::FusedIterator; - -use itertools::izip; - -use crate::{array::ArrayIndices, array_subset::ArraySubset}; - -/// Iterates over element indices in an array subset. -pub struct IndicesIterator { - subset_rev: ArraySubset, - index: u64, -} - -impl IndicesIterator { - /// Create a new indices iterator. - #[must_use] - pub fn new(mut subset: ArraySubset) -> Self { - subset.start.reverse(); - subset.shape.reverse(); - Self { - subset_rev: subset, - index: 0, - } - } -} - -impl Iterator for IndicesIterator { - type Item = ArrayIndices; - - fn next(&mut self) -> Option { - let mut current = self.index; - // let mut indices = vec![0u64; self.subset_rev.dimensionality()]; - let mut indices = vec![core::mem::MaybeUninit::uninit(); self.subset_rev.dimensionality()]; - for (out, &subset_start, &subset_size) in izip!( - indices.iter_mut().rev(), - self.subset_rev.start.iter(), - self.subset_rev.shape.iter(), - ) { - out.write(current % subset_size + subset_start); - current /= subset_size; - } - if current == 0 { - self.index += 1; - #[allow(clippy::transmute_undefined_repr)] - Some(unsafe { std::mem::transmute(indices) }) - } else { - None - } - } - - fn size_hint(&self) -> (usize, Option) { - let num_elements = self.subset_rev.num_elements_usize(); - (num_elements, Some(num_elements)) - } -} - -impl ExactSizeIterator for IndicesIterator {} - -impl FusedIterator for IndicesIterator {} diff --git a/src/array_subset/array_subset_iterators/linearised_indices_iterator.rs b/src/array_subset/array_subset_iterators/linearised_indices_iterator.rs deleted file mode 100644 index b640d113..00000000 --- a/src/array_subset/array_subset_iterators/linearised_indices_iterator.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::iter::FusedIterator; - -use itertools::izip; - -use crate::array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError}; - -/// Iterates over linearised element indices of an array subset in an array. -pub struct LinearisedIndicesIterator<'a> { - subset: ArraySubset, - index: u64, - array_shape: &'a [u64], -} - -impl<'a> LinearisedIndicesIterator<'a> { - /// Create a new linearised indices iterator. - /// - /// # Errors - /// - /// Returns [`IncompatibleArraySubsetAndShapeError`] if `array_shape` does not encapsulate `subset`. - pub fn new( - subset: ArraySubset, - array_shape: &'a [u64], - ) -> Result { - if subset.dimensionality() == array_shape.len() - && std::iter::zip(subset.end_exc(), array_shape).all(|(end, shape)| end <= *shape) - { - Ok(Self { - subset, - index: 0, - array_shape, - }) - } else { - Err(IncompatibleArraySubsetAndShapeError( - subset, - array_shape.to_vec(), - )) - } - } - - /// Create a new linearised indices iterator. - /// - /// # Safety - /// - /// `array_shape` must encapsulate `subset`. - #[must_use] - pub unsafe fn new_unchecked(subset: ArraySubset, array_shape: &'a [u64]) -> Self { - debug_assert_eq!(subset.dimensionality(), array_shape.len()); - debug_assert!( - std::iter::zip(subset.end_exc(), array_shape).all(|(end, shape)| end <= *shape) - ); - Self { - subset, - index: 0, - array_shape, - } - } -} - -impl Iterator for LinearisedIndicesIterator<'_> { - type Item = u64; - - fn next(&mut self) -> Option { - let mut current = self.index; - let mut out = 0; - let mut mult = 1; - for (&subset_start, &subset_size, &array_size) in izip!( - self.subset.start.iter().rev(), - self.subset.shape.iter().rev(), - self.array_shape.iter().rev() - ) { - let index = current % subset_size + subset_start; - current /= subset_size; - out += index * mult; - mult *= array_size; - } - if current == 0 { - self.index += 1; - Some(out) - } else { - None - } - } - - fn size_hint(&self) -> (usize, Option) { - let num_elements = self.subset.num_elements_usize(); - (num_elements, Some(num_elements)) - } -} - -impl ExactSizeIterator for LinearisedIndicesIterator<'_> {} - -impl FusedIterator for LinearisedIndicesIterator<'_> {} diff --git a/src/array_subset/array_subset_iterators.rs b/src/array_subset/iterators.rs similarity index 58% rename from src/array_subset/array_subset_iterators.rs rename to src/array_subset/iterators.rs index 1fc7bbd3..705d2bb8 100644 --- a/src/array_subset/array_subset_iterators.rs +++ b/src/array_subset/iterators.rs @@ -1,38 +1,72 @@ +//! Array subset iterators. + mod chunks_iterator; mod contiguous_indices_iterator; mod contiguous_linearised_indices_iterator; mod indices_iterator; mod linearised_indices_iterator; -pub use chunks_iterator::ChunksIterator; -pub use contiguous_indices_iterator::ContiguousIndicesIterator; -pub use contiguous_linearised_indices_iterator::ContiguousLinearisedIndicesIterator; -pub use indices_iterator::IndicesIterator; -pub use linearised_indices_iterator::LinearisedIndicesIterator; +pub use chunks_iterator::{Chunks, ChunksIterator}; +pub use contiguous_indices_iterator::{ContiguousIndices, ContiguousIndicesIterator}; +pub use contiguous_linearised_indices_iterator::{ + ContiguousLinearisedIndices, ContiguousLinearisedIndicesIterator, +}; +pub use indices_iterator::{Indices, IndicesIterator, ParIndicesIterator}; +pub use linearised_indices_iterator::{LinearisedIndices, LinearisedIndicesIterator}; #[cfg(test)] mod tests { use std::num::NonZeroU64; + use rayon::iter::{IntoParallelIterator, ParallelIterator}; + use crate::array_subset::ArraySubset; #[test] fn array_subset_iter_indices() { let subset = ArraySubset::new_with_ranges(&[1..3, 1..3]); - let mut iter = subset.iter_indices(); + let indices = subset.indices(); + let mut iter = indices.into_iter(); assert_eq!(iter.size_hint(), (4, Some(4))); assert_eq!(iter.next(), Some(vec![1, 1])); + assert_eq!(iter.next_back(), Some(vec![2, 2])); assert_eq!(iter.next(), Some(vec![1, 2])); assert_eq!(iter.next(), Some(vec![2, 1])); - assert_eq!(iter.next(), Some(vec![2, 2])); assert_eq!(iter.next(), None); + assert_eq!(iter.next_back(), None); + } + + #[test] + fn array_subset_iter_indices2() { + let subset = ArraySubset::new_with_ranges(&[1..3, 1..3]); + let indices = subset.indices(); + let mut iter = indices.into_iter(); + assert_eq!(iter.size_hint(), (4, Some(4))); + assert_eq!(iter.next_back(), Some(vec![2, 2])); + assert_eq!(iter.next_back(), Some(vec![2, 1])); + assert_eq!(iter.next_back(), Some(vec![1, 2])); + assert_eq!(iter.next_back(), Some(vec![1, 1])); + assert_eq!(iter.next(), None); + assert_eq!(iter.next_back(), None); + } + + #[test] + fn array_subset_par_iter_indices() { + use rayon::prelude::*; + let subset = ArraySubset::new_with_ranges(&[1..3, 1..3]); + let indices = subset.indices().into_par_iter().collect::>(); + assert_eq!( + indices, + vec![vec![1, 1], vec![1, 2], vec![2, 1], vec![2, 2]] + ); } #[test] fn array_subset_iter_linearised_indices() { let subset = ArraySubset::new_with_ranges(&[1..3, 1..3]); - assert!(subset.iter_linearised_indices(&[4, 4, 4]).is_err()); - let mut iter = subset.iter_linearised_indices(&[4, 4]).unwrap(); + assert!(subset.linearised_indices(&[4, 4, 4]).is_err()); + let indices = subset.linearised_indices(&[4, 4]).unwrap(); + let mut iter = indices.into_iter(); // 0 1 2 3 // 4 5 6 7 // 8 9 10 11 @@ -48,7 +82,8 @@ mod tests { #[test] fn array_subset_iter_contiguous_indices1() { let subset = ArraySubset::new_with_shape(vec![2, 2]); - let mut iter = subset.iter_contiguous_indices(&[2, 2]).unwrap(); + let indices = subset.contiguous_indices(&[2, 2]).unwrap(); + let mut iter = indices.into_iter(); assert_eq!(iter.size_hint(), (1, Some(1))); assert_eq!(iter.next(), Some((vec![0, 0], 4))); assert_eq!(iter.next(), None); @@ -57,7 +92,8 @@ mod tests { #[test] fn array_subset_iter_contiguous_indices2() { let subset = ArraySubset::new_with_ranges(&[1..3, 1..3]); - let mut iter = subset.iter_contiguous_indices(&[4, 4]).unwrap(); + let indices = subset.contiguous_indices(&[4, 4]).unwrap(); + let mut iter = indices.into_iter(); assert_eq!(iter.size_hint(), (2, Some(2))); assert_eq!(iter.next(), Some((vec![1, 1], 2))); assert_eq!(iter.next(), Some((vec![2, 1], 2))); @@ -67,7 +103,8 @@ mod tests { #[test] fn array_subset_iter_contiguous_indices3() { let subset = ArraySubset::new_with_ranges(&[1..3, 0..1, 0..2, 0..2]); - let mut iter = subset.iter_contiguous_indices(&[3, 1, 2, 2]).unwrap(); + let indices = subset.contiguous_indices(&[3, 1, 2, 2]).unwrap(); + let mut iter = indices.into_iter(); assert_eq!(iter.size_hint(), (1, Some(1))); assert_eq!(iter.next(), Some((vec![1, 0, 0, 0], 8))); assert_eq!(iter.next(), None); @@ -76,7 +113,8 @@ mod tests { #[test] fn array_subset_iter_continuous_linearised_indices() { let subset = ArraySubset::new_with_ranges(&[1..3, 1..3]); - let mut iter = subset.iter_contiguous_linearised_indices(&[4, 4]).unwrap(); + let indices = subset.contiguous_linearised_indices(&[4, 4]).unwrap(); + let mut iter = indices.into_iter(); // 0 1 2 3 // 4 5 6 7 // 8 9 10 11 @@ -92,9 +130,10 @@ mod tests { fn array_subset_iter_chunks1() { let subset = ArraySubset::new_with_ranges(&[1..5, 1..5]); let chunk_shape_invalid = [NonZeroU64::new(2).unwrap()]; - assert!(subset.iter_chunks(&chunk_shape_invalid).is_err()); + assert!(subset.chunks(&chunk_shape_invalid).is_err()); let chunk_shape = [NonZeroU64::new(2).unwrap(), NonZeroU64::new(2).unwrap()]; - let mut iter = subset.iter_chunks(&chunk_shape).unwrap(); + let chunks = subset.chunks(&chunk_shape).unwrap(); + let mut iter = chunks.into_iter(); assert_eq!(iter.size_hint(), (9, Some(9))); assert_eq!(iter.next(), Some((vec![0, 0], ArraySubset::new_with_ranges(&[0..2, 0..2])))); assert_eq!(iter.next(), Some((vec![0, 1], ArraySubset::new_with_ranges(&[0..2, 2..4])))); @@ -113,7 +152,8 @@ mod tests { fn array_subset_iter_chunks2() { let subset = ArraySubset::new_with_ranges(&[2..5, 2..6]); let chunk_shape = [NonZeroU64::new(2).unwrap(), NonZeroU64::new(3).unwrap()]; - let mut iter = subset.iter_chunks(&chunk_shape).unwrap(); + let chunks = subset.chunks(&chunk_shape).unwrap(); + let mut iter = chunks.into_iter(); assert_eq!(iter.size_hint(), (4, Some(4))); assert_eq!(iter.next(), Some((vec![1, 0], ArraySubset::new_with_ranges(&[2..4, 0..3])))); assert_eq!(iter.next(), Some((vec![1, 1], ArraySubset::new_with_ranges(&[2..4, 3..6])))); @@ -121,4 +161,19 @@ mod tests { assert_eq!(iter.next(), Some((vec![2, 1], ArraySubset::new_with_ranges(&[4..6, 3..6])))); assert_eq!(iter.next(), None); } + + #[test] + #[rustfmt::skip] + fn array_subset_par_iter_chunks() { + let subset = ArraySubset::new_with_ranges(&[2..5, 2..6]); + let chunk_shape = [NonZeroU64::new(2).unwrap(), NonZeroU64::new(3).unwrap()]; + let chunks = subset.chunks(&chunk_shape).unwrap(); + let chunks = chunks.into_par_iter().collect::>(); + assert_eq!(chunks, vec![ + (vec![1, 0], ArraySubset::new_with_ranges(&[2..4, 0..3])), + (vec![1, 1], ArraySubset::new_with_ranges(&[2..4, 3..6])), + (vec![2, 0], ArraySubset::new_with_ranges(&[4..6, 0..3])), + (vec![2, 1], ArraySubset::new_with_ranges(&[4..6, 3..6])), + ]); + } } diff --git a/src/array_subset/iterators/chunks_iterator.rs b/src/array_subset/iterators/chunks_iterator.rs new file mode 100644 index 00000000..ef8f4ad0 --- /dev/null +++ b/src/array_subset/iterators/chunks_iterator.rs @@ -0,0 +1,245 @@ +use std::{iter::FusedIterator, num::NonZeroU64}; + +use rayon::iter::{ + plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}, + IndexedParallelIterator, IntoParallelIterator, ParallelIterator, +}; + +use crate::{ + array::{chunk_shape_to_array_shape, ArrayIndices}, + array_subset::{ArraySubset, IncompatibleDimensionalityError}, +}; + +use super::{ + indices_iterator::ParIndicesIteratorProducer, Indices, IndicesIterator, ParIndicesIterator, +}; + +/// Iterates over the regular sized chunks overlapping this array subset. +/// +/// Iterates over the last dimension fastest (i.e. C-contiguous order). +/// All chunks have the same size, and may extend over the bounds of the array subset. +/// +/// The iterator item is a ([`ArrayIndices`], [`ArraySubset`]) tuple corresponding to the chunk indices and array subset. +/// +/// For example, consider a 4x3 array with element indices +/// ```text +/// (0, 0) (0, 1) (0, 2) +/// (1, 0) (1, 1) (1, 2) +/// (2, 0) (2, 1) (2, 2) +/// (3, 0) (3, 1) (3, 2) +/// ``` +/// An 2x2 chunks iterator with an array subset covering the entire array will produce +/// ```rust,ignore +/// [ +/// ((0, 0), ArraySubset{offset: (0,0), shape: (2, 2)}), +/// ((0, 1), ArraySubset{offset: (0,2), shape: (2, 2)}), +/// ((1, 0), ArraySubset{offset: (2,0), shape: (2, 2)}), +/// ((1, 1), ArraySubset{offset: (2,2), shape: (2, 2)}), +/// ] +/// ``` +/// +pub struct Chunks { + indices: Indices, + chunk_shape: Vec, +} + +impl Chunks { + /// Create a new chunks iterator. + /// + /// # Errors + /// Returns [`IncompatibleDimensionalityError`] if `chunk_shape` does not match the dimensionality of `subset`. + pub fn new( + subset: &ArraySubset, + chunk_shape: &[NonZeroU64], + ) -> Result { + if subset.dimensionality() == chunk_shape.len() { + Ok(unsafe { Self::new_unchecked(subset, chunk_shape) }) + } else { + Err(IncompatibleDimensionalityError( + chunk_shape.len(), + subset.dimensionality(), + )) + } + } + + /// Create a new chunks iterator. + /// + /// # Safety + /// The dimensionality of `chunk_shape` must match the dimensionality of `subset`. + #[must_use] + pub unsafe fn new_unchecked(subset: &ArraySubset, chunk_shape: &[NonZeroU64]) -> Self { + debug_assert_eq!(subset.dimensionality(), chunk_shape.len()); + let chunk_shape = chunk_shape_to_array_shape(chunk_shape); + let chunk_start: ArrayIndices = std::iter::zip(subset.start(), &chunk_shape) + .map(|(s, c)| s / c) + .collect(); + let chunk_end_inc: ArrayIndices = std::iter::zip(subset.end_inc(), &chunk_shape) + .map(|(e, c)| e / c) + .collect(); + let subset_chunks = + unsafe { ArraySubset::new_with_start_end_inc_unchecked(chunk_start, chunk_end_inc) }; + Self { + indices: subset_chunks.indices(), + chunk_shape, + } + } + + /// Create a new serial iterator. + #[must_use] + pub fn iter(&self) -> ChunksIterator<'_> { + <&Self as IntoIterator>::into_iter(self) + } +} + +impl<'a> IntoIterator for &'a Chunks { + type Item = (ArrayIndices, ArraySubset); + type IntoIter = ChunksIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + ChunksIterator { + inner: self.indices.into_iter(), + chunk_shape: &self.chunk_shape, + } + } +} + +impl<'a> IntoParallelIterator for &'a Chunks { + type Item = (ArrayIndices, ArraySubset); + type Iter = ParChunksIterator<'a>; + + fn into_par_iter(self) -> Self::Iter { + ParChunksIterator { + inner: self.indices.into_par_iter(), + chunk_shape: &self.chunk_shape, + } + } +} + +/// Serial chunks iterator. +/// +/// See [`Chunks`]. +pub struct ChunksIterator<'a> { + inner: IndicesIterator<'a>, + chunk_shape: &'a [u64], +} + +impl ChunksIterator<'_> { + fn chunk_indices_with_subset(&self, chunk_indices: Vec) -> (Vec, ArraySubset) { + let start = std::iter::zip(&chunk_indices, self.chunk_shape) + .map(|(i, c)| i * c) + .collect(); + let chunk_subset = unsafe { + ArraySubset::new_with_start_shape_unchecked(start, self.chunk_shape.to_vec()) + }; + (chunk_indices, chunk_subset) + } +} + +impl Iterator for ChunksIterator<'_> { + type Item = (ArrayIndices, ArraySubset); + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|chunk_indices| self.chunk_indices_with_subset(chunk_indices)) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl DoubleEndedIterator for ChunksIterator<'_> { + fn next_back(&mut self) -> Option { + self.inner + .next_back() + .map(|chunk_indices| self.chunk_indices_with_subset(chunk_indices)) + } +} + +impl ExactSizeIterator for ChunksIterator<'_> {} + +impl FusedIterator for ChunksIterator<'_> {} + +/// Parallel chunks iterator. +/// +/// See [`Chunks`]. +pub struct ParChunksIterator<'a> { + inner: ParIndicesIterator<'a>, + chunk_shape: &'a [u64], +} + +impl ParallelIterator for ParChunksIterator<'_> { + type Item = (Vec, ArraySubset); + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge(self, consumer) + } + + fn opt_len(&self) -> Option { + Some(self.len()) + } +} + +impl IndexedParallelIterator for ParChunksIterator<'_> { + fn with_producer>(self, callback: CB) -> CB::Output { + let producer = ParChunksIteratorProducer::from(&self); + callback.callback(producer) + } + + fn drive>(self, consumer: C) -> C::Result { + bridge(self, consumer) + } + + fn len(&self) -> usize { + self.inner.len() + } +} + +#[derive(Debug)] +struct ParChunksIteratorProducer<'a> { + inner: ParIndicesIteratorProducer<'a>, + chunk_shape: &'a [u64], +} + +impl<'a> Producer for ParChunksIteratorProducer<'a> { + type Item = (Vec, ArraySubset); + type IntoIter = ChunksIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + ChunksIterator { + inner: IndicesIterator::new_with_start_end( + self.inner.subset, + self.inner.index_front, + self.inner.index_back, + ), + chunk_shape: self.chunk_shape, + } + } + + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.inner.split_at(index); + ( + ParChunksIteratorProducer { + inner: left, + chunk_shape: self.chunk_shape, + }, + ParChunksIteratorProducer { + inner: right, + chunk_shape: self.chunk_shape, + }, + ) + } +} + +impl<'a> From<&'a ParChunksIterator<'_>> for ParChunksIteratorProducer<'a> { + fn from(iterator: &'a ParChunksIterator<'_>) -> Self { + Self { + inner: ParIndicesIteratorProducer::from(&iterator.inner), + chunk_shape: iterator.chunk_shape, + } + } +} diff --git a/src/array_subset/array_subset_iterators/contiguous_indices_iterator.rs b/src/array_subset/iterators/contiguous_indices_iterator.rs similarity index 62% rename from src/array_subset/array_subset_iterators/contiguous_indices_iterator.rs rename to src/array_subset/iterators/contiguous_indices_iterator.rs index 5bb0c7ec..68c329c4 100644 --- a/src/array_subset/array_subset_iterators/contiguous_indices_iterator.rs +++ b/src/array_subset/iterators/contiguous_indices_iterator.rs @@ -12,12 +12,29 @@ use super::IndicesIterator; /// Iterates over contiguous element indices in an array subset. /// /// The iterator item is a tuple: (indices, # contiguous elements). -pub struct ContiguousIndicesIterator { - inner: IndicesIterator, +/// +/// Iterates over the last dimension fastest (i.e. C-contiguous order). +/// For example, consider a 4x3 array with element indices +/// ```text +/// (0, 0) (0, 1) (0, 2) +/// (1, 0) (1, 1) (1, 2) +/// (2, 0) (2, 1) (2, 2) +/// (3, 0) (3, 1) (3, 2) +/// ``` +/// An iterator with an array subset covering the entire array will produce +/// ```rust,ignore +/// [((0, 0), 9)] +/// ``` +/// An iterator with an array subset corresponding to the lower right 2x2 region will produce +/// ```rust,ignore +/// [((2, 1), 2), ((3, 1), 2)] +/// ``` +pub struct ContiguousIndices { + subset_contiguous_start: ArraySubset, contiguous_elements: u64, } -impl ContiguousIndicesIterator { +impl ContiguousIndices { /// Create a new contiguous indices iterator. /// /// # Errors @@ -71,9 +88,9 @@ impl ContiguousIndicesIterator { let shape_out: Vec = unsafe { core::mem::transmute(shape_out) }; let subset_contiguous_start = ArraySubset::new_with_start_shape_unchecked(subset.start().to_vec(), shape_out); - let inner = subset_contiguous_start.iter_indices(); + // let inner = subset_contiguous_start.iter_indices(); Self { - inner, + subset_contiguous_start, contiguous_elements, } } @@ -83,9 +100,35 @@ impl ContiguousIndicesIterator { pub fn contiguous_elements(&self) -> u64 { self.contiguous_elements } + + /// Create a new serial iterator. + #[must_use] + pub fn iter(&self) -> ContiguousIndicesIterator<'_> { + <&Self as IntoIterator>::into_iter(self) + } } -impl Iterator for ContiguousIndicesIterator { +impl<'a> IntoIterator for &'a ContiguousIndices { + type Item = (ArrayIndices, u64); + type IntoIter = ContiguousIndicesIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + ContiguousIndicesIterator { + inner: IndicesIterator::new(&self.subset_contiguous_start), + contiguous_elements: self.contiguous_elements, + } + } +} + +/// Serial contiguous indices iterator. +/// +/// See [`ContiguousIndices`]. +pub struct ContiguousIndicesIterator<'a> { + inner: IndicesIterator<'a>, + contiguous_elements: u64, +} + +impl Iterator for ContiguousIndicesIterator<'_> { type Item = (ArrayIndices, u64); fn next(&mut self) -> Option { @@ -99,6 +142,14 @@ impl Iterator for ContiguousIndicesIterator { } } -impl ExactSizeIterator for ContiguousIndicesIterator {} +impl DoubleEndedIterator for ContiguousIndicesIterator<'_> { + fn next_back(&mut self) -> Option { + self.inner + .next_back() + .map(|indices| (indices, self.contiguous_elements)) + } +} + +impl ExactSizeIterator for ContiguousIndicesIterator<'_> {} -impl FusedIterator for ContiguousIndicesIterator {} +impl FusedIterator for ContiguousIndicesIterator<'_> {} diff --git a/src/array_subset/iterators/contiguous_linearised_indices_iterator.rs b/src/array_subset/iterators/contiguous_linearised_indices_iterator.rs new file mode 100644 index 00000000..83ba4e14 --- /dev/null +++ b/src/array_subset/iterators/contiguous_linearised_indices_iterator.rs @@ -0,0 +1,117 @@ +use std::iter::FusedIterator; + +use crate::{ + array::ravel_indices, + array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError}, +}; + +use super::{contiguous_indices_iterator::ContiguousIndices, ContiguousIndicesIterator}; + +/// Iterates over contiguous linearised element indices in an array subset. +/// +/// The iterator item is a tuple: (linearised index, # contiguous elements). +/// +/// Iterates over the last dimension fastest (i.e. C-contiguous order). +/// For example, consider a 4x3 array with linearised element indices +/// ```text +/// 0 1 2 +/// 3 4 5 +/// 6 7 8 +/// 9 10 11 +/// ``` +/// An iterator with an array subset covering the entire array will produce +/// ```rust,ignore +/// [(0, 9)] +/// ``` +/// An iterator with an array subset corresponding to the lower right 2x2 region will produce +/// ```rust,ignore +/// [(7, 2), (10, 2)] +/// ``` +pub struct ContiguousLinearisedIndices { + inner: ContiguousIndices, + array_shape: Vec, +} + +impl ContiguousLinearisedIndices { + /// Return a new contiguous linearised indices iterator. + /// + /// # Errors + /// + /// Returns [`IncompatibleArraySubsetAndShapeError`] if `array_shape` does not encapsulate `subset`. + pub fn new( + subset: &ArraySubset, + array_shape: Vec, + ) -> Result { + let inner = subset.contiguous_indices(&array_shape)?; + Ok(Self { inner, array_shape }) + } + + /// Return a new contiguous linearised indices iterator. + /// + /// # Safety + /// + /// `array_shape` must encapsulate `subset`. + #[must_use] + pub unsafe fn new_unchecked(subset: &ArraySubset, array_shape: Vec) -> Self { + let inner = subset.contiguous_indices_unchecked(&array_shape); + Self { inner, array_shape } + } + + /// Return the number of contiguous elements (fixed on each iteration). + #[must_use] + pub fn contiguous_elements(&self) -> u64 { + self.inner.contiguous_elements() + } + + /// Create a new serial iterator. + #[must_use] + pub fn iter(&self) -> ContiguousLinearisedIndicesIterator<'_> { + <&Self as IntoIterator>::into_iter(self) + } +} + +impl<'a> IntoIterator for &'a ContiguousLinearisedIndices { + type Item = (u64, u64); + type IntoIter = ContiguousLinearisedIndicesIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + ContiguousLinearisedIndicesIterator { + inner: self.inner.into_iter(), + array_shape: &self.array_shape, + } + } +} + +/// Serial contiguous linearised indices iterator. +/// +/// See [`ContiguousLinearisedIndices`]. +pub struct ContiguousLinearisedIndicesIterator<'a> { + inner: ContiguousIndicesIterator<'a>, + array_shape: &'a [u64], +} + +impl Iterator for ContiguousLinearisedIndicesIterator<'_> { + type Item = (u64, u64); + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|(indices, elements)| (ravel_indices(&indices, self.array_shape), elements)) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl DoubleEndedIterator for ContiguousLinearisedIndicesIterator<'_> { + fn next_back(&mut self) -> Option { + self.inner + .next_back() + .map(|(indices, elements)| (ravel_indices(&indices, self.array_shape), elements)) + } +} + +impl ExactSizeIterator for ContiguousLinearisedIndicesIterator<'_> {} + +impl FusedIterator for ContiguousLinearisedIndicesIterator<'_> {} diff --git a/src/array_subset/iterators/indices_iterator.rs b/src/array_subset/iterators/indices_iterator.rs new file mode 100644 index 00000000..8d32a2a3 --- /dev/null +++ b/src/array_subset/iterators/indices_iterator.rs @@ -0,0 +1,264 @@ +use std::iter::FusedIterator; + +use crate::{ + array::{unravel_index, ArrayIndices}, + array_subset::ArraySubset, +}; + +use rayon::iter::{ + plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}, + IndexedParallelIterator, IntoParallelIterator, ParallelIterator, +}; + +/// An iterator over the indices in an array subset. +/// +/// Iterates over the last dimension fastest (i.e. C-contiguous order). +/// For example, consider a 4x3 array with element indices +/// ```text +/// (0, 0) (0, 1) (0, 2) +/// (1, 0) (1, 1) (1, 2) +/// (2, 0) (2, 1) (2, 2) +/// (3, 0) (3, 1) (3, 2) +/// ``` +/// An iterator with an array subset corresponding to the lower right 2x2 region will produce `[(2, 1), (2, 2), (3, 1), (3, 2)]`. +pub struct Indices { + subset: ArraySubset, + index_front: u64, + index_back: u64, + length: usize, +} + +impl Indices { + /// Create a new indices struct. + #[must_use] + pub fn new(subset: ArraySubset) -> Self { + let length = subset.num_elements_usize(); + let index_front = 0; + let index_back = length as u64; + Self { + subset, + index_front, + index_back, + length, + } + } + + /// Create a new indices struct spanning an explicit index range. + /// + /// # Panics + /// Panics if `index_back` < `index_front` + #[must_use] + pub fn new_with_start_end(subset: ArraySubset, index_front: u64, index_back: u64) -> Self { + let length = usize::try_from(index_back - index_front).unwrap(); + Self { + subset, + index_front, + index_back, + length, + } + } + + /// Create a new serial iterator. + #[must_use] + pub fn iter(&self) -> IndicesIterator<'_> { + <&Self as IntoIterator>::into_iter(self) + } +} + +impl<'a> IntoIterator for &'a Indices { + type Item = ArrayIndices; + type IntoIter = IndicesIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + IndicesIterator { + subset: &self.subset, + index_front: self.index_front, + index_back: self.index_back, + length: self.length, + } + } +} + +impl<'a> IntoParallelIterator for &'a Indices { + type Item = ArrayIndices; + type Iter = ParIndicesIterator<'a>; + + fn into_par_iter(self) -> Self::Iter { + ParIndicesIterator { + subset: &self.subset, + index_front: self.index_front, + index_back: self.index_back, + length: self.length, + } + } +} + +/// Serial indices iterator. +/// +/// See [`Indices`]. +pub struct IndicesIterator<'a> { + subset: &'a ArraySubset, + index_front: u64, + index_back: u64, + length: usize, +} + +impl<'a> IndicesIterator<'a> { + /// Create a new indices iterator. + #[must_use] + pub(super) fn new(subset: &'a ArraySubset) -> Self { + let length = subset.num_elements_usize(); + let index_front = 0; + let index_back = length as u64; + Self { + subset, + index_front, + index_back, + length, + } + } + + /// Create a new indices iterator spanning an explicit index range. + /// + /// # Panics + /// Panics if `index_back` < `index_front` + #[must_use] + pub(super) fn new_with_start_end( + subset: &'a ArraySubset, + index_front: u64, + index_back: u64, + ) -> Self { + let length = usize::try_from(index_back - index_front).unwrap(); + Self { + subset, + index_front, + index_back, + length, + } + } +} + +impl Iterator for IndicesIterator<'_> { + type Item = ArrayIndices; + + fn next(&mut self) -> Option { + let indices = std::iter::zip( + unravel_index(self.index_front, self.subset.shape()).iter(), // FIXME: iter variant + self.subset.start(), + ) + .map(|(index, start)| index + start) + .collect::>(); + + if self.index_front < self.index_back { + self.index_front += 1; + Some(indices) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.length, Some(self.length)) + } +} + +impl DoubleEndedIterator for IndicesIterator<'_> { + fn next_back(&mut self) -> Option { + if self.index_back > self.index_front { + self.index_back -= 1; + let indices = std::iter::zip( + unravel_index(self.index_back, self.subset.shape()).iter(), // FIXME: iter variant + self.subset.start(), + ) + .map(|(index, start)| index + start) + .collect::>(); + Some(indices) + } else { + None + } + } +} + +impl ExactSizeIterator for IndicesIterator<'_> {} + +impl FusedIterator for IndicesIterator<'_> {} + +/// Parallel indices iterator. +/// +/// See [`Indices`]. +pub struct ParIndicesIterator<'a> { + subset: &'a ArraySubset, + index_front: u64, + index_back: u64, + length: usize, +} + +impl ParallelIterator for ParIndicesIterator<'_> { + type Item = ArrayIndices; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge(self, consumer) + } + + fn opt_len(&self) -> Option { + Some(self.len()) + } +} + +impl IndexedParallelIterator for ParIndicesIterator<'_> { + fn with_producer>(self, callback: CB) -> CB::Output { + let producer = ParIndicesIteratorProducer::from(&self); + callback.callback(producer) + } + + fn drive>(self, consumer: C) -> C::Result { + bridge(self, consumer) + } + + fn len(&self) -> usize { + self.length + } +} + +#[derive(Debug)] +pub(super) struct ParIndicesIteratorProducer<'a> { + pub subset: &'a ArraySubset, + pub index_front: u64, + pub index_back: u64, +} + +impl<'a> Producer for ParIndicesIteratorProducer<'a> { + type Item = ArrayIndices; + type IntoIter = IndicesIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + IndicesIterator::new_with_start_end(self.subset, self.index_front, self.index_back) + } + + fn split_at(self, index: usize) -> (Self, Self) { + let left = ParIndicesIteratorProducer { + subset: self.subset, + index_front: self.index_front, + index_back: self.index_front + index as u64, + }; + let right = ParIndicesIteratorProducer { + subset: self.subset, + index_front: self.index_front + index as u64, + index_back: self.index_back, + }; + (left, right) + } +} + +impl<'a> From<&'a ParIndicesIterator<'_>> for ParIndicesIteratorProducer<'a> { + fn from(iterator: &'a ParIndicesIterator<'_>) -> Self { + Self { + subset: iterator.subset, + index_front: iterator.index_front, + index_back: iterator.index_back, + } + } +} diff --git a/src/array_subset/iterators/linearised_indices_iterator.rs b/src/array_subset/iterators/linearised_indices_iterator.rs new file mode 100644 index 00000000..56adc56d --- /dev/null +++ b/src/array_subset/iterators/linearised_indices_iterator.rs @@ -0,0 +1,114 @@ +use std::iter::FusedIterator; + +use crate::{ + array::{ravel_indices, ArrayShape}, + array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError}, +}; + +use super::IndicesIterator; + +/// An iterator over the linearised indices in an array subset. +/// +/// Iterates over the last dimension fastest (i.e. C-contiguous order). +/// For example, consider a 4x3 array with linearised element indices +/// ```text +/// 0 1 2 +/// 3 4 5 +/// 6 7 8 +/// 9 10 11 +/// ``` +/// An iterator with an array subset corresponding to the lower right 2x2 region will produce `[7, 8, 10, 11]`. +pub struct LinearisedIndices { + subset: ArraySubset, + array_shape: ArrayShape, +} + +impl LinearisedIndices { + /// Create a new linearised indices iterator. + /// + /// # Errors + /// Returns [`IncompatibleArraySubsetAndShapeError`] if `array_shape` does not encapsulate `subset`. + pub fn new( + subset: ArraySubset, + array_shape: ArrayShape, + ) -> Result { + if subset.dimensionality() == array_shape.len() + && std::iter::zip(subset.end_exc(), &array_shape).all(|(end, shape)| end <= *shape) + { + Ok(Self { + subset, + array_shape, + }) + } else { + Err(IncompatibleArraySubsetAndShapeError(subset, array_shape)) + } + } + + /// Create a new linearised indices iterator. + /// + /// # Safety + /// `array_shape` must encapsulate `subset`. + #[must_use] + pub unsafe fn new_unchecked(subset: ArraySubset, array_shape: ArrayShape) -> Self { + debug_assert_eq!(subset.dimensionality(), array_shape.len()); + debug_assert!( + std::iter::zip(subset.end_exc(), &array_shape).all(|(end, shape)| end <= *shape) + ); + Self { + subset, + array_shape, + } + } + + /// Create a new serial iterator. + #[must_use] + pub fn iter(&self) -> LinearisedIndicesIterator<'_> { + <&Self as IntoIterator>::into_iter(self) + } +} + +impl<'a> IntoIterator for &'a LinearisedIndices { + type Item = u64; + type IntoIter = LinearisedIndicesIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + LinearisedIndicesIterator { + inner: IndicesIterator::new(&self.subset), + array_shape: &self.array_shape, + } + } +} + +/// Parallel linearised indices iterator. +/// +/// See [`LinearisedIndices`]. +pub struct LinearisedIndicesIterator<'a> { + inner: IndicesIterator<'a>, + array_shape: &'a [u64], +} + +impl Iterator for LinearisedIndicesIterator<'_> { + type Item = u64; + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|indices| ravel_indices(&indices, self.array_shape)) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl DoubleEndedIterator for LinearisedIndicesIterator<'_> { + fn next_back(&mut self) -> Option { + self.inner + .next_back() + .map(|indices| ravel_indices(&indices, self.array_shape)) + } +} + +impl ExactSizeIterator for LinearisedIndicesIterator<'_> {} + +impl FusedIterator for LinearisedIndicesIterator<'_> {}