Add late pruning of Parquet files based on file level statistics #16014
Conversation
A couple of thoughts:
Yes this would be super nice -- the more we can do to consolidate statistics / pruning the better off the code will be I think. Right now it is kind of scattered in several places.
You can use something like https://docs.rs/futures/latest/futures/stream/fn.iter.html perhaps -- like the sketch below.
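A minimal, self-contained illustration of the idea (assuming only the `futures` crate; an empty `stream::iter` is the shape a pruned file's open could return instead of doing any IO -- the surrounding DataFusion types are omitted):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // `stream::iter` turns any iterator into a Stream. An empty Vec yields
    // a stream that ends immediately -- what a pruned (skipped) file could
    // return instead of performing any IO.
    let pruned: Vec<Result<u64, String>> = vec![];
    let mut s = stream::iter(pruned);
    assert!(block_on(s.next()).is_none());
}
```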
```diff
@@ -367,7 +368,7 @@ impl Default for OnError {
 pub trait FileOpener: Unpin + Send + Sync {
     /// Asynchronously open the specified file and return a stream
     /// of [`RecordBatch`]
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
+    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
```
Isn't it sufficient to provide only file statistics? `PartitionedFile` seems like overkill to me.
Maybe? But I feel like since we have the partitioned file we might as well pass it in. Maybe we can use it in the future to enable optimizations that use the partition values (e.g. late pruning based on partition values, including partition values in the scan so that more filters can be evaluated, etc.).
I think using PartitionedFile as the "data we have at plan time" including statistics and potentially information about size, encryption, special indexes, etc makes a lot of sense
> Maybe? But I feel like since we have the partitioned file we might as well pass it in. Maybe we can use it in the future to enable optimizations that use the partition values (e.g. late pruning based on partition values, including partition values in the scan so that more filters can be evaluated, etc.).
I believe these can also be inferred from statistics in a more generalized fashion (I don't know whether partition columns exist in `column_statistics` now), but it's not a big deal, we can keep this 👍🏻
Can you please update the documentation for `open()` to mention that `file` has plan-time per-file information (such as statistics) and leave a doc link back?
The idea makes a lot of sense. I have one implementation suggestion. Thanks again @adriangb
Force-pushed from 0e03bdc to 94726cc
@alamb please review again: I implemented it and added a test 😄
```rust
(Some(stats), Some(predicate)) => {
    let pruning_predicate = build_pruning_predicate(
        Arc::clone(predicate),
        &self.table_schema,
```
Should it use `table_schema` here?
```rust
match (&file.statistics, &self.predicate) {
    (Some(stats), Some(predicate)) => {
```
Given that there is only one branch, I suggest using `if let (Some(_), Some(_)) = xxx {}` here (illustrated below).
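A self-contained illustration of the suggested pattern (stand-in values; the real code matches on `file.statistics` and `self.predicate`):

```rust
fn main() {
    let stats: Option<u64> = Some(42);
    let predicate: Option<&str> = Some("x > 0");

    // A single-arm `match (a, b) { (Some(..), Some(..)) => ..., _ => {} }`
    // reads more directly as `if let` on the tuple:
    if let (Some(s), Some(p)) = (stats, predicate) {
        println!("pruning with stats={s} and predicate={p}");
    }
}
```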
Very cool -- I think this is very close
```rust
    }
}

/// Returns [`BooleanArray`] where each row represents information known
```
This comment can probably be trimmed, with a link back to the original trait source.
```diff
@@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>(
     })
 }
 
+/// Prune a set of containers represented by their statistics.
```
This is a nice structure -- I think it makes lots of sense and is 100%
Specifically, I thought there was already code that pruned individual files based on statistics, but I could not find any in ListingTable (we have something like this in influxdb_iox).
My opinion is that if we are going to add this code into the DataFusion codebase we should:
- Ensure that it helps as many users as possible
- Make sure it is executed as much as possible (to ensure test coverage)

Thus, what do you think about using the PrunableStatistics to prune the FileGroup in ListingTable here?
Pruning on statistics during plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan.
How about I bundle in the PartitionValues somehow and then we can re-use and compose that? Specifically (see the sketch after this list):
- TableProviders use just the partition values
- ParquetOpener combines both
- Something else can use just the stats
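A rough, self-contained sketch of that composition idea, using a simplified stand-in for DataFusion's `PruningStatistics` trait (all names here are illustrative, not the real API):

```rust
use std::collections::HashMap;

// Simplified stand-in for DataFusion's `PruningStatistics` trait.
trait Stats {
    fn min_value(&self, column: &str) -> Option<i64>;
}

struct PartitionValues(HashMap<String, i64>);
struct FileStats(HashMap<String, i64>);

impl Stats for PartitionValues {
    fn min_value(&self, column: &str) -> Option<i64> {
        self.0.get(column).copied()
    }
}

impl Stats for FileStats {
    fn min_value(&self, column: &str) -> Option<i64> {
        self.0.get(column).copied()
    }
}

// The combined view (what ParquetOpener would use): answer from partition
// values first, then fall back to file-level statistics.
struct Composite<A: Stats, B: Stats>(A, B);

impl<A: Stats, B: Stats> Stats for Composite<A, B> {
    fn min_value(&self, column: &str) -> Option<i64> {
        self.0.min_value(column).or_else(|| self.1.min_value(column))
    }
}

fn main() {
    let parts = PartitionValues(HashMap::from([("date".to_string(), 20240101)]));
    let stats = FileStats(HashMap::from([("x".to_string(), 7)]));
    let both = Composite(parts, stats);
    assert_eq!(both.min_value("date"), Some(20240101));
    assert_eq!(both.min_value("x"), Some(7));
}
```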
> Pruning on statistics during plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan.
Yeah I don't think it's redundant: you either prune or you don't. If we prune earlier the files don't make it this far. If we don't we may now be able to prune them. What's redundant is if there are no changes to the filters (i.e. no dynamic filters), but that sounds both hard to track and like a possible future optimization 😄
kk
```rust
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize {
    1
```
This should be `self.statistics.len()`, right?
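That is, a self-contained sketch of the fix (stand-in types; the point is one container per file's statistics):

```rust
struct PrunableStatistics {
    // stand-in for the per-file `Statistics` entries
    statistics: Vec<String>,
}

impl PrunableStatistics {
    /// One container per file, not a constant 1.
    fn num_containers(&self) -> usize {
        self.statistics.len()
    }
}

fn main() {
    let s = PrunableStatistics {
        statistics: vec!["file1".into(), "file2".into()],
    };
    assert_eq!(s.num_containers(), 2);
}
```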
@alamb I pushed 4607643 which adds some nice APIs for partition values. In particular I think it's important to have a way to prune based on partition values + file level statistics (#15935). However I can't implement it for …
Maybe it is time to make a …
FYI @xudong963 I think this is relevant to your work on statistics / partition pruning as well.
Seems reasonable to me. I guess it'd be at the same level as …
Moving on to the next hurdle: at this point we've long lost information on the actual table schema / partition files.
I think any of these also sets us up to refactor how the partition filters actually get applied (i.e. we don't have to inject them in the …). @alamb any preference?
Generally LGTM, thank you
```rust
if let (Some(stats), Some(predicate)) = (&file.statistics, &self.predicate) {
    let pruning_predicate = build_pruning_predicate(
        Arc::clone(predicate),
        &self.table_schema,
```
Is it reasonable to use `table_schema` here?
It's the only schema we have. And it's not even really the table schema; the name is misleading for historical reasons.
It'd be better to add some notes about it. (I often get confused when reading the parquet code -- all kinds of schemas, lol)
datafusion/datasource-parquet/src/opener.rs, lines 182 to 185 in 4607643:

```rust
// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
```
I think the next step here is to resolve #16014 (comment). In my mind it makes sense to both push down the information and continue to have the ability to do it after the scan.
I think we should try and avoid moving everything to datafusion_common. Since the pruning stuff relies on PhysicalExpr, I don't think we can directly put it in datafusion_common.
I think this sounds like the most straightforward thing to me and the easiest way to get the required information. Maybe we can do something like this (change to use a `FieldRef` rather than `Field` to avoid copies):

```rust
pub struct PartitionedFile {
    ...
    pub partition_values: Vec<ScalarValue>,
    ...
}
```

to

```rust
pub struct PartitionedFile {
    ...
    pub partition_values: Vec<(FieldRef, ScalarValue)>,
    ...
}
```
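A small sketch of what the `(FieldRef, ScalarValue)` pairing buys consumers (assumes the `arrow` and `datafusion-common` crates; illustrative only, not the PR's actual code):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::ScalarValue;

fn main() {
    // Carrying the field alongside each partition value means a consumer
    // can recover both the column metadata and the value in one place,
    // without consulting the table schema separately.
    let partition_values: Vec<(FieldRef, ScalarValue)> = vec![(
        Arc::new(Field::new("event_date", DataType::Utf8, false)),
        ScalarValue::from("2024-01-01"),
    )];

    for (field, value) in &partition_values {
        println!("{} = {}", field.name(), value);
    }
}
```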
BTW, the other thing I somewhat worry about with reapplying pruning during file opening is that it is in the critical path and will directly add to the query latency. I wonder if there is some way to ensure we have hidden it behind IO if possible (aka make sure we are applying the extra pruning while the next file is opened, rather than waiting to do it before starting that IO).
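One way to express the "hide it behind IO" idea with plain `futures` combinators (a sketch only, not DataFusion's actual `FileStream` machinery):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

// Stand-in for the per-file "prune + open" work.
async fn prune_and_open(file_id: u32) -> u32 {
    file_id
}

fn main() {
    // `buffered(2)` keeps the next file's prune/open future running while
    // the current file's results are consumed, so the extra pruning work
    // overlaps with IO instead of sitting on the critical path.
    let opened: Vec<u32> = block_on(
        stream::iter(0..4)
            .map(prune_and_open)
            .buffered(2)
            .collect(),
    );
    assert_eq!(opened, vec![0, 1, 2, 3]);
}
```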
That sounds good to me. It kinda makes sense that if you're carrying around partition values you'd carry around info on what columns they belong to. Maybe it will help resolve #13270 as well in the future.
I think we can move it a couple of lines lower into …
Force-pushed from e8eb87f to cc120d0
@alamb @xudong963 I've pushed a change that: …
My plan for this PR now is to first resolve blockers. In particular: …
And then come back here and resolve the rest of the points of discussion.
Force-pushed from de0590c to d6e974c
Force-pushed from d6e974c to 7178a63
```diff
-fields.extend(conf.table_partition_cols.iter().cloned().map(Arc::new));
+fields.extend(conf.table_partition_cols.iter().cloned());
```
I think this may have just been clippy, but it's not a bad change!
I've rebased this and it's looking nice now. https://github.com/apache/datafusion/pull/16014/files#r2093515834
I'll fire up some benchmarks and see if we can see anything concerning.
🤖: Benchmark completed
Do any of those benchmarks actually collect statistics or use partition pruning? If not, I do expect this to essentially be a no-op.
It might just be that cheap 😃 -- I do expect it to be very cheap.
Let's merge this one in -- I think it looks pretty sweet and will make dynamic filtering that much more effective.
One thing I was thinking about was how to show off how good dynamic filtering is / tell people about it.
@adriangb what do you think about making a benchmark for using dynamic filtering? Perhaps we could take the clickbench dataset and rewrite it so it was partitioned by `EventDate` (so each file had a distinct date).
Then I bet dynamic filters / file opener pruning would show a pretty big difference.
I think that’d be great! But don’t we need #15770 first? I guess we can prototype on the merged commit in the meantime. But once we have that in I’ll work on benchmarks, blog posts, etc!
Yes I think you are right.
@alamb should we be running these checks for every batch? Obviously that makes your concerns about overhead / performance much worse, but I think it will have an even greater impact for dynamic filters: currently, once the file is opened, if midway through the stream the TopK state becomes such that we could exclude the whole file, we still stream every row from the file and exclude it via the predicate pushdown, despite the fact that we now know from the stats that we could immediately exit. I propose the following (sketched below):
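A minimal illustration of the "stop mid-stream" idea (stand-in logic only; the real check would re-evaluate the pruning predicate against the file's stats whenever the dynamic filter / TopK state changes):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // Pretend each item is a decoded batch; once the "file can still
    // match" check starts failing after a TopK update, the stream ends
    // early instead of decoding and filtering the remaining rows.
    let still_matches_after_topk_update = 2; // stand-in threshold
    let batches = stream::iter(0..100).take_while(move |batch| {
        futures::future::ready(*batch < still_matches_after_topk_update)
    });
    let consumed: Vec<i32> = block_on(batches.collect());
    assert_eq!(consumed, vec![0, 1]);
}
```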
`PartitionedFile` into `FileSource` for late file stats based pruning #16000