Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions encodings/alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,16 @@ fn compress_rd<T: ALPRDFloat + NativePType>(bencher: Bencher, args: (usize, f64)
let encoder = RDEncoder::new(primitive.as_slice::<T>());

bencher
.with_inputs(|| (&primitive, &encoder, SESSION.create_execution_ctx()))
.bench_refs(|(primitive, encoder, ctx)| encoder.encode(primitive.as_view(), ctx))
.with_inputs(|| (&primitive, &encoder))
.bench_refs(|(primitive, encoder)| encoder.encode(primitive.as_view()))
}

#[divan::bench(types = [f32, f64], args = RD_BENCH_ARGS)]
fn decompress_rd<T: ALPRDFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
let (n, fraction_patch) = args;
let primitive = make_rd_array::<T>(n, fraction_patch);
let encoder = RDEncoder::new(primitive.as_slice::<T>());
let encoded = encoder.encode(primitive.as_view(), &mut SESSION.create_execution_ctx());
let encoded = encoder.encode(primitive.as_view());

bencher
.with_inputs(|| (&encoded, SESSION.create_execution_ctx()))
Expand Down
9 changes: 2 additions & 7 deletions encodings/alp/src/alp/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,13 @@ impl MaskKernel for ALP {
) -> VortexResult<Option<ArrayRef>> {
let vortex_mask = Validity::Array(mask.not()?).execute_mask(array.len(), ctx)?;
let masked_encoded = array.encoded().clone().mask(mask.clone())?;
let masked_dtype = array
.dtype()
.with_nullability(masked_encoded.dtype().nullability());
let masked_patches = array
.patches()
.map(|p| p.mask(&vortex_mask, ctx))
.transpose()?
.flatten()
.map(|patches| patches.cast_values(&masked_dtype))
.transpose()?;
.flatten();
Ok(Some(
ALP::new(masked_encoded, array.exponents(), masked_patches).into_array(),
ALP::try_new(masked_encoded, array.exponents(), masked_patches)?.into_array(),
))
}
}
Expand Down
12 changes: 2 additions & 10 deletions encodings/alp/src/alp/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,9 @@ impl TakeExecute for ALP {
.patches()
.map(|p| p.take(indices, ctx))
.transpose()?
.flatten()
.map(|patches| {
patches.cast_values(
&array
.dtype()
.with_nullability(taken_encoded.dtype().nullability()),
)
})
.transpose()?;
.flatten();
Ok(Some(
ALP::new(taken_encoded, array.exponents(), taken_patches).into_array(),
ALP::try_new(taken_encoded, array.exponents(), taken_patches)?.into_array(),
))
}
}
Expand Down
55 changes: 9 additions & 46 deletions encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ use vortex_array::ArrayParts;
use vortex_array::ArrayRef;
use vortex_array::ArraySlots;
use vortex_array::ArrayView;
use vortex_array::Canonical;
use vortex_array::EqMode;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::TypedArrayRef;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
Expand Down Expand Up @@ -217,13 +214,6 @@ impl VTable for ALPRD {
)
})
.transpose()?;
// NOTE: `VTable::deserialize` has a fixed trait signature without `ExecutionCtx`, so we
// cannot plumb a ctx in here. We construct a legacy ctx locally at this trait boundary.
let left_parts_patches = ALPRDData::canonicalize_patches(
Comment thread
robert3005 marked this conversation as resolved.
&left_parts,
left_parts_patches,
&mut LEGACY_SESSION.create_execution_ctx(),
)?;
let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
let data = ALPRDData::new(
left_parts_dictionary,
Expand Down Expand Up @@ -376,11 +366,8 @@ impl ALPRD {
right_parts: ArrayRef,
right_bit_width: u8,
left_parts_patches: Option<Patches>,
ctx: &mut ExecutionCtx,
) -> VortexResult<ALPRDArray> {
let len = left_parts.len();
let left_parts_patches =
ALPRDData::canonicalize_patches(&left_parts, left_parts_patches, ctx)?;
let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
let data = ALPRDData::new(left_parts_dictionary, right_bit_width, left_parts_patches);
Array::try_from_parts(ArrayParts::new(ALPRD, dtype, len, data).with_slots(slots))
Expand Down Expand Up @@ -408,28 +395,6 @@ impl ALPRD {
}

impl ALPRDData {
fn canonicalize_patches(
left_parts: &ArrayRef,
left_parts_patches: Option<Patches>,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Patches>> {
left_parts_patches
.map(|patches| {
if !patches.values().all_valid(ctx)? {
vortex_bail!("patches must be all valid: {}", patches.values());
}
// TODO(ngates): assert the DType, don't cast it.
// TODO(joe): assert the DType, don't cast it in the next PR.
let mut patches = patches.cast_values(&left_parts.dtype().as_nonnullable())?;
// Force execution of the lazy cast so patch values are materialized
// before serialization.
let canonical = patches.values().clone().execute::<Canonical>(ctx)?;
*patches.values_mut() = canonical.into_array();
Ok(patches)
})
.transpose()
}

/// Build a new `ALPRDArray` from components.
pub fn new(
left_parts_dictionary: Buffer<u16>,
Expand Down Expand Up @@ -556,18 +521,16 @@ fn validate_parts(
"patches array_len {} != outer len {len}",
patches.array_len(),
);
// Left-parts exceptions are always all-valid and are stored as the non-nullable left-parts
// dtype. Requiring that exact dtype (rather than ignoring nullability) means each
// construction path must produce correct patches, removing the need to normalize them.
// Non-nullable also implies all-valid, so no separate validity check is required.
let expected = left_parts.dtype().as_nonnullable();
vortex_ensure!(
patches.dtype().eq_ignore_nullability(left_parts.dtype()),
"patches dtype {} does not match left_parts dtype {}",
patches.dtype() == &expected,
"patches dtype {} must be the non-nullable left_parts dtype {}",
patches.dtype(),
left_parts.dtype(),
);
vortex_ensure!(
patches
.values()
.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?,
"patches must be all valid: {}",
patches.values()
expected,
);
}

Expand Down Expand Up @@ -672,7 +635,7 @@ mod test {
// Pick a seed that we know will trigger lots of patches.
let encoder: alp_rd::RDEncoder = alp_rd::RDEncoder::new(&[seed.powi(-2)]);

let rd_array = encoder.encode(real_array.as_view(), &mut ctx);
let rd_array = encoder.encode(real_array.as_view());

let decoded = rd_array
.as_array()
Expand Down
14 changes: 7 additions & 7 deletions encodings/alp/src/alp_rd/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod tests {
let values = vec![1.0f32, 1.1, 1.2, 1.3, 1.4];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
let alprd = encoder.encode(arr.as_view(), &mut ctx);
let alprd = encoder.encode(arr.as_view());

let casted = alprd
.into_array()
Expand All @@ -92,7 +92,7 @@ mod tests {
PrimitiveArray::from_option_iter([Some(10.0f64), None, Some(10.1), Some(10.2), None]);
let values = vec![10.0f64, 10.1, 10.2];
let encoder = RDEncoder::new(&values);
let alprd = encoder.encode(arr.as_view(), &mut ctx);
let alprd = encoder.encode(arr.as_view());

// Cast to NonNullable should fail since we have nulls. The failure surfaces during
// execution since the reduce path defers when the validity stat is not cached.
Expand Down Expand Up @@ -122,31 +122,31 @@ mod tests {
let values = vec![1.23f32, 4.56, 7.89, 10.11, 12.13];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::f64({
let values = vec![100.1f64, 200.2, 300.3, 400.4, 500.5];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::single({
let values = vec![42.42f64];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::negative({
let values = vec![0.0f32, -1.5, 2.5, -3.5, 4.5];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::nullable({
let arr = PrimitiveArray::from_option_iter([Some(1.1f32), None, Some(2.2), Some(3.3), None]);
let values = vec![1.1f32, 2.2, 3.3];
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
fn test_cast_alprd_conformance(#[case] alprd: crate::alp_rd::ALPRDArray) {
test_cast_conformance(&alprd.into_array());
Expand Down
11 changes: 2 additions & 9 deletions encodings/alp/src/alp_rd/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl FilterKernel for ALPRD {
array.right_parts().filter(mask.clone())?,
array.right_bit_width(),
left_parts_exceptions,
ctx,
)?
.into_array(),
))
Expand Down Expand Up @@ -71,7 +70,7 @@ mod test {
fn test_filter<T: ALPRDFloat>(#[case] a: T, #[case] b: T, #[case] outlier: T) {
let mut ctx = SESSION.create_execution_ctx();
let array = PrimitiveArray::new(buffer![a, b, outlier], Validity::NonNullable);
let encoded = RDEncoder::new(&[a, b]).encode(array.as_view(), &mut ctx);
let encoded = RDEncoder::new(&[a, b]).encode(array.as_view());

// Make sure that we're testing the exception pathway.
assert!(encoded.left_parts_patches().is_some());
Expand All @@ -87,13 +86,9 @@ mod test {
#[case(0.1f32, 0.2f32, 3e25f32)]
#[case(0.1f64, 0.2f64, 3e100f64)]
fn test_filter_simple<T: ALPRDFloat>(#[case] a: T, #[case] b: T, #[case] outlier: T) {
let mut ctx = SESSION.create_execution_ctx();
test_filter_conformance(
&RDEncoder::new(&[a, b])
.encode(
PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view(),
&mut ctx,
)
.encode(PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view())
.into_array(),
);
}
Expand All @@ -102,13 +97,11 @@ mod test {
#[case(0.1f32, 3e25f32)]
#[case(0.5f64, 1e100f64)]
fn test_filter_with_nulls<T: ALPRDFloat>(#[case] a: T, #[case] outlier: T) {
let mut ctx = SESSION.create_execution_ctx();
test_filter_conformance(
&RDEncoder::new(&[a])
.encode(
PrimitiveArray::from_option_iter([Some(a), None, Some(outlier), Some(a), None])
.as_view(),
&mut ctx,
)
.into_array(),
);
Expand Down
15 changes: 1 addition & 14 deletions encodings/alp/src/alp_rd/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::scalar_fn::ScalarFnFactoryExt;
use vortex_array::scalar_fn::EmptyOptions;
use vortex_array::scalar_fn::fns::mask::Mask as MaskExpr;
Expand All @@ -22,8 +20,6 @@ impl MaskReduce for ALPRD {
EmptyOptions,
[array.left_parts().clone(), mask.clone()],
)?;
// NOTE: `MaskReduce::mask` has a fixed trait signature without `ExecutionCtx`, so we
// construct a legacy ctx locally at this trait boundary.
Ok(Some(
ALPRD::try_new(
array.dtype().as_nullable(),
Expand All @@ -32,7 +28,6 @@ impl MaskReduce for ALPRD {
array.right_parts().clone(),
array.right_bit_width(),
array.left_parts_patches(),
&mut LEGACY_SESSION.create_execution_ctx(),
)?
.into_array(),
))
Expand All @@ -43,8 +38,6 @@ impl MaskReduce for ALPRD {
mod tests {
use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::array_session;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::compute::conformance::mask::test_mask_conformance;

Expand All @@ -55,13 +48,9 @@ mod tests {
#[case(0.1f32, 0.2f32, 3e25f32)]
#[case(0.1f64, 0.2f64, 3e100f64)]
fn test_mask_simple<T: ALPRDFloat>(#[case] a: T, #[case] b: T, #[case] outlier: T) {
let mut ctx = array_session().create_execution_ctx();
test_mask_conformance(
&RDEncoder::new(&[a, b])
.encode(
PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view(),
&mut ctx,
)
.encode(PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view())
.into_array(),
);
}
Expand All @@ -70,13 +59,11 @@ mod tests {
#[case(0.1f32, 3e25f32)]
#[case(0.5f64, 1e100f64)]
fn test_mask_with_nulls<T: ALPRDFloat>(#[case] a: T, #[case] outlier: T) {
let mut ctx = array_session().create_execution_ctx();
test_mask_conformance(
&RDEncoder::new(&[a])
.encode(
PrimitiveArray::from_option_iter([Some(a), None, Some(outlier), Some(a), None])
.as_view(),
&mut ctx,
)
.into_array(),
);
Expand Down
Loading
Loading