Skip to content

Commit 78f93f1

Browse files
committed
Add pruning aggregate functions
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 2138a72 commit 78f93f1

14 files changed

Lines changed: 1247 additions & 19 deletions

File tree

vortex-array/public-api.lock

Lines changed: 348 additions & 0 deletions
Large diffs are not rendered by default.

vortex-array/src/aggregate_fn/accumulator.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,19 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
124124
if let Some(stat) = Stat::from_aggregate_fn(&self.aggregate_fn)
125125
&& let Some(Precision::Exact(partial)) = batch.statistics().get(stat)
126126
{
127-
vortex_ensure!(
128-
partial.dtype() == &self.partial_dtype,
129-
"Aggregate {} read legacy stat {} with dtype {}, expected {}",
130-
self.aggregate_fn,
131-
stat,
132-
partial.dtype(),
133-
self.partial_dtype,
134-
);
127+
let partial = if partial.dtype() == &self.partial_dtype {
128+
partial
129+
} else {
130+
vortex_ensure!(
131+
partial.dtype().eq_ignore_nullability(&self.partial_dtype),
132+
"Aggregate {} read legacy stat {} with dtype {}, expected {}",
133+
self.aggregate_fn,
134+
stat,
135+
partial.dtype(),
136+
self.partial_dtype,
137+
);
138+
partial.cast(&self.partial_dtype)?
139+
};
135140
self.vtable.combine_partials(&mut self.partial, partial)?;
136141
return Ok(());
137142
}
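New file: module `crate::aggregate_fn::fns::all_non_null` in vortex-array (file path not captured on this page)

Lines changed: 139 additions & 0 deletions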
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexResult;
5+
6+
use crate::ArrayRef;
7+
use crate::Columnar;
8+
use crate::ExecutionCtx;
9+
use crate::IntoArray;
10+
use crate::aggregate_fn::AggregateFnId;
11+
use crate::aggregate_fn::AggregateFnVTable;
12+
use crate::aggregate_fn::EmptyOptions;
13+
use crate::dtype::DType;
14+
use crate::dtype::Nullability;
15+
use crate::scalar::Scalar;
16+
17+
/// Aggregate function that computes whether every value in an array is non-null.
///
/// This is a unit struct with no state of its own; the aggregation logic lives in
/// its `AggregateFnVTable` implementation, which tracks the running answer as a
/// single `bool`.
18+
#[derive(Clone, Debug)]
19+
pub struct AllNonNull;
20+
21+
impl AggregateFnVTable for AllNonNull {
22+
/// This aggregate takes no configuration.
type Options = EmptyOptions;
23+
/// Running answer: `true` while no batch seen so far has reported an invalid value.
type Partial = bool;
24+
25+
/// Returns the identifier `"vortex.all_non_null"` for this aggregate function.
fn id(&self) -> AggregateFnId {
26+
AggregateFnId::new("vortex.all_non_null")
27+
}
28+
29+
/// Always returns `Ok(None)`.
///
/// NOTE(review): presumably `None` signals "nothing to encode" because the options
/// type is `EmptyOptions` — confirm against the trait's contract.
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
30+
Ok(None)
31+
}
32+
33+
/// The result is always a non-nullable boolean; the options and input dtype are ignored.
fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
34+
Some(DType::Bool(Nullability::NonNullable))
35+
}
36+
37+
/// The partial state has the same dtype as the final result (delegates to `return_dtype`).
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
38+
self.return_dtype(options, input_dtype)
39+
}
40+
41+
/// Initial state is `true`: with nothing accumulated, no invalid value has been seen.
fn empty_partial(
42+
&self,
43+
_options: &Self::Options,
44+
_input_dtype: &DType,
45+
) -> VortexResult<Self::Partial> {
46+
Ok(true)
47+
}
48+
49+
/// Folds another partial result into `partial` with a logical AND.
///
/// # Errors
///
/// Propagates the error from `bool::try_from` if `other` cannot be converted to a `bool`.
fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
50+
*partial &= bool::try_from(&other)?;
51+
Ok(())
52+
}
53+
54+
/// Wraps the current partial state in a non-nullable boolean scalar.
fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
55+
Ok(Scalar::bool(*partial, Nullability::NonNullable))
56+
}
57+
58+
/// Restores the state to its initial value (`true`), matching `empty_partial`.
fn reset(&self, partial: &mut Self::Partial) {
59+
*partial = true;
60+
}
61+
62+
/// Reports saturation once the state is `false`.
///
/// Every update in this impl other than `reset` uses `&=`, so a `false` state
/// cannot flip back to `true` by accumulating more input.
fn is_saturated(&self, partial: &Self::Partial) -> bool {
63+
!*partial
64+
}
65+
66+
/// ANDs into `state` whether `batch` reports an invalid count of zero.
///
/// Always returns `Ok(true)`.
///
/// NOTE(review): presumably `true` means "handled here, skip the `accumulate`
/// fallback" — confirm against the caller in `Accumulator`.
///
/// # Errors
///
/// Propagates any error from `batch.invalid_count(ctx)`.
fn try_accumulate(
67+
&self,
68+
state: &mut Self::Partial,
69+
batch: &ArrayRef,
70+
ctx: &mut ExecutionCtx,
71+
) -> VortexResult<bool> {
72+
*state &= batch.invalid_count(ctx)? == 0;
73+
Ok(true)
74+
}
75+
76+
/// ANDs into `partial` whether the columnar `batch` contains no null values.
///
/// # Errors
///
/// Propagates any error from `invalid_count` on the canonical branch.
fn accumulate(
77+
&self,
78+
partial: &mut Self::Partial,
79+
batch: &Columnar,
80+
ctx: &mut ExecutionCtx,
81+
) -> VortexResult<()> {
82+
*partial &= match batch {
83+
// A constant batch passes if it is empty or its single scalar is not null.
Columnar::Constant(c) => c.is_empty() || !c.scalar().is_null(),
84+
// A canonical batch is converted to an array and must report zero invalid entries.
Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? == 0,
85+
};
86+
Ok(())
87+
}
88+
89+
/// Returns the partials array unchanged; no conversion is applied here.
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
90+
Ok(partials)
91+
}
92+
93+
/// The final scalar is the partial state itself (delegates to `to_scalar`).
fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
94+
self.to_scalar(partial)
95+
}
96+
}
97+
98+
// Unit tests driving `AllNonNull` through `Accumulator` over nullable `i32` input.
#[cfg(test)]
99+
mod tests {
100+
use vortex_error::VortexResult;
101+
102+
use crate::IntoArray;
103+
use crate::LEGACY_SESSION;
104+
use crate::VortexSessionExecute;
105+
use crate::aggregate_fn::Accumulator;
106+
use crate::aggregate_fn::DynAccumulator;
107+
use crate::aggregate_fn::EmptyOptions;
108+
use crate::aggregate_fn::fns::all_non_null::AllNonNull;
109+
use crate::arrays::PrimitiveArray;
110+
use crate::dtype::DType;
111+
use crate::dtype::Nullability;
112+
use crate::dtype::PType;
113+
114+
// A batch whose entries are all `Some(..)` must finish as `true`.
#[test]
115+
fn all_non_null_aggregate_fn() -> VortexResult<()> {
116+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
117+
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
118+
let mut acc = Accumulator::try_new(AllNonNull, EmptyOptions, dtype)?;
119+
120+
let batch = PrimitiveArray::from_option_iter([Some(1i32), Some(2), Some(3)]).into_array();
121+
acc.accumulate(&batch, &mut ctx)?;
122+
123+
assert!(bool::try_from(&acc.finish()?)?);
124+
Ok(())
125+
}
126+
127+
// A batch containing a `None` entry must finish as `false`.
#[test]
128+
fn all_non_null_false_with_nulls() -> VortexResult<()> {
129+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
130+
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
131+
let mut acc = Accumulator::try_new(AllNonNull, EmptyOptions, dtype)?;
132+
133+
let batch = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array();
134+
acc.accumulate(&batch, &mut ctx)?;
135+
136+
assert!(!bool::try_from(&acc.finish()?)?);
137+
Ok(())
138+
}
139+
}
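New file: module `crate::aggregate_fn::fns::all_null` in vortex-array (file path not captured on this page)

Lines changed: 142 additions & 0 deletions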
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexResult;
5+
6+
use crate::ArrayRef;
7+
use crate::Columnar;
8+
use crate::ExecutionCtx;
9+
use crate::IntoArray;
10+
use crate::aggregate_fn::AggregateFnId;
11+
use crate::aggregate_fn::AggregateFnVTable;
12+
use crate::aggregate_fn::EmptyOptions;
13+
use crate::dtype::DType;
14+
use crate::dtype::Nullability;
15+
use crate::scalar::Scalar;
16+
17+
/// Aggregate function that computes whether every value in an array is null.
///
/// This is a unit struct with no state of its own; the aggregation logic lives in
/// its `AggregateFnVTable` implementation, which tracks the running answer as a
/// single `bool`.
18+
#[derive(Clone, Debug)]
19+
pub struct AllNull;
20+
21+
impl AggregateFnVTable for AllNull {
22+
/// This aggregate takes no configuration.
type Options = EmptyOptions;
23+
/// Running answer: `true` while every batch seen so far consisted only of invalid values.
type Partial = bool;
24+
25+
/// Returns the identifier `"vortex.all_null"` for this aggregate function.
fn id(&self) -> AggregateFnId {
26+
AggregateFnId::new("vortex.all_null")
27+
}
28+
29+
/// Always returns `Ok(None)`.
///
/// NOTE(review): presumably `None` signals "nothing to encode" because the options
/// type is `EmptyOptions` — confirm against the trait's contract.
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
30+
Ok(None)
31+
}
32+
33+
/// The result is always a non-nullable boolean; the options and input dtype are ignored.
fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
34+
Some(DType::Bool(Nullability::NonNullable))
35+
}
36+
37+
/// The partial state has the same dtype as the final result (delegates to `return_dtype`).
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
38+
self.return_dtype(options, input_dtype)
39+
}
40+
41+
/// Initial state is `true`: with nothing accumulated, no non-null value has been seen.
fn empty_partial(
42+
&self,
43+
_options: &Self::Options,
44+
_input_dtype: &DType,
45+
) -> VortexResult<Self::Partial> {
46+
Ok(true)
47+
}
48+
49+
/// Folds another partial result into `partial` with a logical AND.
///
/// # Errors
///
/// Propagates the error from `bool::try_from` if `other` cannot be converted to a `bool`.
fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
50+
*partial &= bool::try_from(&other)?;
51+
Ok(())
52+
}
53+
54+
/// Wraps the current partial state in a non-nullable boolean scalar.
fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
55+
Ok(Scalar::bool(*partial, Nullability::NonNullable))
56+
}
57+
58+
/// Restores the state to its initial value (`true`), matching `empty_partial`.
fn reset(&self, partial: &mut Self::Partial) {
59+
*partial = true;
60+
}
61+
62+
/// Reports saturation once the state is `false`.
///
/// Every update in this impl other than `reset` uses `&=`, so a `false` state
/// cannot flip back to `true` by accumulating more input.
fn is_saturated(&self, partial: &Self::Partial) -> bool {
63+
!*partial
64+
}
65+
66+
/// ANDs into `state` whether the invalid count of `batch` equals its length.
///
/// A zero-length batch whose invalid count is also zero satisfies the comparison
/// and leaves `state` unchanged. Always returns `Ok(true)`.
///
/// NOTE(review): presumably `true` means "handled here, skip the `accumulate`
/// fallback" — confirm against the caller in `Accumulator`.
///
/// # Errors
///
/// Propagates any error from `batch.invalid_count(ctx)`.
fn try_accumulate(
67+
&self,
68+
state: &mut Self::Partial,
69+
batch: &ArrayRef,
70+
ctx: &mut ExecutionCtx,
71+
) -> VortexResult<bool> {
72+
*state &= batch.invalid_count(ctx)? == batch.len();
73+
Ok(true)
74+
}
75+
76+
/// ANDs into `partial` whether the columnar `batch` consists only of null values.
///
/// # Errors
///
/// Propagates any error from `invalid_count` on the canonical branch.
fn accumulate(
77+
&self,
78+
partial: &mut Self::Partial,
79+
batch: &Columnar,
80+
ctx: &mut ExecutionCtx,
81+
) -> VortexResult<()> {
82+
*partial &= match batch {
83+
// A constant batch passes if it is empty or its single scalar is null.
Columnar::Constant(c) => c.is_empty() || c.scalar().is_null(),
84+
// A canonical batch is converted to an array; every entry must be invalid.
Columnar::Canonical(c) => {
85+
let array = c.clone().into_array();
86+
array.invalid_count(ctx)? == array.len()
87+
}
88+
};
89+
Ok(())
90+
}
91+
92+
/// Returns the partials array unchanged; no conversion is applied here.
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
93+
Ok(partials)
94+
}
95+
96+
/// The final scalar is the partial state itself (delegates to `to_scalar`).
fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
97+
self.to_scalar(partial)
98+
}
99+
}
100+
101+
// Unit tests driving `AllNull` through `Accumulator` over nullable `i32` input.
#[cfg(test)]
102+
mod tests {
103+
use vortex_error::VortexResult;
104+
105+
use crate::IntoArray;
106+
use crate::LEGACY_SESSION;
107+
use crate::VortexSessionExecute;
108+
use crate::aggregate_fn::Accumulator;
109+
use crate::aggregate_fn::DynAccumulator;
110+
use crate::aggregate_fn::EmptyOptions;
111+
use crate::aggregate_fn::fns::all_null::AllNull;
112+
use crate::arrays::PrimitiveArray;
113+
use crate::dtype::DType;
114+
use crate::dtype::Nullability;
115+
use crate::dtype::PType;
116+
117+
// A batch whose entries are all `None` must finish as `true`.
#[test]
118+
fn all_null_aggregate_fn() -> VortexResult<()> {
119+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
120+
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
121+
let mut acc = Accumulator::try_new(AllNull, EmptyOptions, dtype)?;
122+
123+
// The element type cannot be inferred from `None` alone, hence the explicit `i32`.
let batch = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
124+
acc.accumulate(&batch, &mut ctx)?;
125+
126+
assert!(bool::try_from(&acc.finish()?)?);
127+
Ok(())
128+
}
129+
130+
// A batch mixing `Some(..)` and `None` entries must finish as `false`.
#[test]
131+
fn all_null_false_with_non_nulls() -> VortexResult<()> {
132+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
133+
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
134+
let mut acc = Accumulator::try_new(AllNull, EmptyOptions, dtype)?;
135+
136+
let batch = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array();
137+
acc.accumulate(&batch, &mut ctx)?;
138+
139+
assert!(!bool::try_from(&acc.finish()?)?);
140+
Ok(())
141+
}
142+
}

0 commit comments

Comments
 (0)