Skip to content

Commit ea4a4e0

Browse files
committed
add comments
1 parent c799847 commit ea4a4e0

File tree

2 files changed

+56
-22
lines changed

2 files changed

+56
-22
lines changed

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,8 +422,8 @@ fn push_down_filters(
422422

423423
for (child, parent_filters, self_filters) in izip!(
424424
children,
425-
filter_description.parent_filters,
426-
filter_description.self_filters
425+
filter_description.parent_filters(),
426+
filter_description.self_filters()
427427
) {
428428
// Here, `parent_filters` are the predicates which are provided by the parent node of
429429
// the current node, and tried to be pushed down over the child which the loop points

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -189,25 +189,59 @@ impl<T> FilterPushdownPropagation<T> {
189189
}
190190
}
191191

192+
#[derive(Debug, Clone)]
193+
struct ChildFilterDescription {
194+
/// Description of which parent filters can be pushed down into this node.
195+
/// Since we need to transmit filter pushdown results back to this node's parent
196+
/// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
197+
/// We do this using a [`PredicateSupports`] which simplifies manipulating supported/unsupported filters.
198+
parent_filters: PredicateSupports,
199+
/// Description of which filters this node is pushing down to its children.
200+
/// Since this is not transmitted back to the parents we can have variable sized inner arrays
201+
/// instead of having to track supported/unsupported.
202+
self_filters: Vec<Arc<dyn PhysicalExpr>>,
203+
}
204+
205+
impl ChildFilterDescription {
206+
fn new() -> Self {
207+
Self {
208+
parent_filters: PredicateSupports::new(vec![]),
209+
self_filters: vec![],
210+
}
211+
}
212+
}
213+
192214
#[derive(Debug, Clone)]
193215
pub struct FilterDescription {
194-
num_children: usize,
195-
/// Vector storing the [`PredicateSupports`] for each child.
196-
pub parent_filters: Vec<PredicateSupports>,
197-
/// Vector storing the physical expressions for each child.
198-
/// Inner vector is for multiple predicates, if the node stores them such.
199-
pub self_filters: Vec<Vec<Arc<dyn PhysicalExpr>>>,
216+
/// A filter description for each child.
217+
/// This includes which parent filters and which self filters (from the node in question)
218+
/// will get pushed down to each child.
219+
child_filter_descriptions: Vec<ChildFilterDescription>,
200220
}
201221

202222
impl FilterDescription {
203223
pub fn new_with_child_count(num_children: usize) -> Self {
204224
Self {
205-
num_children,
206-
parent_filters: Vec::with_capacity(num_children),
207-
self_filters: vec![vec![]; num_children],
225+
child_filter_descriptions: vec![ChildFilterDescription::new(); num_children],
208226
}
209227
}
210228

229+
pub fn parent_filters(&self) -> Vec<PredicateSupports> {
230+
self.child_filter_descriptions
231+
.iter()
232+
.map(|d| &d.parent_filters)
233+
.cloned()
234+
.collect()
235+
}
236+
237+
pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
238+
self.child_filter_descriptions
239+
.iter()
240+
.map(|d| &d.self_filters)
241+
.cloned()
242+
.collect()
243+
}
244+
211245
/// Mark all parent filters as supported for all children.
212246
/// This is the case if the node allows filters to be pushed down through it
213247
/// without any modification.
@@ -219,15 +253,14 @@ impl FilterDescription {
219253
///
220254
/// [`RepartitionExec`]: crate::repartition::RepartitionExec
221255
pub fn all_parent_filters_supported(
222-
self,
256+
mut self,
223257
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
224258
) -> Self {
225259
let supported = PredicateSupports::all_supported(parent_filters);
226-
Self {
227-
num_children: self.num_children,
228-
parent_filters: vec![supported; self.num_children],
229-
self_filters: self.self_filters,
260+
for child in &mut self.child_filter_descriptions {
261+
child.parent_filters = supported.clone();
230262
}
263+
self
231264
}
232265

233266
/// Mark all parent filters as unsupported for all children.
@@ -240,15 +273,14 @@ impl FilterDescription {
240273
///
241274
/// [`ExecutionPlan`]: crate::ExecutionPlan
242275
pub fn all_parent_filters_unsupported(
243-
self,
276+
mut self,
244277
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
245278
) -> Self {
246279
let unsupported = PredicateSupports::all_unsupported(parent_filters);
247-
Self {
248-
num_children: self.num_children,
249-
parent_filters: vec![unsupported; self.num_children],
250-
self_filters: self.self_filters,
280+
for child in &mut self.child_filter_descriptions {
281+
child.parent_filters = unsupported.clone();
251282
}
283+
self
252284
}
253285

254286
/// Add a filter generated / owned by the current node to be pushed down to all children.
@@ -260,7 +292,9 @@ impl FilterDescription {
260292
/// - `TopK` uses this to push down a single filter to all children, it can use this method.
261293
/// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method.
262294
pub fn with_self_filter(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
263-
self.self_filters = vec![vec![predicate]; self.num_children];
295+
for child in &mut self.child_filter_descriptions {
296+
child.self_filters = vec![Arc::clone(&predicate)];
297+
}
264298
self
265299
}
266300
}

0 commit comments

Comments
 (0)