Skip re-pruning based on partition values and file level stats if there are no dynamic filters #16424
Conversation
@@ -524,6 +512,91 @@ fn should_enable_page_index(
        .unwrap_or(false)
}

/// Prune based on partition values and file-level statistics.
pub struct FilePruner {
I made this `pub` as I think it could be useful for other data sources. I do think we should move this + the `PruningPredicate` stuff into a `datafusion-pruning` crate or something.
+1. It's time to put all the pruning logic in one place.
Shall I make a `datafusion-pruning` crate? I guess it will have all of the same deps as `datasource-parquet`, sans the parquet-specific bits.
let pruning_predicate = build_pruning_predicate(
    Arc::clone(&self.predicate),
    &self.pruning_schema,
    &self.predicate_creation_errors,
);
It's unfortunate we need to re-do this work every iteration. I wonder if we should call `snapshot_physical_expr` manually here and keep track of "is the new expression different from the last iteration; if not, skip the rest of the work" 🤔
Another option would be to add a `generation` to dynamic filters which gets bumped by 1 every time they get updated. Then it would be super cheap to check whether a filter has been updated. But we'd have to come up with APIs for that, put it on `PhysicalExpr` (what happens if there are multiple child dynamic filters with different generations...?), etc.

It seems to me that any perf tradeoff only affects some cases with dynamic filters, so it should be okay to proceed as is for now and treat that as a later optimization.
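The generation idea is cheap to sketch. Below is a minimal, hypothetical model (not DataFusion's actual API): the filter bumps an atomic counter on every update, so a consumer can detect changes with a single load instead of comparing expression trees.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative stand-in for a dynamic filter; not DataFusion's actual type.
struct DynamicFilter {
    // ... the current predicate would live here ...
    generation: AtomicU64,
}

impl DynamicFilter {
    /// Install a new predicate, then publish the change.
    fn update(&self /* , new_predicate: ... */) {
        self.generation.fetch_add(1, Ordering::Release);
    }

    /// Cheap check: callers remember the last generation they saw and
    /// compare it against this value to detect updates.
    fn generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire)
    }
}

fn main() {
    let filter = DynamicFilter { generation: AtomicU64::new(0) };
    let seen = filter.generation();
    filter.update();
    assert_ne!(seen, filter.generation()); // an update is detectable in O(1)
}
```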
It happens once per file, right? If so, I agree that doing it as a follow-on optimization sounds good. However, I recommend we file a ticket while this is all in our heads / we have the full context, otherwise we'll forget what to do.
cc @alamb I think this resolves the concern about the perf overhead of this late pruning when there are no dynamic filters. What happens when there are dynamic filters is a tossup: in the case of a TopK with large files it's clearly a win, but there could obviously be cases where the additional checks cost more than they save if they don't result in early termination of the streams.
Thanks @adriangb -- I like the FilePruner and not pruning if there are no dynamic filters.
I am not sure about trying to prune on each batch -- it does seem like it has the potential to stop reading from certain files earlier, but I worry that the overhead is too high
I'll fire off some benchmarks and see what we can see
}

impl FilePruner {
    pub fn new_opt(
Could we document under what circumstances it returns `None`? I think it is when there are no dynamic predicates.

It would also be good to document why it returns `None` (in this case because we assume that files have already been pruned using any non-dynamic predicates, so additional pruning can only help when new dynamic predicates become available??)
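For what it's worth, here is a sketch of what that doc comment could pin down, with an illustrative (not actual) signature and a hypothetical `contains_dynamic_filters` helper:

```rust
use std::sync::Arc;
use arrow_schema::Schema;
use datafusion_physical_expr::PhysicalExpr;

/// Abbreviated version of the struct from the diff.
pub struct FilePruner {
    predicate: Arc<dyn PhysicalExpr>,
    pruning_schema: Arc<Schema>,
}

impl FilePruner {
    /// Returns `None` when the predicate contains no dynamic filters:
    /// static predicates were already applied when the file list was
    /// pruned during planning, so re-checking them per file at execution
    /// time could never change the outcome.
    pub fn new_opt(
        predicate: Arc<dyn PhysicalExpr>,
        pruning_schema: Arc<Schema>,
    ) -> Option<Self> {
        if !contains_dynamic_filters(&predicate) {
            return None;
        }
        Some(Self { predicate, pruning_schema })
    }
}

/// Hypothetical helper: in the real code this would walk the expression
/// tree looking for dynamic filter nodes; stubbed to keep the sketch small.
fn contains_dynamic_filters(_expr: &Arc<dyn PhysicalExpr>) -> bool {
    true
}
```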
/// Prune based on partition values and file-level statistics.
pub struct FilePruner {
    predicate: Arc<dyn PhysicalExpr>,
    pruning_schema: Arc<Schema>,
Could we maybe add some comments about what a `pruning_schema` is, and how it relates to `partition_fields`?
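My reading (worth confirming in the doc comment): the pruning schema is the file schema with the partition columns appended, so a single predicate can be evaluated against both file-level statistics (regular columns) and the file's partition values (partition columns). A self-contained sketch of that relationship, with illustrative names:

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};

/// Hypothetical helper: append partition columns to the file schema so the
/// predicate can reference both regular columns (backed by file statistics)
/// and partition columns (backed by the file's partition values).
fn build_pruning_schema(file_schema: &Schema, partition_fields: &[Field]) -> Arc<Schema> {
    let mut fields: Vec<Field> =
        file_schema.fields().iter().map(|f| f.as_ref().clone()).collect();
    fields.extend(partition_fields.iter().cloned());
    Arc::new(Schema::new(fields))
}

fn main() {
    let file_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let partition_fields = vec![Field::new("date", DataType::Utf8, false)];
    let pruning_schema = build_pruning_schema(&file_schema, &partition_fields);
    assert_eq!(pruning_schema.fields().len(), 2); // "a" + "date"
}
```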
})
.take_while(move |_| {
    if let Some(file_pruner) = file_pruner.as_ref() {
        match file_pruner.should_prune() {
This is basically applying the filter on each record batch, right? I think once we can actually push the filters into the parquet scan (which I realize I have been talking about for months...) this could become entirely redundant.

On the other hand, it also stops the input immediately if we find out the file can be skipped 🤔
Right, the point is the early stopping, which the parquet pruning will not do!
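To make that intent concrete, a toy model of the `take_while` hook over a plain iterator (the real code works on a `RecordBatch` stream; `should_prune` here is a stand-in for the per-file check):

```rust
/// Stand-in for the real check, which re-evaluates the pruning predicate
/// against the file's partition values and file-level statistics.
fn should_prune() -> bool {
    false
}

fn main() {
    let batches = vec![1, 2, 3]; // stand-ins for record batches
    let read: Vec<_> = batches
        .into_iter()
        // The point is not filtering rows (parquet row-group/page pruning
        // already does that); it is terminating the stream as soon as the
        // whole file is known to be prunable by an updated dynamic filter.
        .take_while(|_| !should_prune())
        .collect();
    println!("read {} batches", read.len());
}
```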
Do we expect the benchmarks to show anything? I don't think they're using dynamic filters, right? Maybe we need to merge #15770 and then we can benchmark this?
🤖: Benchmark completed (details collapsed)
🤖: Benchmark completed (details collapsed)
I want to make sure the overhead of checking the predicates on each incoming batch didn't slow things down.
If you check the code, that only happens if there are dynamic filters. And since there are none right now, it becomes just a no-op. The only way to actually verify will be to merge #15770 and then compare this PR to main.
@adriangb I'll review tomorrow, today I have some other things.
@alamb sorry for the ping, but would you mind running the benchmarks again?
LOL, I need to make a webpage (or give you access to the server to queue the jobs yourself).
I was reading that Arrow has requested AWS credits: https://lists.apache.org/thread/q33oofy2v3zpg9s9l8o0w68rmjr3ocsv. Perhaps we can utilize some of those for this use case.
🤖: Benchmark completed (details collapsed)
I tried to ask GCS for credits... they didn't seem excited and ultimately came up with nothing.
Interesting results. I'm inclined to believe that the speedups and slowdowns are both real. We'll have to think about this a bit more.
Force-pushed from 3d6a97a to ebe4196 (compare)
@Dandandan @alamb I pushed ebe4196, which adds a very cheap way to track changes to a `PhysicalExpr` if it's dynamic. I think this will be useful in several places, but immediately it gives us the ability to check whether the dynamic predicate has been updated before doing the work of re-calculating the pruning predicate, etc. I'm still not sure it will be cheap enough, but I think it's worth a shot if we can re-run the benches. It'll be a shame if we can't figure this out: if we can get this working, it mostly negates the current unfortunate situation where a query with a TopK might run faster with less parallelism / partitioning upfront. With this change you still open the files, but you can bail out quickly instead of having to stream the whole thing.
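A sketch of how such a generation counter could gate the per-file work (illustrative names; not the exact code in ebe4196):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

struct FilePruner {
    last_seen_generation: u64,
}

impl FilePruner {
    /// Only redo the expensive work when the dynamic filter actually changed.
    fn should_prune(&mut self, filter_generation: &AtomicU64) -> bool {
        let current = filter_generation.load(Ordering::Acquire);
        if current == self.last_seen_generation {
            // Unchanged since the last check: the previous answer stands,
            // so skip rebuilding the pruning predicate entirely.
            return false;
        }
        self.last_seen_generation = current;
        // ... rebuild the pruning predicate and evaluate it against the
        // file's partition values and statistics here ...
        false
    }
}

fn main() {
    let generation = AtomicU64::new(0);
    let mut pruner = FilePruner { last_seen_generation: 0 };
    assert!(!pruner.should_prune(&generation)); // no update: cheap early return
    generation.fetch_add(1, Ordering::Release); // a dynamic filter update
    assert!(!pruner.should_prune(&generation)); // now the full re-check runs
}
```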
I think this will require @Dandandan's suggestion of only updating the filters if the new ones are more selective: #16433. Right now we always update the filters -> it always bumps the generation -> we always re-check.
@alamb I reverted the filtering during the stream, so this should now do strictly less work 😄
#16014 (comment)