Codec trait and array refactor (#6)
Codec trait refactor
LDeakin authored Feb 12, 2024
1 parent 5eab757 commit 8d5a27c
Showing 46 changed files with 2,220 additions and 1,516 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
@@ -20,6 +20,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add store lock tests
- Added `contiguous_elements` method to `ContiguousIndicesIterator` and `ContiguousLinearisedIndicesIterator`
- Added `ChunkShape::num_elements`
+- Added `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options`
+- Added new `Array::opt` methods which can use new encode/decode options
+- **Breaking**: existing `Array` `_opt` methods use the new encode/decode options instead of `parallel: bool` (see the sketch after this changelog excerpt)

### Changed
- Dependency bumps
@@ -32,7 +35,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `array_subset_iterators.rs`
- **Major breaking**: storage transformers must be `Arc`-wrapped, as `StorageTransformerExtension` trait methods now take `self: Arc<Self>`
- Removed lifetimes from `{Async}{Readable,Writable,ReadableWritable,Listable,ReadableListable}Storage`
-- **Breaking**: `Group` and `Array` methods generic on storage now require the storage with a `'static` lifetime
+- **Breaking**: `Group` and `Array` methods generic on storage now require the storage to have a `'static` lifetime
+- **Breaking**: remove `Array::{set_}parallel_codecs` and `ArrayBuilder::parallel_codecs`
+- **Breaking**: added `recommended_concurrency` to codec trait methods to facilitate improved parallelisation
+- **Major breaking**: refactor codec traits:
+  - **Breaking**: remove `par_` variants
+  - **Breaking**: `_opt` variants use the new `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options` instead of `parallel: bool` (a call-site sketch follows the `benches/codecs.rs` changes below)
+  - variants without a prefix/suffix are now parallel rather than serial
+  - TODO: Remove these?

### Removed
- **Breaking**: remove `InvalidArraySubsetError` and `ArrayExtractElementsError`
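To make the changelog entries above concrete, here is a minimal sketch of the reworked `_opt` surface. The import paths, the `Default` implementations, and the exact signatures of `store_array_subset_elements_opt` and `retrieve_array_subset_opt` are assumptions inferred from the entries, not code taken from this commit:

```rust
use zarrs::array::codec::{DecodeOptions, EncodeOptions};
use zarrs::array::Array;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::ReadableWritableStorageTraits;

// Sketch: `_opt` methods now take an options struct instead of `parallel: bool`.
fn subset_roundtrip<TStorage: ?Sized + ReadableWritableStorageTraits + 'static>(
    array: &Array<TStorage>,
    subset: &ArraySubset,
    elements: Vec<f32>,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Default options (assumed to preserve the previous parallel behaviour).
    array.store_array_subset_elements_opt::<f32>(subset, elements, &EncodeOptions::default())?;
    let bytes = array.retrieve_array_subset_opt(subset, &DecodeOptions::default())?;
    Ok(bytes)
}
```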
1 change: 1 addition & 0 deletions Cargo.toml
@@ -71,6 +71,7 @@ walkdir = "2.3.2"
zfp-sys = {version = "0.1.4", features = ["static"], optional = true }
zip = { version = "0.6", optional = true }
zstd = { version = "0.13", features = ["zstdmt"], optional = true }
+rayon_iter_concurrent_limit = "0.1.0-alpha3"

[dev-dependencies]
chrono = "0.4"
10 changes: 3 additions & 7 deletions benches/array_blosc.rs
@@ -32,9 +32,7 @@ fn array_blosc_write_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();
});
});
}
@@ -69,13 +67,11 @@ fn array_blosc_read_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();

// Benchmark reading the data
b.iter(|| {
-let _bytes = array.par_retrieve_array_subset(&subset).unwrap();
+let _bytes = array.retrieve_array_subset(&subset).unwrap();
});
});
}
20 changes: 6 additions & 14 deletions benches/array_uncompressed.rs
@@ -19,9 +19,7 @@ fn array_write_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();
});
});
}
@@ -49,9 +47,7 @@ fn array_write_all_sharded(c: &mut Criterion) {
.unwrap();
let data = vec![1u16; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();
});
});
}
@@ -76,13 +72,11 @@ fn array_read_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u16; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();

// Benchmark reading the data
b.iter(|| {
-let _bytes = array.par_retrieve_array_subset(&subset).unwrap();
+let _bytes = array.retrieve_array_subset(&subset).unwrap();
});
});
}
@@ -110,13 +104,11 @@ fn array_read_all_sharded(c: &mut Criterion) {
.unwrap();
let data = vec![0u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();

// Benchmark reading the data
b.iter(|| {
-let _bytes = array.par_retrieve_array_subset(&subset).unwrap();
+let _bytes = array.retrieve_array_subset(&subset).unwrap();
});
});
}
6 changes: 0 additions & 6 deletions benches/codecs.rs
@@ -68,12 +68,6 @@ fn codec_blosc(c: &mut Criterion) {
group.bench_function(BenchmarkId::new("decode", size3), |b| {
b.iter(|| codec.decode(data_encoded.clone(), &rep).unwrap());
});
group.bench_function(BenchmarkId::new("par_encode", size3), |b| {
b.iter(|| codec.par_encode(data_decoded.clone()).unwrap());
});
group.bench_function(BenchmarkId::new("par_decode", size3), |b| {
b.iter(|| codec.par_decode(data_encoded.clone(), &rep).unwrap());
});
}
}

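Since the `par_encode`/`par_decode` benches above were deleted rather than ported, here is a hedged sketch of what an options-based equivalent could look like. The `encode_opt`/`decode_opt` names and parameters are assumptions based on the changelog entries, while the unprefixed `encode`/`decode` (now parallel by default) remain the benchmarked paths:

```rust
use zarrs::array::codec::{BytesToBytesCodecTraits, CodecError, DecodeOptions, EncodeOptions};
use zarrs::array::BytesRepresentation;

// Hypothetical options-based round trip replacing the removed `par_` calls.
fn codec_roundtrip(
    codec: &dyn BytesToBytesCodecTraits,
    data: Vec<u8>,
    rep: &BytesRepresentation,
) -> Result<Vec<u8>, CodecError> {
    let encoded = codec.encode_opt(data, &EncodeOptions::default())?;
    codec.decode_opt(encoded, rep, &DecodeOptions::default())
}
```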
3 changes: 1 addition & 2 deletions examples/array_write_read.rs
@@ -75,15 +75,14 @@ fn array_write_read() -> Result<(), Box<dyn std::error::Error>> {
println!("store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");

// Store multiple chunks
-array.store_chunks_elements_opt::<f32>(
+array.store_chunks_elements::<f32>(
&ArraySubset::new_with_ranges(&[1..2, 0..2]),
vec![
//
1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
//
1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
],
-true,
)?;
let data_all = array.retrieve_array_subset_ndarray::<f32>(&subset_all)?;
println!("store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
2 changes: 1 addition & 1 deletion examples/sharded_array_write_read.rs
@@ -131,7 +131,7 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
ArraySubset::new_with_start_shape(vec![0, 0], inner_chunk_shape.clone())?,
ArraySubset::new_with_start_shape(vec![0, 4], inner_chunk_shape.clone())?,
];
-let decoded_inner_chunks = partial_decoder.par_partial_decode(&inner_chunks_to_decode)?;
+let decoded_inner_chunks = partial_decoder.partial_decode(&inner_chunks_to_decode)?;
let decoded_inner_chunks = decoded_inner_chunks
.into_iter()
.map(|bytes| {
16 changes: 0 additions & 16 deletions src/array.rs
@@ -215,8 +215,6 @@ pub struct Array<TStorage: ?Sized> {
dimension_names: Option<Vec<DimensionName>>,
/// Additional fields annotated with `"must_understand": false`.
additional_fields: AdditionalFields,
-/// If true, codecs run with multithreading (where supported)
-parallel_codecs: bool,
/// Zarrs metadata.
include_zarrs_metadata: bool,
}
@@ -289,7 +287,6 @@ impl<TStorage: ?Sized> Array<TStorage> {
additional_fields: metadata.additional_fields,
storage_transformers,
dimension_names: metadata.dimension_names,
-parallel_codecs: true,
include_zarrs_metadata: true,
})
}
@@ -371,19 +368,6 @@ impl<TStorage: ?Sized> Array<TStorage> {
&self.additional_fields
}

-/// Returns true if codecs can use multiple threads for encoding and decoding (where supported).
-#[must_use]
-pub const fn parallel_codecs(&self) -> bool {
-    self.parallel_codecs
-}
-
-/// Enable or disable multithreaded codec encoding/decoding. Enabled by default.
-///
-/// It may be advantageous to turn this off if parallelisation is external to avoid thrashing.
-pub fn set_parallel_codecs(&mut self, parallel_codecs: bool) {
-    self.parallel_codecs = parallel_codecs;
-}

/// Enable or disable the inclusion of zarrs metadata in the array attributes. Enabled by default.
///
/// Zarrs metadata includes the zarrs version and some parameters.