Merge pull request #10 from LDeakin/concurrency_sync
Improve performance and memory usage
LDeakin authored Feb 19, 2024
2 parents 5687c3f + 47b1361 commit b365efa
Showing 74 changed files with 3,574 additions and 3,030 deletions.
2 changes: 1 addition & 1 deletion BUILD.md
@@ -38,7 +38,7 @@ cargo bench -- --baseline baseline

Install `cargo-llvm-cov`
```bash
cargo +stable install cargo-llvm-cov --locked
cargo +nightly install cargo-llvm-cov --locked
```

Generate an HTML report
31 changes: 26 additions & 5 deletions CHANGELOG.md
@@ -20,13 +20,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add store lock tests
- Added `contiguous_elements` method to `ContiguousIndicesIterator` and `ContiguousLinearisedIndicesIterator`
- Added `ChunkShape::num_elements`
- Added `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options`
- Added new `Array::opt` methods which can use new encode/decode options
- **Breaking** Existing `Array` `_opt` use new encode/decode options instead of `parallel: bool`
- Added `codec::CodecOptions{Builder}`
- **Breaking**: Added new `Array::opt` methods which can use `codec::CodecOptions` and existing methods use it instead of `parallel: bool`
- Implement `DoubleEndedIterator` for `{Indices,LinearisedIndices,ContiguousIndices,ContiguousLinearisedIndicesIterator}Iterator`
- Add `ParIndicesIterator` and `ParChunksIterator`
- Implement `From<String>` for `DimensionName`
- Add `{Async}ReadableWritableListableStorageTraits` and `{Async}ReadableWritableListableStorage`
- Add `ArrayCodecTraits::decode_into_array_view_opt` with default implementation
- TODO: Same for async
- Add `ArrayPartialDecoderTraits::partial_decode_into_array_view_opt` with default implementation
- TODO: Same for async
- Add `array::ArrayView`
- Add array `into_array_view` methods
- `Array::{async_}retrieve_chunk{_subset}_into_array_view{_opt}`
- `Array::{async_}retrieve_array_subset_into_array_view{_opt}`
- TODO: `Array::{async_}retrieve_array_chunks_into_array_view{_opt}`
- Add `array::unsafe_cell_slice::UnsafeCellSlice::len()`
- Add `{Array,Chunk}Representation::dimensionality()`
- Add `ArraySubset::new_empty()` and `ArraySubset::is_empty()`
- Add missing `IncompatibleArraySubsetAndShapeError::new()`
- Add more array tests
- Add `Array::dimensionality()`

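As context for the options entries above: the new `CodecOptions` is what replaces the old `parallel: bool` arguments throughout the codec and `Array` APIs. A minimal round-trip sketch through a bytes-to-bytes codec follows; the exact method shapes are assumed from the updated `benches/codecs.rs` later in this diff:

```rust
use zarrs::array::{
    codec::{BytesToBytesCodecTraits, CodecOptions},
    BytesRepresentation,
};

// Sketch only: encode/decode signatures assumed from the benchmark changes below.
fn roundtrip(
    codec: &dyn BytesToBytesCodecTraits,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let rep = BytesRepresentation::FixedSize(bytes.len() as u64);
    // `CodecOptions::default()` stands in for the removed `parallel: bool` flag.
    let options = CodecOptions::default();
    let encoded = codec.encode(bytes.clone(), &options)?;
    let decoded = codec.decode(encoded, &rep, &options)?;
    assert_eq!(decoded, bytes);
    Ok(())
}
```

The same `CodecOptions` value is what the `Array` `_opt` methods now accept in place of `parallel: bool`.
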
### Changed
- Dependency bumps
@@ -39,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `array_subset_iterators.rs`
- **Major breaking**: storage transformers must be `Arc` wrapped, as `StorageTransformerExtension` trait methods now take `self: Arc<Self>`
- Removed lifetimes from `{Async}{Readable,Writable,ReadableWritable,Listable,ReadableListable}Storage`
- **Breaking**: Add `create{_async}_readable_writable_listable_transformer` to `StorageTransformerExtension` trait
- **Breaking**: `Group` and `Array` methods generic on storage now require the storage have a `'static` lifetime
- **Breaking**: remove `Array::{set_}parallel_codecs` and `ArrayBuilder::parallel_codecs`
- **Breaking**: added `recommended_concurrency` to codec trait methods to facilitate improved parallelisation
@@ -55,14 +70,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add a fast path to `Array::retrieve_chunk_subset{_opt}` if the entire chunk is requested
- `DimensionName::new()` generalised to accept a name implementing `Into<String>`
- **Breaking**: `ArrayBuilder::dimension_names()` generalised to accept `Option<I>` where `I: IntoIterator<Item = D>` and `D: Into<DimensionName>`
- Can now write
`builder.dimension_names(["y", "x"].into())` instead of `builder.dimension_names(vec!["y".into(), "x".into()].into())`
- Can now write `builder.dimension_names(["y", "x"].into())` instead of `builder.dimension_names(vec!["y".into(), "x".into()].into())`
- **Breaking**: Add `ArrayPartialDecoderTraits::element_size()`
- Cleanup uninitialised `Vec` handling

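The concurrency-related entries above (`recommended_concurrency` on the codec traits, and the `rayon_iter_concurrent_limit` dependency added in `Cargo.toml` below) share one goal: bound outer, chunk-level parallelism so that codec-internal parallelism still has a share of the concurrency budget. As a generic illustration with plain rayon only, not the `rayon_iter_concurrent_limit` macro API, a dedicated thread pool can impose such a bound:

```rust
use rayon::prelude::*;

// Generic illustration only; this is not the `rayon_iter_concurrent_limit` API.
// Here a dedicated pool simply caps how many threads the chunk loop may use.
fn process_chunks_bounded(chunk_indices: Vec<Vec<u64>>, chunk_concurrency: usize) {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(chunk_concurrency)
        .build()
        .expect("failed to build bounded thread pool");
    pool.install(|| {
        chunk_indices.par_iter().for_each(|chunk| {
            // Retrieve/decode one chunk here.
            println!("processing chunk {chunk:?}");
        });
    });
}
```
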
### Removed
- **Breaking**: remove `InvalidArraySubsetError` and `ArrayExtractElementsError`
- **Breaking**: Remove non-default store lock constructors
- **Breaking**: Remove unused `storage::store::{Readable,Writable,ReadableWritable,Listable}Store`

### Fixed
- `Array::retrieve_array_subset` and variants now correctly return the fill value if the array subset does not intersect any chunks
- **Breaking**: `ArraySubset::end_inc` now returns an `Option`, which is `None` for an empty array subset
- Add missing input validation to some `partial_decode` methods

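The `end_inc` change in the Fixed list shows up in the `src/array.rs` hunks below, which match on the returned `Option` and fall back to `ArraySubset::new_empty`. A small sketch of the new behaviour, assuming the API names listed in this changelog:

```rust
use zarrs::array_subset::ArraySubset;

fn main() {
    // Sketch (names from the changelog above): an empty subset has no inclusive
    // end, so `end_inc()` now returns `None` and callers must handle that case.
    let empty = ArraySubset::new_empty(3);
    assert!(empty.is_empty());
    assert!(empty.end_inc().is_none());
}
```
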
## [0.11.6] - 2024-02-06

### Added
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -62,6 +62,7 @@ parking_lot = "0.12"
pathdiff = "0.2"
pco = { version = "0.1.3", optional = true }
rayon = "1.6"
rayon_iter_concurrent_limit = "0.1.0"
reqwest = { version = "0.11", features = ["blocking"], optional = true }
serde = { version = "1.0.100", features = ["derive"] }
serde_json = { version = "1.0.71", features = ["float_roundtrip", "preserve_order"] }
@@ -71,7 +72,6 @@ walkdir = "2.3.2"
zfp-sys = {version = "0.1.4", features = ["static"], optional = true }
zip = { version = "0.6", optional = true }
zstd = { version = "0.13", features = ["zstdmt"], optional = true }
rayon_iter_concurrent_limit = "0.1.0-alpha3"

[dev-dependencies]
chrono = "0.4"
24 changes: 19 additions & 5 deletions benches/codecs.rs
@@ -6,7 +6,7 @@ use zarrs::array::{
codec::{
array_to_bytes::bytes::Endianness,
bytes_to_bytes::blosc::{BloscCompressor, BloscShuffleMode},
ArrayCodecTraits, BloscCodec, BytesCodec, BytesToBytesCodecTraits,
ArrayCodecTraits, BloscCodec, BytesCodec, BytesToBytesCodecTraits, CodecOptions,
},
BytesRepresentation, ChunkRepresentation, DataType,
};
@@ -36,7 +36,11 @@ fn codec_bytes(c: &mut Criterion) {
group.throughput(Throughput::Bytes(size3));
// encode and decode have the same implementation
group.bench_function(BenchmarkId::new("encode_decode", size3), |b| {
b.iter(|| codec.encode(data.clone(), &rep).unwrap());
b.iter(|| {
codec
.encode(data.clone(), &rep, &CodecOptions::default())
.unwrap()
});
});
}
}
@@ -60,13 +64,23 @@ fn codec_blosc(c: &mut Criterion) {
let rep = BytesRepresentation::FixedSize(size3);

let data_decoded: Vec<u8> = (0..size3).map(|i| i as u8).collect();
let data_encoded = codec.encode(data_decoded.clone()).unwrap();
let data_encoded = codec
.encode(data_decoded.clone(), &CodecOptions::default())
.unwrap();
group.throughput(Throughput::Bytes(size3));
group.bench_function(BenchmarkId::new("encode", size3), |b| {
b.iter(|| codec.encode(data_decoded.clone()).unwrap());
b.iter(|| {
codec
.encode(data_decoded.clone(), &CodecOptions::default())
.unwrap()
});
});
group.bench_function(BenchmarkId::new("decode", size3), |b| {
b.iter(|| codec.decode(data_encoded.clone(), &rep).unwrap());
b.iter(|| {
codec
.decode(data_encoded.clone(), &rep, &CodecOptions::default())
.unwrap()
});
});
}
}
40 changes: 16 additions & 24 deletions examples/sharded_array_write_read.rs
@@ -1,4 +1,5 @@
use itertools::Itertools;
use zarrs::storage::ListableStorageTraits;

fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
use zarrs::{
@@ -8,10 +9,7 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
},
array_subset::ArraySubset,
node::Node,
storage::{
storage_transformer::{StorageTransformerExtension, UsageLogStorageTransformer},
store,
},
storage::store,
};

use rayon::prelude::{IntoParallelIterator, ParallelIterator};
@@ -22,20 +20,18 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
// let store = Arc::new(store::FilesystemStore::new(path.path())?);
// let store = Arc::new(store::FilesystemStore::new("tests/data/sharded_array_write_read.zarr")?);
let store = Arc::new(store::MemoryStore::new());
let log_writer = Arc::new(std::sync::Mutex::new(
// std::io::BufWriter::new(
std::io::stdout(),
// )
));
let usage_log = Arc::new(UsageLogStorageTransformer::new(log_writer, || {
chrono::Utc::now().format("[%T%.3f] ").to_string()
}));
let store_readable_listable = usage_log
.clone()
.create_readable_listable_transformer(store.clone());
let store = usage_log
.clone()
.create_readable_writable_transformer(store);

// let log_writer = Arc::new(std::sync::Mutex::new(
// // std::io::BufWriter::new(
// std::io::stdout(),
// // )
// ));
// let usage_log = Arc::new(UsageLogStorageTransformer::new(log_writer, || {
// chrono::Utc::now().format("[%T%.3f] ").to_string()
// }));
// let store = usage_log
// .clone()
// .create_readable_writable_transformer(store);

// Create a group
let group_path = "/group";
@@ -154,17 +150,13 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
}

// Show the hierarchy
let node = Node::new(&*store_readable_listable, "/").unwrap();
let node = Node::new(&*store, "/").unwrap();
let tree = node.hierarchy_tree();
println!("The zarr hierarchy tree is:\n{}", tree);

println!(
"The keys in the store are:\n[{}]",
store_readable_listable
.list()
.unwrap_or_default()
.iter()
.format(", ")
store.list().unwrap_or_default().iter().format(", ")
);

Ok(())
89 changes: 60 additions & 29 deletions src/array.rs
@@ -10,11 +10,13 @@ mod array_builder;
mod array_errors;
mod array_metadata;
mod array_representation;
mod array_view;
mod bytes_representation;
pub mod chunk_grid;
pub mod chunk_key_encoding;
mod chunk_shape;
pub mod codec;
pub mod concurrency;
pub mod data_type;
mod dimension_name;
mod fill_value;
@@ -29,6 +31,7 @@ pub use self::{
array_errors::{ArrayCreateError, ArrayError},
array_metadata::{ArrayMetadata, ArrayMetadataV3},
array_representation::{ArrayRepresentation, ChunkRepresentation},
array_view::{ArrayView, ArrayViewCreateError},
bytes_representation::BytesRepresentation,
chunk_grid::ChunkGrid,
chunk_key_encoding::ChunkKeyEncoding,
@@ -40,6 +43,7 @@ pub use self::{
fill_value_metadata::FillValueMetadata,
nan_representations::{ZARR_NAN_BF16, ZARR_NAN_F16, ZARR_NAN_F32, ZARR_NAN_F64},
};
use self::{codec::ArrayCodecTraits, concurrency::RecommendedConcurrency};

use serde::Serialize;
use thiserror::Error;
@@ -326,6 +330,12 @@ impl<TStorage: ?Sized> Array<TStorage> {
&self.shape
}

/// Get the array dimensionality.
#[must_use]
pub fn dimensionality(&self) -> usize {
self.shape.len()
}

/// Get the codecs.
#[must_use]
pub const fn codecs(&self) -> &CodecChain {
@@ -463,11 +473,16 @@ impl<TStorage: ?Sized> Array<TStorage> {
/// Returns [`ArrayError::InvalidChunkGridIndicesError`] if a chunk in `chunks` is incompatible with the chunk grid.
#[allow(clippy::similar_names)]
pub fn chunks_subset(&self, chunks: &ArraySubset) -> Result<ArraySubset, ArrayError> {
let chunk0 = self.chunk_subset(chunks.start())?;
let chunk1 = self.chunk_subset(&chunks.end_inc())?;
let start = chunk0.start();
let end = chunk1.end_exc();
Ok(unsafe { ArraySubset::new_with_start_end_exc_unchecked(start.to_vec(), end) })
match chunks.end_inc() {
Some(end) => {
let chunk0 = self.chunk_subset(chunks.start())?;
let chunk1 = self.chunk_subset(&end)?;
let start = chunk0.start();
let end = chunk1.end_exc();
Ok(unsafe { ArraySubset::new_with_start_end_exc_unchecked(start.to_vec(), end) })
}
None => Ok(ArraySubset::new_empty(chunks.dimensionality())),
}
}

/// Return the array subset of `chunks` bounded by the array shape.
@@ -515,24 +530,38 @@ impl<TStorage: ?Sized> Array<TStorage> {
&self,
array_subset: &ArraySubset,
) -> Result<Option<ArraySubset>, IncompatibleDimensionalityError> {
// Find the chunks intersecting this array subset
let chunks_start = self
.chunk_grid()
.chunk_indices(array_subset.start(), self.shape())?;
let chunks_end = self
.chunk_grid()
.chunk_indices(&array_subset.end_inc(), self.shape())?;
let chunks_end = chunks_end.map_or_else(|| self.chunk_grid_shape(), Some);

Ok(
if let (Some(chunks_start), Some(chunks_end)) = (chunks_start, chunks_end) {
Some(unsafe {
ArraySubset::new_with_start_end_inc_unchecked(chunks_start, chunks_end)
})
} else {
None
},
)
match array_subset.end_inc() {
Some(end) => {
let chunks_start = self
.chunk_grid()
.chunk_indices(array_subset.start(), self.shape())?;
let chunks_end = self
.chunk_grid()
.chunk_indices(&end, self.shape())?
.map_or_else(|| self.chunk_grid_shape(), Some);

Ok(
if let (Some(chunks_start), Some(chunks_end)) = (chunks_start, chunks_end) {
Some(unsafe {
ArraySubset::new_with_start_end_inc_unchecked(chunks_start, chunks_end)
})
} else {
None
},
)
}
None => Ok(Some(ArraySubset::new_empty(self.dimensionality()))),
}
}

/// Calculate the recommended codec concurrency.
fn recommended_codec_concurrency(
&self,
chunk_representation: &ChunkRepresentation,
) -> Result<RecommendedConcurrency, ArrayError> {
Ok(self
.codecs()
.recommended_concurrency(chunk_representation)?)
}
}

@@ -629,15 +658,16 @@ pub fn transmute_to_bytes_vec<T: bytemuck::NoUninit>(from: Vec<T>) -> Vec<u8> {
#[must_use]
pub fn unravel_index(mut index: u64, shape: &[u64]) -> ArrayIndices {
let len = shape.len();
let mut indices = vec![core::mem::MaybeUninit::uninit(); len];
for (indices_i, &dim) in std::iter::zip(indices.iter_mut().rev(), shape.iter().rev()) {
let mut indices: ArrayIndices = Vec::with_capacity(len);
for (indices_i, &dim) in std::iter::zip(
indices.spare_capacity_mut().iter_mut().rev(),
shape.iter().rev(),
) {
indices_i.write(index % dim);
index /= dim;
}
#[allow(clippy::transmute_undefined_repr)]
unsafe {
core::mem::transmute(indices)
}
unsafe { indices.set_len(len) };
indices
}

/// Ravel ND indices to a linearised index.
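For reference, the pattern introduced above in `unravel_index` can be shown in isolation. This stand-alone sketch (plain `std`, no zarrs types) writes into a vector's uninitialised capacity via `spare_capacity_mut()` and only then commits the length, avoiding the layout-dependent `Vec` transmute that the old code relied on:

```rust
/// Stand-alone demonstration of the `spare_capacity_mut` + `set_len` pattern
/// used by the new `unravel_index`.
fn filled_descending(len: usize) -> Vec<u64> {
    let mut values: Vec<u64> = Vec::with_capacity(len);
    for (slot, value) in values
        .spare_capacity_mut()
        .iter_mut()
        .zip((0..len as u64).rev())
    {
        slot.write(value);
    }
    // SAFETY: exactly `len` elements were initialised by the loop above.
    unsafe { values.set_len(len) };
    values
}

fn main() {
    assert_eq!(filled_descending(4), vec![3, 2, 1, 0]);
}
```
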
@@ -754,6 +784,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn array_subset_round_trip() {
let store = Arc::new(MemoryStore::default());
let array_path = "/array";