Skip to content

TopK dynamic filter pushdown attempt 2 #15770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jun 17, 2025

Conversation

adriangb
Copy link
Contributor

@adriangb adriangb commented Apr 18, 2025

@github-actions github-actions bot added physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate common Related to common crate datasource Changes to the datasource crate labels Apr 18, 2025
@@ -382,7 +383,7 @@ impl PhysicalOptimizerRule for PushdownFilter {

context
.transform_up(|node| {
if node.plan.as_any().downcast_ref::<FilterExec>().is_some() {
if node.plan.as_any().downcast_ref::<FilterExec>().is_some() || node.plan.as_any().downcast_ref::<SortExec>().is_some() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@berkaysynnada I didn't notice this in the original PR. This seems problematic. IMO doing downcast matching here is a smell that the API needs changing. It limits implementations to a hardcoded list of plans, which defeats the purpose of making DataFusion pluggable / having a dyn ExecutionPlan. The original implementation didn't require this. I think this goes hand-in hand with the revisit parameter. It seems that you were able to get from 3 methods down to 2 by replacing one of them with this downcast matching and the other with the extra recursion via the revisit parameter. It would be great to iterate on this and find a way to avoid the downcast matching.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. We can run this pushdown logic on every operator actually, but then it will work in worst-time complexity always. I've shared the solution of removing revisit parameter, and let me open an issue for that. I strongly believe it will be taken and implemented in short time by some people.

To remove these downcasts, I think we can either introduce a new method to the API just returning a boolean saying that "this operator might introduce a filter or not", or try to understand that by the existing API's, maybe with some refactor. Do you have an idea for the latter?

Copy link
Contributor Author

@adriangb adriangb Apr 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose an API something like this:

trait ExecutionPlan {
    fn gather_filters_for_pushdown(
        &self,
        parent_filters: &[Arc<dyn ExecutionPlan>],
    ) -> Result<FilterPushdownPlan> {
        let unsupported = vec![FilterPushdownSupport::Unsupported; parent_filters.len()];
        Ok(
            FilterPushdownPlan {
                parent_filters_for_children: vec![unsupported; self.children().len()],
                self_filters_for_children: vec![vec![]; self.children().len()],
            },
        )
    }

    fn propagate_filter_pushdown(
        &self,
        parent_pushdown_result: Vec<FilterPushdowChildResult>,
        _self_filter_pushdown_result: Vec<FilterPushdowChildResult>,
    ) -> Result<FilterPushdownPropagation> {
        Ok(
            FilterPushdownPropagation {
                parent_filter_result: parent_pushdown_result,
                new_node: None,
            },
        )
    }
}

pub struct FilterPushdownPropagation {
    parent_filter_result: Vec<FilterPushdowChildResult>,
    new_node: Option<Arc<dyn ExecutionPlan>>,
}


#[derive(Debug, Clone, Copy)]
pub enum FilterPushdowChildResult {
    Supported,
    Unsupported,
}

impl FilterPushdowChildResult {

}

#[derive(Debug, Clone)]
pub enum FilterPushdownSupport {
    Supported(Arc<dyn PhysicalExpr>),
    Unsupported,
}

#[derive(Debug, Clone)]
pub struct FilterPushdownPlan {
    parent_filters_for_children: Vec<Vec<FilterPushdownSupport>>,
    self_filters_for_children: Vec<Vec<FilterPushdownSupport>>,
}

The optimizer rule will have to do a bit of bookeeping and slicing correctly but this should avoid the need for any downcast matching or retry and minimize clones of plans. And it should do one walk down and up regardless of what ends up happening with the filters.

Copy link
Contributor Author

@adriangb adriangb Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs fixing of some failing tests, cleanup of the plethora of helper methods I added and a lot of docs but here's the idea: #15801. The points are:

  • No downcast matching / hardcoding of implementations
  • Only recurses once / no retrying
  • Does no cloning / copying for branches that have no changes
  • Doesn't insert new operators

@adriangb
Copy link
Contributor Author

Pausing this until #15769 is done

@adriangb
Copy link
Contributor Author

Pausing this until #15769 is done

I was able to unblock by wiring up to TestDataSource

@github-actions github-actions bot removed the datasource Changes to the datasource crate label Apr 19, 2025
Comment on lines 1224 to 1227
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
.with_fetch(self.fetch)
.with_preserve_partitioning(self.preserve_partitioning);
new_sort.filter = Arc::clone(&self.filter);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed this for a while and spent an hour trying to figure out why my test was failing. IMO we should have a test that enforces the invariant that ExecutionPlan::with_new_children(Arc::clone(&node), node.children()) == node

@@ -22,7 +22,7 @@ mod binary;
mod case;
mod cast;
mod column;
mod dynamic_filters;
pub mod dynamic_filters;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit has me tripped up. I'm not sure where the right place to put dynamic_filters is such that it's public for our internal use in operators but private from the outside world 🤔

@adriangb
Copy link
Contributor Author

@Dandandan I believe with this setup we should be able to achieve with a couple LOC in insert_batch:

// Apply the filter to the batch before processing
let filter = Arc::clone(&self.filter) as Arc<dyn PhysicalExpr>;
let batch = filter_and_project(&batch, &filter, None, batch.schema_ref())?;
if batch.num_rows() == 0 {
    return Ok(());
}

(filter_and_project is from FilterExec, we just need to make it pub(crate))

@Dandandan
Copy link
Contributor

Dandandan commented Apr 19, 2025

@Dandandan I believe with this setup we should be able to achieve with a couple LOC in insert_batch:

// Apply the filter to the batch before processing

@Dandandan I believe with this setup we should be able to achieve with a couple LOC in insert_batch:

// Apply the filter to the batch before processing
let filter = Arc::clone(&self.filter) as Arc<dyn PhysicalExpr>;
let batch = filter_and_project(&batch, &filter, None, batch.schema_ref())?;
if batch.num_rows() == 0 {
    return Ok(());
}

(filter_and_project is from FilterExec, we just need to make it pub(crate))

I think we probably want to avoid filtering the entire batch, but indeed, if the filter expression is available it will be only a couple LOC!

@github-actions github-actions bot added documentation Improvements or additions to documentation sqllogictest SQL Logic Tests (.slt) labels Apr 19, 2025
"
);

// Actually apply the optimization to the plan
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recognize these diverge a bit from other tests, happy to move them somewhere better....

@adriangb adriangb marked this pull request as ready for review April 20, 2025 00:48
@adriangb
Copy link
Contributor Author

Marking as ready for review despite not having any numbers to substantiate performance improvement (because we need #15769) given that algorithmically and from experience in the previous PR we know this is a big win it might be okay to merge without interlocking PRs.

@berkaysynnada
Copy link
Contributor

@adriangb I'll complete reviewing this after merging other open PR's.

@adriangb adriangb force-pushed the topk-dynamic-filters branch from 6ec4de1 to b3431ab Compare May 5, 2025 17:19
@github-actions github-actions bot added datasource Changes to the datasource crate and removed optimizer Optimizer rules labels May 5, 2025
@adriangb
Copy link
Contributor Author

adriangb commented May 5, 2025

@adriangb I'll complete reviewing this after merging other open PR's.

Thanks for all of the reviews @berkaysynnada. This one is now ready again.

@adriangb
Copy link
Contributor Author

adriangb commented May 5, 2025

I think some tweaks will be needed based on https://github.com/apache/datafusion/pull/15769/files#r2074207291

@adriangb adriangb force-pushed the topk-dynamic-filters branch from bdc341c to 73b800a Compare May 6, 2025 14:12
@github-actions github-actions bot added optimizer Optimizer rules and removed datasource Changes to the datasource crate labels May 6, 2025
@alamb
Copy link
Contributor

alamb commented Jun 16, 2025

I queued up some benchmarks

Looking at naming now

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me -- I have some small comment suggestions, etc. But I also think we can merge this PR as is and do the suggestions as a follow on too

Really nice 🦾

@@ -614,6 +614,13 @@ config_namespace! {
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true

/// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the idea be to prune hash table state, for example, if we knew some of the groups were no longer needed?

I do think implementing more "late materialization" (aka turn on filter_pushdown) will help too

fn new(phase: FilterPushdownPhase) -> Self {
Self {
phase,
name: format!("FilterPushdown({phase})"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like FilterPushdown and FilterPushdown(Dynamic)

@@ -131,6 +131,8 @@ impl PhysicalOptimizer {
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
Arc::new(LimitPushdown::new()),
// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan
Arc::new(FilterPushdown::new_post_optimization()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is so much nicer than adding it to EnforceSorting ❤️

But shouldn't this be the final pass? (maybe right before SanityCheckPlan?)

As I understand it, this filter pushdown pass has to be run after any pass that modifies the structure of the plan and ProjectionPushdown may actually do that 🤔

It also think it would be good to add a comment here explaining that FilterPushdown::new_post_optimization() must be run after all passes that change the structure of the plan as it can generate pointers from one plan to another

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will move it lower and add a comment, with a reference to the enum with larger docs.

/// but subsequent optimizations may also rewrite the plan tree drastically, thus it is *not guaranteed* that a [`PhysicalExpr`] can hold on to a reference to the plan tree.
/// During this phase static filters (such as `col = 1`) are pushed down.
/// - [`FilterPushdownPhase::Post`]: Filters get pushed down after most other optimizations are applied.
/// At this stage the plan tree is expected to be stable and not change drastically, and operators that do filter pushdown during this phase should also not change the plan tree.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the requirement is that the plan nodes don't change. Given that DynamicFilters effectively can have pointers to existing ExecutionPlan instances if a pass changes / removes / rewrites an ExecutionPlan that added a DynamicFilter I am not sure what will happen 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sort of. I am mincing my words in the comments because the reality is that to push down filters into DataSourceExec a new DataSourceExec has to be created and the whole tree has to be replaced "in place" to reference the new children. But the structure of the plan does not change, and it's pretty much guaranteed that ExecutionPlan::new_with_children does the right thing in terms of preserving internal state that might be pointed to (unlike EnforceSorting).

I'm not sure how to detail that in a comment, it's somewhat confusing.

@@ -548,10 +563,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
/// to dynamically build a result with a mix of supported and unsupported filters.
///
/// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I love documentation, I would personally suggest not duplicating the docs here as duplicates can get out of sync, and instead leave a link to FilterPushdownPhase and focus on getting that documentation to be as clear as possible

Pre,
/// Pushdown that happens after most other optimizations.
/// This pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
/// It is guaranteed that subsequent optimizations will not make large changes to the plan tree,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aa above, I think it woudl be good to make it more precise what "large changes to the plan tree" means (basically I think it means don't remove existing ExecutionPlans ? 🤔 )

@adriangb
Copy link
Contributor Author

Thank you Andrew! I will do the renames, docs edits, etc., push those tonight and we can merge this tomorrow evening if there is no more feedback.

@alamb
Copy link
Contributor

alamb commented Jun 16, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (cd56084) to dd936cb diff
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete

@alamb
Copy link
Contributor

alamb commented Jun 16, 2025

🤖: Benchmark completed

Details

Comparing HEAD and topk-dynamic-filters
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ topk-dynamic-filters ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 0     │  1932.37 ms │           1879.32 ms │ no change │
│ QQuery 1     │   700.00 ms │            708.26 ms │ no change │
│ QQuery 2     │  1327.98 ms │           1357.73 ms │ no change │
│ QQuery 3     │   675.15 ms │            663.31 ms │ no change │
│ QQuery 4     │  1358.70 ms │           1371.20 ms │ no change │
│ QQuery 5     │ 14941.43 ms │          15114.86 ms │ no change │
│ QQuery 6     │  2003.62 ms │           1988.73 ms │ no change │
│ QQuery 7     │  1974.89 ms │           1889.14 ms │ no change │
│ QQuery 8     │   804.64 ms │            796.40 ms │ no change │
└──────────────┴─────────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 25718.78ms │
│ Total Time (topk-dynamic-filters)   │ 25768.95ms │
│ Average Time (HEAD)                 │  2857.64ms │
│ Average Time (topk-dynamic-filters) │  2863.22ms │
│ Queries Faster                      │          0 │
│ Queries Slower                      │          0 │
│ Queries with No Change              │          9 │
│ Queries with Failure                │          0 │
└─────────────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ topk-dynamic-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    15.60 ms │             15.34 ms │     no change │
│ QQuery 1     │    33.50 ms │             32.81 ms │     no change │
│ QQuery 2     │    80.76 ms │             80.51 ms │     no change │
│ QQuery 3     │    99.67 ms │             95.83 ms │     no change │
│ QQuery 4     │   628.76 ms │            577.08 ms │ +1.09x faster │
│ QQuery 5     │   871.37 ms │            852.85 ms │     no change │
│ QQuery 6     │    24.15 ms │             23.11 ms │     no change │
│ QQuery 7     │    35.78 ms │             35.47 ms │     no change │
│ QQuery 8     │   868.37 ms │            880.50 ms │     no change │
│ QQuery 9     │  1149.06 ms │           1167.61 ms │     no change │
│ QQuery 10    │   254.37 ms │            252.96 ms │     no change │
│ QQuery 11    │   285.04 ms │            280.14 ms │     no change │
│ QQuery 12    │   867.64 ms │            856.84 ms │     no change │
│ QQuery 13    │  1276.06 ms │           1264.78 ms │     no change │
│ QQuery 14    │   812.54 ms │            791.25 ms │     no change │
│ QQuery 15    │   767.98 ms │            778.42 ms │     no change │
│ QQuery 16    │  1628.17 ms │           1592.90 ms │     no change │
│ QQuery 17    │  1597.69 ms │           1583.41 ms │     no change │
│ QQuery 18    │  2883.08 ms │           2848.60 ms │     no change │
│ QQuery 19    │    83.29 ms │             85.79 ms │     no change │
│ QQuery 20    │  1102.75 ms │           1125.53 ms │     no change │
│ QQuery 21    │  1243.71 ms │           1264.80 ms │     no change │
│ QQuery 22    │  2065.35 ms │           2062.86 ms │     no change │
│ QQuery 23    │  7575.06 ms │           7234.54 ms │     no change │
│ QQuery 24    │   445.22 ms │            429.24 ms │     no change │
│ QQuery 25    │   374.18 ms │            300.39 ms │ +1.25x faster │
│ QQuery 26    │   510.11 ms │            424.15 ms │ +1.20x faster │
│ QQuery 27    │  1514.36 ms │           1528.90 ms │     no change │
│ QQuery 28    │ 11858.75 ms │          12005.25 ms │     no change │
│ QQuery 29    │   532.06 ms │            537.76 ms │     no change │
│ QQuery 30    │   760.11 ms │            750.64 ms │     no change │
│ QQuery 31    │   793.67 ms │            786.40 ms │     no change │
│ QQuery 32    │  2473.13 ms │           2403.92 ms │     no change │
│ QQuery 33    │  3116.67 ms │           3104.59 ms │     no change │
│ QQuery 34    │  3141.07 ms │           3148.95 ms │     no change │
│ QQuery 35    │  1238.65 ms │           1195.36 ms │     no change │
│ QQuery 36    │   124.11 ms │            120.55 ms │     no change │
│ QQuery 37    │    57.10 ms │             55.05 ms │     no change │
│ QQuery 38    │   118.93 ms │            120.48 ms │     no change │
│ QQuery 39    │   189.95 ms │            195.15 ms │     no change │
│ QQuery 40    │    46.63 ms │             47.38 ms │     no change │
│ QQuery 41    │    44.56 ms │             43.14 ms │     no change │
│ QQuery 42    │    39.37 ms │             37.98 ms │     no change │
└──────────────┴─────────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 53628.39ms │
│ Total Time (topk-dynamic-filters)   │ 53019.21ms │
│ Average Time (HEAD)                 │  1247.17ms │
│ Average Time (topk-dynamic-filters) │  1233.00ms │
│ Queries Faster                      │          3 │
│ Queries Slower                      │          0 │
│ Queries with No Change              │         40 │
│ Queries with Failure                │          0 │
└─────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃      HEAD ┃ topk-dynamic-filters ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │ 101.86 ms │             98.87 ms │ no change │
│ QQuery 2     │  20.65 ms │             21.20 ms │ no change │
│ QQuery 3     │  32.27 ms │             32.46 ms │ no change │
│ QQuery 4     │  18.87 ms │             18.37 ms │ no change │
│ QQuery 5     │  50.65 ms │             49.11 ms │ no change │
│ QQuery 6     │  11.98 ms │             11.87 ms │ no change │
│ QQuery 7     │  85.68 ms │             86.77 ms │ no change │
│ QQuery 8     │  25.13 ms │             24.62 ms │ no change │
│ QQuery 9     │  54.08 ms │             53.86 ms │ no change │
│ QQuery 10    │  43.18 ms │             43.63 ms │ no change │
│ QQuery 11    │  11.47 ms │             11.92 ms │ no change │
│ QQuery 12    │  34.72 ms │             34.90 ms │ no change │
│ QQuery 13    │  25.32 ms │             25.76 ms │ no change │
│ QQuery 14    │   9.91 ms │              9.83 ms │ no change │
│ QQuery 15    │  19.76 ms │             19.29 ms │ no change │
│ QQuery 16    │  18.61 ms │             19.02 ms │ no change │
│ QQuery 17    │  95.21 ms │             95.70 ms │ no change │
│ QQuery 18    │ 188.48 ms │            196.82 ms │ no change │
│ QQuery 19    │  25.87 ms │             25.89 ms │ no change │
│ QQuery 20    │  32.11 ms │             32.11 ms │ no change │
│ QQuery 21    │ 148.35 ms │            149.18 ms │ no change │
│ QQuery 22    │  14.86 ms │             15.06 ms │ no change │
└──────────────┴───────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 1069.03ms │
│ Total Time (topk-dynamic-filters)   │ 1076.26ms │
│ Average Time (HEAD)                 │   48.59ms │
│ Average Time (topk-dynamic-filters) │   48.92ms │
│ Queries Faster                      │         0 │
│ Queries Slower                      │         0 │
│ Queries with No Change              │        22 │
│ Queries with Failure                │         0 │
└─────────────────────────────────────┴───────────┘

@alamb
Copy link
Contributor

alamb commented Jun 16, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (cd56084) to dd936cb diff
Benchmarks: sort_tpch
Results will be posted here when complete

@alamb
Copy link
Contributor

alamb commented Jun 16, 2025

🤖: Benchmark completed

Details

Comparing HEAD and topk-dynamic-filters
--------------------
Benchmark sort_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ topk-dynamic-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  322.61 ms │            336.40 ms │     no change │
│ Q2           │  323.67 ms │            279.65 ms │ +1.16x faster │
│ Q3           │ 1175.20 ms │           1152.85 ms │     no change │
│ Q4           │  415.50 ms │            414.71 ms │     no change │
│ Q5           │  419.42 ms │            429.43 ms │     no change │
│ Q6           │  460.61 ms │            462.88 ms │     no change │
│ Q7           │  939.97 ms │            928.19 ms │     no change │
│ Q8           │  793.68 ms │            791.83 ms │     no change │
│ Q9           │  830.90 ms │            832.78 ms │     no change │
│ Q10          │ 1219.46 ms │           1237.03 ms │     no change │
│ Q11          │  718.13 ms │            737.31 ms │     no change │
└──────────────┴────────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 7619.16ms │
│ Total Time (topk-dynamic-filters)   │ 7603.05ms │
│ Average Time (HEAD)                 │  692.65ms │
│ Average Time (topk-dynamic-filters) │  691.19ms │
│ Queries Faster                      │         1 │
│ Queries Slower                      │         0 │
│ Queries with No Change              │        10 │
│ Queries with Failure                │         0 │
└─────────────────────────────────────┴───────────┘

@adriangb
Copy link
Contributor Author

@alamb I think we're ready to merge this and keep chipping away in #16424 and other spots right?

@Dandandan
Copy link
Contributor

Dandandan commented Jun 17, 2025

🤖: Benchmark completed

Details

🤖: Benchmark completed

Details

could you maybe confirm the topk benchmark results @alamb ? topk_tpch?

@alamb
Copy link
Contributor

alamb commented Jun 17, 2025

🤖: Benchmark completed
Details

🤖: Benchmark completed
Details

could you maybe confirm the topk benchmark results @alamb ? topk_tpch?

will do

@alamb
Copy link
Contributor

alamb commented Jun 17, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (8e88bd9) to dd936cb diff
Benchmarks: topk_tpch
Results will be posted here when complete

@alamb
Copy link
Contributor

alamb commented Jun 17, 2025

🤖: Benchmark completed

Details

Comparing HEAD and topk-dynamic-filters
--------------------
Benchmark run_topk_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      HEAD ┃ topk-dynamic-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  41.78 ms │             28.62 ms │ +1.46x faster │
│ Q2           │  40.50 ms │             32.33 ms │ +1.25x faster │
│ Q3           │ 144.67 ms │            101.83 ms │ +1.42x faster │
│ Q4           │  43.48 ms │             36.12 ms │ +1.20x faster │
│ Q5           │  31.85 ms │             26.77 ms │ +1.19x faster │
│ Q6           │  54.85 ms │             47.29 ms │ +1.16x faster │
│ Q7           │ 129.61 ms │            138.38 ms │  1.07x slower │
│ Q8           │ 121.74 ms │             76.63 ms │ +1.59x faster │
│ Q9           │ 158.13 ms │            114.49 ms │ +1.38x faster │
│ Q10          │ 215.83 ms │            169.16 ms │ +1.28x faster │
│ Q11          │ 121.35 ms │             83.83 ms │ +1.45x faster │
└──────────────┴───────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 1103.78ms │
│ Total Time (topk-dynamic-filters)   │  855.46ms │
│ Average Time (HEAD)                 │  100.34ms │
│ Average Time (topk-dynamic-filters) │   77.77ms │
│ Queries Faster                      │        10 │
│ Queries Slower                      │         1 │
│ Queries with No Change              │         0 │
│ Queries with Failure                │         0 │
└─────────────────────────────────────┴───────────┘

@adriangb
Copy link
Contributor Author

🤖: Benchmark completed

Details

Very nice improvement even without filter pushdown!

I'm going to merge this in the next couple of hours if there is no more feedback 😄

@Dandandan
Copy link
Contributor

🤖: Benchmark completed
Details

Very nice improvement even without filter pushdown!

I'm going to merge this in the next couple of hours if there is no more feedback 😄

This is super nice.
I think the speed up changes a bit due to more partitions as you shared earlier. But it's starting to look really nice 🚀

@adriangb
Copy link
Contributor Author

I think the speed up changes a bit due to more partitions

And maybe #16424 will speed up the wide partitions case by stopping those scans early!

@Dandandan
Copy link
Contributor

I'll also run some profiling on those topk benchmarks to see if there is any further low hanging fruit.

@Dandandan
Copy link
Contributor

image Well, it looks like for these benchmarks 95% is now spent on just scanning the data and only 5% elsewhere, so I guess we need to focus there :D

@adriangb adriangb merged commit 1429c92 into apache:main Jun 17, 2025
29 checks passed
@Dandandan
Copy link
Contributor

Dandandan commented Jun 17, 2025

Hm @adriangb another thing I wondered is update_filter does seem to take only the heap of the current partition into account, as in TopK (currently at least) each partition has it's own heap (of k items).

Perhaps we can compare against the current filter and only update the expression if it is greater / more selective?

@adriangb
Copy link
Contributor Author

adriangb commented Jun 17, 2025

Perhaps we can compare against the current filter and only update the expression if it is greater / more selective?

Yeah I think that would be good.
For context (I had to remember for a sec): the filter itself is shared across partitions.

@alamb
Copy link
Contributor

alamb commented Jun 17, 2025

woohoo!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation optimizer Optimizer rules physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimize TopK with filter Dynamic pruning filters from TopK state (optimize ORDER BY LIMIT queries)
4 participants