Skip to content

Commit 9c89043

Browse files
adriangbclaude
andcommitted
feat(parquet): RowFilter per-predicate tag + observer hook (sketch)
`DatafusionArrowPredicate::try_new_tagged` adds an optional caller `Tag` plus a `SharedRowFilterObserver` fired on each `evaluate()`. `build_row_filter_tagged((tag, expr)*, ..., observer)` mirrors `build_row_filter` but plumbs tags and the shared observer through to every constructed predicate. `rows_seen`/`rows_kept` reported to the observer are *conditional* on prior predicates in the same `RowFilter` (parquet-rs evaluates them in sequence and filters rows between them) — the marginal-value signal an adaptive scheduler wants, which falls out of the existing flow. Untagged callers pay nothing: `Option<SharedRowFilterObserver>` is `None`, no Arc, no per-batch lock. Module docs sketch the follow-up `CompoundArrowPredicate` that would own a sub-tree of physical expressions and do datafusion-side OR / NOT short-circuit during decode (where exact masks make per-branch stats and negation sound). Left unimplemented; it stands alone. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2d5d7ef commit 9c89043

2 files changed

Lines changed: 159 additions & 24 deletions

File tree

datafusion/datasource-parquet/src/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ pub use page_filter::PagePruningAccessPlanFilter;
5252
pub use reader::*; // Expose so downstream crates can use it
5353
pub use row_filter::build_row_filter;
5454
pub use row_filter::can_expr_be_pushed_down_with_schemas;
55+
pub use row_filter::{
56+
NoopRowFilterObserver, RowFilterObserver, SharedRowFilterObserver,
57+
build_row_filter_tagged,
58+
};
5559
pub use row_group_filter::RowGroupAccessPlanFilter;
5660
#[expect(deprecated)]
5761
pub use schema_coercion::{

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 155 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,42 @@
6464
//! For example, given a struct column `s {name: Utf8, value: Int32}`:
6565
//! - `WHERE s['value'] > 5` — pushed down (accesses a primitive leaf)
6666
//! - `WHERE s IS NOT NULL` — not pushed down (references the whole struct)
67+
//!
68+
//! ## Tracking stats and short-circuiting OR
69+
//!
70+
//! Each [`DatafusionArrowPredicate`] optionally carries a [`Tag`] and a
71+
//! shared [`RowFilterObserver`] (see [`SharedRowFilterObserver`]). When
72+
//! both are set, every `evaluate` call fires
73+
//! `observer.on_evaluate(tag, rows_seen, rows_kept)`. Untagged
74+
//! row filters built via [`build_row_filter`] pay nothing (no Arc,
75+
//! no lock).
76+
//!
77+
//! `rows_seen` here is the number of rows arriving at *this*
78+
//! predicate, which parquet-rs reduces between predicates as each
79+
//! predicate's mask is applied. The signal is therefore *conditional*
80+
//! selectivity — exactly the quantity an adaptive scheduler needs to
81+
//! learn the marginal value of running predicate `P_i` after
82+
//! `P_0..P_{i-1}`.
83+
//!
84+
//! ### OR short-circuit at the row-filter layer
85+
//!
86+
//! Parquet-rs's `RowFilter` is a flat `Vec<Box<dyn ArrowPredicate>>`
87+
//! and AND-combines them with column staging between predicates. That
88+
//! makes top-level AND short-circuit "free" — we keep splitting AND
89+
//! conjuncts into separate `DatafusionArrowPredicate`s for that
90+
//! reason.
91+
//!
92+
//! `OR` and `NOT` inside an individual conjunct cannot be split that
93+
//! way; today they are evaluated as a single physical expression with
94+
//! no short-circuit. A natural follow-up is to introduce a
95+
//! `CompoundArrowPredicate` that owns a tree of physical expressions
96+
//! (analog of [`datafusion_pruning::PruningPredicateTree`]) and
97+
//! short-circuits `OR` (skip branch B if branch A already keeps all
98+
//! surviving rows) and `NOT` (cheap negation), firing the observer
99+
//! per leaf actually evaluated. Each compound predicate's
100+
//! [`ArrowPredicate::projection`] returns the union of its leaves'
101+
//! masks so the parquet decoder reads the required columns up front;
102+
//! short-circuit then saves *evaluation*, not IO, for that compound.
67103
68104
use std::collections::BTreeSet;
69105
use std::sync::Arc;
@@ -87,10 +123,39 @@ use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
87123
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
88124

89125
use datafusion_physical_plan::metrics;
126+
use datafusion_pruning::Tag;
90127

91128
use super::ParquetFileMetrics;
92129
use super::supported_predicates::supports_list_predicates;
93130

131+
/// Observer fired once per row-filter predicate evaluation. The
132+
/// `rows_seen`/`rows_kept` counts are *conditional* on prior
133+
/// predicates in the same `RowFilter` (parquet-rs evaluates them in
134+
/// sequence and filters rows between predicates), which is exactly
135+
/// what an adaptive scheduler needs to learn "what does running this
136+
/// predicate next buy us, given everything that came before".
137+
///
138+
/// Untagged predicates pass `tag = None`. The default implementation
139+
/// does nothing, so a `NoopRowFilterObserver` is just `()`.
140+
pub trait RowFilterObserver: std::fmt::Debug + Send {
141+
/// Called after a `DatafusionArrowPredicate::evaluate` runs.
142+
fn on_evaluate(&mut self, _tag: Option<Tag>, _rows_seen: usize, _rows_kept: usize) {}
143+
}
144+
145+
/// No-op observer; row filters built via [`build_row_filter`] (no
146+
/// tags) use this implicitly.
147+
#[derive(Debug, Default, Clone, Copy)]
148+
pub struct NoopRowFilterObserver;
149+
impl RowFilterObserver for NoopRowFilterObserver {}
150+
151+
/// Shared handle used to plug a single observer into every
152+
/// `DatafusionArrowPredicate` constructed for a given file. Parquet-rs
153+
/// owns the predicates after `RowFilter::new`, so each predicate
154+
/// holds an `Arc<Mutex<...>>` to the observer rather than a borrow.
155+
///
156+
/// This mirrors how `metrics::Count` is plumbed through today.
157+
pub type SharedRowFilterObserver = Arc<std::sync::Mutex<dyn RowFilterObserver>>;
158+
94159
/// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform
95160
/// row-level filtering during parquet decoding.
96161
///
@@ -119,15 +184,26 @@ pub(crate) struct DatafusionArrowPredicate {
119184
rows_matched: metrics::Count,
120185
/// how long was spent evaluating this predicate
121186
time: metrics::Time,
187+
/// Optional caller-supplied tag fired through `observer.on_evaluate`.
188+
tag: Option<Tag>,
189+
/// Optional observer shared across all predicates in a single
190+
/// row filter. `None` means no per-evaluate hook (and no
191+
/// per-batch lock acquisition cost).
192+
observer: Option<SharedRowFilterObserver>,
122193
}
123194

124195
impl DatafusionArrowPredicate {
125-
/// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
126-
pub fn try_new(
196+
/// Create a `DatafusionArrowPredicate`. Pass `tag = None` and
197+
/// `observer = None` for untagged row filters — the per-evaluate
198+
/// observer call is then elided. When both are `Some`, each
199+
/// `evaluate` fires `observer.on_evaluate(tag, rows_seen, rows_kept)`.
200+
pub fn try_new_tagged(
127201
candidate: FilterCandidate,
128202
rows_pruned: metrics::Count,
129203
rows_matched: metrics::Count,
130204
time: metrics::Time,
205+
tag: Option<Tag>,
206+
observer: Option<SharedRowFilterObserver>,
131207
) -> Result<Self> {
132208
let physical_expr =
133209
reassign_expr_columns(candidate.expr, &candidate.read_plan.projected_schema)?;
@@ -138,6 +214,8 @@ impl DatafusionArrowPredicate {
138214
rows_pruned,
139215
rows_matched,
140216
time,
217+
tag,
218+
observer,
141219
})
142220
}
143221
}
@@ -160,6 +238,14 @@ impl ArrowPredicate for DatafusionArrowPredicate {
160238
let num_pruned = bool_arr.len() - num_matched;
161239
self.rows_pruned.add(num_pruned);
162240
self.rows_matched.add(num_matched);
241+
if let Some(obs) = &self.observer {
242+
// Cheap on the No-op path: when `obs` is set, the
243+
// caller has chosen to pay for the lock. Untagged
244+
// RowFilters never construct this Arc.
245+
if let Ok(mut guard) = obs.lock() {
246+
guard.on_evaluate(self.tag, bool_arr.len(), num_matched);
247+
}
248+
}
163249
timer.stop();
164250
Ok(bool_arr)
165251
})
@@ -1018,63 +1104,104 @@ pub fn build_row_filter(
10181104
metadata: &ParquetMetaData,
10191105
reorder_predicates: bool,
10201106
file_metrics: &ParquetFileMetrics,
1107+
) -> Result<Option<RowFilter>> {
1108+
// Untagged path: split top-level AND conjuncts, no observer.
1109+
let predicates: Vec<(Option<Tag>, Arc<dyn PhysicalExpr>)> = split_conjunction(expr)
1110+
.into_iter()
1111+
.map(|e| (None, Arc::clone(e)))
1112+
.collect();
1113+
build_row_filter_inner(
1114+
&predicates,
1115+
file_schema,
1116+
metadata,
1117+
reorder_predicates,
1118+
file_metrics,
1119+
None,
1120+
)
1121+
}
1122+
1123+
/// Tagged variant of [`build_row_filter`]. Each `(tag, expr)` pair
1124+
/// becomes one row-filter predicate; evaluations fire
1125+
/// `observer.on_evaluate(tag, rows_seen, rows_kept)`.
1126+
pub fn build_row_filter_tagged(
1127+
tagged: &[(Tag, Arc<dyn PhysicalExpr>)],
1128+
file_schema: &SchemaRef,
1129+
metadata: &ParquetMetaData,
1130+
reorder_predicates: bool,
1131+
file_metrics: &ParquetFileMetrics,
1132+
observer: &SharedRowFilterObserver,
1133+
) -> Result<Option<RowFilter>> {
1134+
let predicates: Vec<(Option<Tag>, Arc<dyn PhysicalExpr>)> = tagged
1135+
.iter()
1136+
.map(|(tag, expr)| (Some(*tag), Arc::clone(expr)))
1137+
.collect();
1138+
build_row_filter_inner(
1139+
&predicates,
1140+
file_schema,
1141+
metadata,
1142+
reorder_predicates,
1143+
file_metrics,
1144+
Some(observer),
1145+
)
1146+
}
1147+
1148+
fn build_row_filter_inner(
1149+
predicates: &[(Option<Tag>, Arc<dyn PhysicalExpr>)],
1150+
file_schema: &SchemaRef,
1151+
metadata: &ParquetMetaData,
1152+
reorder_predicates: bool,
1153+
file_metrics: &ParquetFileMetrics,
1154+
observer: Option<&SharedRowFilterObserver>,
10211155
) -> Result<Option<RowFilter>> {
10221156
let rows_pruned = &file_metrics.pushdown_rows_pruned;
10231157
let rows_matched = &file_metrics.pushdown_rows_matched;
10241158
let time = &file_metrics.row_pushdown_eval_time;
10251159

1026-
// Split into conjuncts:
1027-
// `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
1028-
let predicates = split_conjunction(expr);
1029-
1030-
// Determine which conjuncts can be evaluated as ArrowPredicates, if any
1031-
let mut candidates: Vec<FilterCandidate> = predicates
1032-
.into_iter()
1033-
.map(|expr| {
1160+
let mut candidates: Vec<(Option<Tag>, FilterCandidate)> = predicates
1161+
.iter()
1162+
.map(|(tag, expr)| {
10341163
FilterCandidateBuilder::new(Arc::clone(expr), Arc::clone(file_schema))
10351164
.build(metadata)
1165+
.map(|c| c.map(|c| (*tag, c)))
10361166
})
10371167
.collect::<Result<Vec<_>, _>>()?
10381168
.into_iter()
10391169
.flatten()
10401170
.collect();
10411171

1042-
// no candidates
10431172
if candidates.is_empty() {
10441173
return Ok(None);
10451174
}
10461175

10471176
if reorder_predicates {
1048-
candidates.sort_unstable_by_key(|c| c.required_bytes);
1177+
// Reordering applies only at the parquet-rs AND level, so it
1178+
// is safe to sort by I/O cost across tags too. The observer
1179+
// sees evaluations in the post-reorder order, which is
1180+
// exactly the order the rows flow.
1181+
candidates.sort_unstable_by_key(|(_, c)| c.required_bytes);
10491182
}
10501183

1051-
// To avoid double-counting metrics when multiple predicates are used:
1052-
// - All predicates should count rows_pruned (cumulative pruned rows)
1053-
// - Only the last predicate should count rows_matched (final result)
1054-
// This ensures: rows_matched + rows_pruned = total rows processed
10551184
let total_candidates = candidates.len();
10561185

10571186
candidates
10581187
.into_iter()
10591188
.enumerate()
1060-
.map(|(idx, candidate)| {
1189+
.map(|(idx, (tag, candidate))| {
10611190
let is_last = idx == total_candidates - 1;
1062-
1063-
// All predicates share the pruned counter (cumulative)
10641191
let predicate_rows_pruned = rows_pruned.clone();
1065-
1066-
// Only the last predicate tracks matched rows (final result)
10671192
let predicate_rows_matched = if is_last {
10681193
rows_matched.clone()
10691194
} else {
10701195
metrics::Count::new()
10711196
};
10721197

1073-
DatafusionArrowPredicate::try_new(
1198+
DatafusionArrowPredicate::try_new_tagged(
10741199
candidate,
10751200
predicate_rows_pruned,
10761201
predicate_rows_matched,
10771202
time.clone(),
1203+
tag,
1204+
observer.cloned(),
10781205
)
10791206
.map(|pred| Box::new(pred) as _)
10801207
})
@@ -1247,11 +1374,13 @@ mod test {
12471374
.expect("building candidate")
12481375
.expect("candidate expected");
12491376

1250-
let mut row_filter = DatafusionArrowPredicate::try_new(
1377+
let mut row_filter = DatafusionArrowPredicate::try_new_tagged(
12511378
candidate,
12521379
Count::new(),
12531380
Count::new(),
12541381
Time::new(),
1382+
None,
1383+
None,
12551384
)
12561385
.expect("creating filter predicate");
12571386

@@ -1286,11 +1415,13 @@ mod test {
12861415
.expect("building candidate")
12871416
.expect("candidate expected");
12881417

1289-
let mut row_filter = DatafusionArrowPredicate::try_new(
1418+
let mut row_filter = DatafusionArrowPredicate::try_new_tagged(
12901419
candidate,
12911420
Count::new(),
12921421
Count::new(),
12931422
Time::new(),
1423+
None,
1424+
None,
12941425
)
12951426
.expect("creating filter predicate");
12961427

0 commit comments

Comments
 (0)