Skip to content

Commit 3d6a97a

Browse files
committed
add generation checks
1 parent 54b3bbf commit 3d6a97a

File tree

3 files changed

+102
-31
lines changed

3 files changed

+102
-31
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use datafusion_common::pruning::{
3838
use datafusion_common::{exec_err, DataFusionError, Result};
3939
use datafusion_datasource::PartitionedFile;
4040
use datafusion_physical_expr_common::physical_expr::{
41-
is_dynamic_physical_expr, PhysicalExpr,
41+
is_dynamic_physical_expr, snapshot_generation, PhysicalExpr,
4242
};
4343
use datafusion_physical_optimizer::pruning::PruningPredicate;
4444
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
@@ -144,25 +144,23 @@ impl FileOpener for ParquetOpener {
144144
// We'll also check this after every record batch we read,
145145
// and if at some point we are able to prove we can prune the file using just the file level statistics
146146
// we can end the stream early.
147-
let file_pruner = predicate
147+
let mut file_pruner = predicate
148148
.as_ref()
149149
.map(|p| {
150-
Ok::<_, DataFusionError>(
151-
is_dynamic_physical_expr(Arc::clone(p))?.then_some(Arc::new(
152-
FilePruner::new(
153-
Arc::clone(p),
154-
&logical_file_schema,
155-
partition_fields.clone(),
156-
file.clone(),
157-
predicate_creation_errors.clone(),
158-
)?,
159-
)),
160-
)
150+
Ok::<_, DataFusionError>(is_dynamic_physical_expr(&p).then_some(
151+
FilePruner::new(
152+
Arc::clone(p),
153+
&logical_file_schema,
154+
partition_fields.clone(),
155+
file.clone(),
156+
predicate_creation_errors.clone(),
157+
)?,
158+
))
161159
})
162160
.transpose()?
163161
.flatten();
164162

165-
if let Some(file_pruner) = &file_pruner {
163+
if let Some(file_pruner) = &mut file_pruner {
166164
if file_pruner.should_prune()? {
167165
// Return an empty stream immediately to skip the work of setting up the actual stream
168166
file_metrics.files_pruned_statistics.add(1);
@@ -362,7 +360,7 @@ impl FileOpener for ParquetOpener {
362360
.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
363361
})
364362
.take_while(move |_| {
365-
if let Some(file_pruner) = file_pruner.as_ref() {
363+
if let Some(file_pruner) = file_pruner.as_mut() {
366364
match file_pruner.should_prune() {
367365
Ok(false) => futures::future::ready(true),
368366
Ok(true) => {
@@ -520,6 +518,7 @@ fn should_enable_page_index(
520518

521519
/// Prune based on partition values and file-level statistics.
522520
pub struct FilePruner {
521+
predicate_generation: u64,
523522
predicate: Arc<dyn PhysicalExpr>,
524523
/// Schema used for pruning, which combines the file schema and partition fields.
525524
/// Partition fields are always at the end, as they are during scans.
@@ -551,6 +550,7 @@ impl FilePruner {
551550
.with_metadata(logical_file_schema.metadata().clone()),
552551
);
553552
Ok(Self {
553+
predicate_generation: snapshot_generation(&predicate),
554554
predicate,
555555
pruning_schema,
556556
file,
@@ -559,7 +559,14 @@ impl FilePruner {
559559
})
560560
}
561561

562-
pub fn should_prune(&self) -> Result<bool> {
562+
pub fn should_prune(&mut self) -> Result<bool> {
563+
let current_generation = self.predicate_generation;
564+
let new_generation = snapshot_generation(&self.predicate);
565+
// If the predicate has not changed since the last time we checked, we can skip pruning
566+
if current_generation == new_generation {
567+
return Ok(false);
568+
}
569+
self.predicate_generation = new_generation;
563570
let pruning_predicate = build_pruning_predicate(
564571
Arc::clone(&self.predicate),
565572
&self.pruning_schema,

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use arrow::array::BooleanArray;
2727
use arrow::compute::filter_record_batch;
2828
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
2929
use arrow::record_batch::RecordBatch;
30-
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30+
use datafusion_common::tree_node::{
31+
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32+
};
3133
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
3234
use datafusion_expr_common::columnar_value::ColumnarValue;
3335
use datafusion_expr_common::interval_arithmetic::Interval;
@@ -345,6 +347,24 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
345347
// This is a safe default behavior.
346348
Ok(None)
347349
}
350+
351+
/// Returns the generation of this `PhysicalExpr` for snapshotting purposes.
352+
/// The generation is an arbitrary u64 that can be used to track changes
353+
/// in the state of the `PhysicalExpr` over time without having to do an exhaustive comparison.
354+
/// This is useful to avoid unecessary computation or serialization if there are no changes to the expression.
355+
/// In particular, dynamic expressions that may change over time; this allows cheap checks for changes.
356+
/// Static expressions that do not change over time should return 0, as does the default implementation.
357+
/// You should not call this method directly as it does not handle recursion.
358+
/// Instead use [`snapshot_generation`] to handle recursion and capture the
359+
/// full state of the `PhysicalExpr`.
360+
fn snapshot_generation(&self) -> u64 {
361+
// By default, we return 0 to indicate that this PhysicalExpr does not
362+
// have any dynamic references or state.
363+
// Since the recursive algorithm XORs the generations of all children the overall
364+
// generation will be 0 if no children have a non-zero generation, meaning that
365+
// static expressions will always return 0.
366+
0
367+
}
348368
}
349369

350370
/// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object
@@ -538,16 +558,30 @@ pub fn snapshot_physical_expr(
538558
.data()
539559
}
540560

541-
/// Check if the given `PhysicalExpr` is dynamic.
542-
/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
543-
pub fn is_dynamic_physical_expr(expr: Arc<dyn PhysicalExpr>) -> Result<bool> {
544-
let mut is_dynamic = false;
545-
expr.transform_up(|e| {
546-
if e.snapshot()?.is_some() {
547-
is_dynamic = true;
548-
}
549-
Ok(Transformed::no(e))
561+
/// Check the generation of this `PhysicalExpr`.
562+
/// Dynamic `PhysicalExpr`s may have a generation that is incremented
563+
/// every time the state of the `PhysicalExpr` changes.
564+
/// If the generation changes that means this `PhysicalExpr` or one of its children
565+
/// has changed since the last time it was evaluated.
566+
///
567+
/// Note that the algorithm is *not* resistant to collisions, it is possible although highly unlikely
568+
/// that two different `PhysicalExpr`s have the same generation.
569+
pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
570+
let mut generation = 0;
571+
expr.apply(|e| {
572+
// XOR our global generation with the current generation of the `PhysicalExpr`.
573+
generation ^= e.snapshot_generation();
574+
Ok(TreeNodeRecursion::Continue)
550575
})
551-
.data()?;
552-
Ok(is_dynamic)
576+
.expect("this traversal is infallible");
577+
578+
generation
579+
}
580+
581+
/// Check if the given `PhysicalExpr` is dynamic.
582+
/// Internally this calls [`snapshot_generation`] to check if the generation is non-zero,
583+
/// any dynamic `PhysicalExpr` should have a non-zero generation.
584+
pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
585+
// If the generation is non-zero, then this `PhysicalExpr` is dynamic.
586+
snapshot_generation(expr) != 0
553587
}

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,33 @@ pub struct DynamicFilterPhysicalExpr {
4343
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
4444
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
4545
/// The source of dynamic filters.
46-
inner: Arc<RwLock<Arc<dyn PhysicalExpr>>>,
46+
inner: Arc<RwLock<Inner>>,
4747
/// For testing purposes track the data type and nullability to make sure they don't change.
4848
/// If they do, there's a bug in the implementation.
4949
/// But this can have overhead in production, so it's only included in our tests.
5050
data_type: Arc<RwLock<Option<DataType>>>,
5151
nullable: Arc<RwLock<Option<bool>>>,
5252
}
5353

54+
#[derive(Debug)]
55+
struct Inner {
56+
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
57+
/// This is used for [`PhysicalExpr::generation`] to have a cheap check for changes.
58+
generation: u64,
59+
expr: Arc<dyn PhysicalExpr>,
60+
}
61+
62+
impl Inner {
63+
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
64+
Self {
65+
// Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0.
66+
// This is not currently used anywhere but it seems useful to have this simple distinction.
67+
generation: 1,
68+
expr,
69+
}
70+
}
71+
}
72+
5473
impl Hash for DynamicFilterPhysicalExpr {
5574
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
5675
let inner = self.current().expect("Failed to get current expression");
@@ -111,7 +130,7 @@ impl DynamicFilterPhysicalExpr {
111130
Self {
112131
children,
113132
remapped_children: None, // Initially no remapped children
114-
inner: Arc::new(RwLock::new(inner)),
133+
inner: Arc::new(RwLock::new(Inner::new(inner))),
115134
data_type: Arc::new(RwLock::new(None)),
116135
nullable: Arc::new(RwLock::new(None)),
117136
}
@@ -158,6 +177,7 @@ impl DynamicFilterPhysicalExpr {
158177
"Failed to acquire read lock for inner".to_string(),
159178
)
160179
})?
180+
.expr
161181
.clone();
162182
let inner =
163183
Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
@@ -186,7 +206,9 @@ impl DynamicFilterPhysicalExpr {
186206
self.remapped_children.as_ref(),
187207
new_expr,
188208
)?;
189-
*current = new_expr;
209+
current.expr = new_expr;
210+
current.generation += 1; // Increment the generation to indicate that the expression has changed.
211+
// Increment the generation to indicate that the expression has changed.
190212
Ok(())
191213
}
192214
}
@@ -291,6 +313,14 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
291313
// Return the current expression as a snapshot.
292314
Ok(Some(self.current()?))
293315
}
316+
317+
fn snapshot_generation(&self) -> u64 {
318+
// Return the current generation of the expression.
319+
self.inner
320+
.read()
321+
.expect("Failed to acquire read lock for inner")
322+
.generation
323+
}
294324
}
295325

296326
#[cfg(test)]

0 commit comments

Comments
 (0)