Skip to content

Commit

Permalink
Remove codec async methods, aside from async_partial_decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Feb 19, 2024
1 parent 099528d commit 665e315
Show file tree
Hide file tree
Showing 12 changed files with 10 additions and 712 deletions.
3 changes: 1 addition & 2 deletions src/array/array_async_readable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> Array<TStorage> {
let chunk_representation = self.chunk_array_representation(chunk_indices)?;
let chunk_decoded = self
.codecs()
.async_decode(chunk_encoded, &chunk_representation, options)
.await
.decode(chunk_encoded, &chunk_representation, options)
.map_err(ArrayError::CodecError)?;
let chunk_decoded_size =
chunk_representation.num_elements_usize() * chunk_representation.data_type().size();
Expand Down
3 changes: 1 addition & 2 deletions src/array/array_async_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits + 'static> Array<TStorage> {
.create_async_writable_transformer(storage_handle);
let chunk_encoded: Vec<u8> = self
.codecs()
.async_encode(chunk_bytes, &chunk_array_representation, options)
.await
.encode(chunk_bytes, &chunk_array_representation, options)
.map_err(ArrayError::CodecError)?;
crate::storage::async_store_chunk(
&*storage_transformer,
Expand Down
64 changes: 0 additions & 64 deletions src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ pub trait CodecTraits: Send + Sync {
}

/// Traits for both array to array and array to bytes codecs.
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait ArrayCodecTraits: CodecTraits {
/// Return the recommended concurrency for the requested decoded representation.
///
Expand All @@ -206,22 +205,6 @@ pub trait ArrayCodecTraits: CodecTraits {
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError>;

#[cfg(feature = "async")]
/// Asynchronously encode a chunk.
///
/// The default implementation calls [`encode`](ArrayCodecTraits::encode).
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or the decoded output is incompatible with `decoded_representation`.
async fn async_encode(
&self,
decoded_value: Vec<u8>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError> {
self.encode(decoded_value, decoded_representation, options)
}

/// Decode a chunk.
///
/// # Errors
Expand All @@ -233,22 +216,6 @@ pub trait ArrayCodecTraits: CodecTraits {
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError>;

#[cfg(feature = "async")]
/// Asynchronously decode a chunk.
///
/// The default implementation calls [`decode`](ArrayCodecTraits::decode).
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or the decoded output is incompatible with `decoded_representation`.
async fn async_decode(
&self,
encoded_value: Vec<u8>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError> {
self.decode(encoded_value, decoded_representation, options)
}

/// Decode into the subset of an array.
///
/// The default implementation decodes the chunk as normal then copies it into the array subset.
Expand Down Expand Up @@ -700,37 +667,6 @@ pub trait BytesToBytesCodecTraits: CodecTraits + dyn_clone::DynClone + core::fmt
options: &CodecOptions,
) -> Result<Box<dyn BytesPartialDecoderTraits + 'a>, CodecError>;

#[cfg(feature = "async")]
/// Asynchronously encode chunk bytes.
///
/// The default implementation calls [`encode`](BytesToBytesCodecTraits::encode).
///
/// # Errors
/// Returns [`CodecError`] if a codec fails.
async fn async_encode(
&self,
decoded_value: Vec<u8>,
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError> {
self.encode(decoded_value, options)
}

#[cfg(feature = "async")]
/// Asynchronously decode chunk bytes.
///
/// The default implementation calls [`decode`](BytesToBytesCodecTraits::decode).
///
/// # Errors
/// Returns [`CodecError`] if a codec fails.
async fn async_decode(
&self,
encoded_value: Vec<u8>,
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError> {
self.decode(encoded_value, decoded_representation, options)
}

#[cfg(feature = "async")]
/// Initialises an asynchronous partial decoder.
///
Expand Down
1 change: 0 additions & 1 deletion src/array/codec/array_to_array/bitround/bitround_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl CodecTraits for BitroundCodec {
}
}

#[cfg_attr(feature = "async", async_trait::async_trait)]
impl ArrayCodecTraits for BitroundCodec {
fn recommended_concurrency(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl ArrayToArrayCodecTraits for TransposeCodec {
}
}

#[cfg_attr(feature = "async", async_trait::async_trait)]
impl ArrayCodecTraits for TransposeCodec {
fn recommended_concurrency(
&self,
Expand Down
1 change: 0 additions & 1 deletion src/array/codec/array_to_bytes/bytes/bytes_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ impl CodecTraits for BytesCodec {
}
}

#[cfg_attr(feature = "async", async_trait::async_trait)]
impl ArrayCodecTraits for BytesCodec {
fn recommended_concurrency(
&self,
Expand Down
96 changes: 0 additions & 96 deletions src/array/codec/array_to_bytes/codec_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ impl ArrayToBytesCodecTraits for CodecChain {
}
}

#[cfg_attr(feature = "async", async_trait::async_trait)]
impl ArrayCodecTraits for CodecChain {
fn recommended_concurrency(
&self,
Expand Down Expand Up @@ -504,101 +503,6 @@ impl ArrayCodecTraits for CodecChain {
Ok(encoded_value)
}

#[cfg(feature = "async")]
async fn async_encode(
&self,
decoded_value: Vec<u8>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError> {
if decoded_value.len() as u64 != decoded_representation.size() {
return Err(CodecError::UnexpectedChunkDecodedSize(
decoded_value.len(),
decoded_representation.size(),
));
}

let mut decoded_representation = decoded_representation.clone();

let mut value = decoded_value;
// array->array
for codec in &self.array_to_array {
value = codec
.async_encode(value, &decoded_representation, options)
.await?;
decoded_representation = codec.compute_encoded_size(&decoded_representation)?;
}

// array->bytes
value = self
.array_to_bytes
.async_encode(value, &decoded_representation, options)
.await?;
let mut decoded_representation = self
.array_to_bytes
.compute_encoded_size(&decoded_representation)?;

// bytes->bytes
for codec in &self.bytes_to_bytes {
value = codec.async_encode(value, options).await?;
decoded_representation = codec.compute_encoded_size(&decoded_representation);
}

Ok(value)
}

#[cfg(feature = "async")]
async fn async_decode(
&self,
mut encoded_value: Vec<u8>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Vec<u8>, CodecError> {
let array_representations =
self.get_array_representations(decoded_representation.clone())?;
let bytes_representations =
self.get_bytes_representations(array_representations.last().unwrap())?;

// bytes->bytes
for (codec, bytes_representation) in std::iter::zip(
self.bytes_to_bytes.iter().rev(),
bytes_representations.iter().rev().skip(1),
) {
encoded_value = codec
.async_decode(encoded_value, bytes_representation, options)
.await?;
}

// bytes->array
encoded_value = self
.array_to_bytes
.async_decode(
encoded_value,
array_representations.last().unwrap(),
options,
)
.await?;

// array->array
for (codec, array_representation) in std::iter::zip(
self.array_to_array.iter().rev(),
array_representations.iter().rev().skip(1),
) {
encoded_value = codec
.async_decode(encoded_value, array_representation, options)
.await?;
}

if encoded_value.len() as u64 != decoded_representation.size() {
return Err(CodecError::UnexpectedChunkDecodedSize(
encoded_value.len(),
decoded_representation.size(),
));
}

Ok(encoded_value)
}

fn decode_into_array_view(
&self,
encoded_value: &[u8],
Expand Down
1 change: 0 additions & 1 deletion src/array/codec/array_to_bytes/pcodec/pcodec_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl CodecTraits for PcodecCodec {
}
}

#[cfg_attr(feature = "async", async_trait::async_trait)]
impl ArrayCodecTraits for PcodecCodec {
fn recommended_concurrency(
&self,
Expand Down
23 changes: 2 additions & 21 deletions src/array/codec/array_to_bytes/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,6 @@ fn decode_shard_index(
.collect())
}

#[cfg(feature = "async")]
async fn async_decode_shard_index(
encoded_shard_index: Vec<u8>,
index_array_representation: &ChunkRepresentation,
index_codecs: &dyn ArrayToBytesCodecTraits,
options: &CodecOptions,
) -> Result<Vec<u64>, CodecError> {
// Decode the shard index
let decoded_shard_index = index_codecs
.async_decode(encoded_shard_index, index_array_representation, options)
.await?;
Ok(decoded_shard_index
.chunks_exact(core::mem::size_of::<u64>())
.map(|v| u64::from_ne_bytes(v.try_into().unwrap() /* safe */))
.collect())
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down Expand Up @@ -336,12 +319,10 @@ mod tests {
.build();

let encoded = codec
.async_encode(bytes.clone(), &chunk_representation, options)
.await
.encode(bytes.clone(), &chunk_representation, options)
.unwrap();
let decoded = codec
.async_decode(encoded.clone(), &chunk_representation, options)
.await
.decode(encoded.clone(), &chunk_representation, options)
.unwrap();
assert_ne!(encoded, decoded);
assert_eq!(bytes, decoded);
Expand Down
Loading

0 comments on commit 665e315

Please sign in to comment.