Skip to content

Commit 4607643

Browse files
committed
add implementations
1 parent 9f92250 commit 4607643

File tree

2 files changed

+173
-3
lines changed

2 files changed

+173
-3
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,12 @@ impl FileOpener for ParquetOpener {
100100
&predicate_creation_errors,
101101
);
102102
if let Some(pruning_predicate) = pruning_predicate {
103-
let pruning_stats = PrunableStatistics::new(
103+
let stats_pruning = PrunableStatistics::new(
104104
vec![Arc::clone(stats)],
105105
Arc::clone(&self.table_schema),
106106
);
107-
match pruning_predicate.prune(&pruning_stats) {
107+
// TODO: add partition pruning. Need to pipe in the partition columns.
108+
match pruning_predicate.prune(&stats_pruning) {
108109
Ok(values) => {
109110
// We expect a single container -> if all containers are false skip this file
110111
if values.into_iter().all(|v| !v) {

datafusion/physical-optimizer/src/pruning.rs

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -996,10 +996,113 @@ fn build_statistics_record_batch<S: PruningStatistics>(
996996
})
997997
}
998998

999+
pub struct PartitionPruningStatistics {
1000+
/// Values for each column for each container.
1001+
/// The outer vectors represent the columns while the inner
1002+
/// vectors represent the containers.
1003+
/// The order must match the order of the partition columns in
1004+
/// [`PartitionPruningStatistics::partition_schema`].
1005+
partition_values: Vec<Vec<ScalarValue>>,
1006+
/// The schema of the partition columns.
1007+
/// This must **not** be the schema of the entire file or table:
1008+
/// it must only be the schema of the partition columns,
1009+
/// in the same order as the values in [`PartitionPruningStatistics::partition_values`].
1010+
partition_schema: SchemaRef,
1011+
}
1012+
1013+
impl PartitionPruningStatistics {
1014+
/// Create a new instance of [`PartitionPruningStatistics`].
1015+
///
1016+
/// Args:
1017+
/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
1018+
/// The outer vector represents the containers while the inner
1019+
/// vector represents the partition values for each column.
1020+
/// Note that this is the **opposite** of the order of the
1021+
/// partition columns in [`PartitionPruningStatistics::partition_schema`].
1022+
/// * `partition_schema`: The schema of the partition columns.
1023+
/// This must **not** be the schema of the entire file or table:
1024+
/// instead it must only be the schema of the partition columns,
1025+
/// in the same order as the values in `partition_values`.
1026+
pub fn new(partition_values: Vec<Vec<ScalarValue>>, partition_schema: SchemaRef) -> Self {
1027+
let mut partition_valeus_by_column = vec![vec![]; partition_schema.fields().len()];
1028+
for partition_value in partition_values.iter() {
1029+
for (i, value) in partition_value.iter().enumerate() {
1030+
partition_valeus_by_column[i].push(value.clone());
1031+
}
1032+
}
1033+
Self {
1034+
partition_values: partition_valeus_by_column,
1035+
partition_schema,
1036+
}
1037+
}
1038+
}
1039+
1040+
impl PruningStatistics for PartitionPruningStatistics {
1041+
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1042+
let index = self.partition_schema.index_of(column.name()).ok()?;
1043+
let partition_values = self.partition_values.get(index)?;
1044+
let mut values = Vec::with_capacity(self.partition_values.len());
1045+
for partition_value in partition_values {
1046+
match partition_value {
1047+
ScalarValue::Null => values.push(ScalarValue::Null),
1048+
_ => values.push(partition_value.clone()),
1049+
}
1050+
}
1051+
match ScalarValue::iter_to_array(values) {
1052+
Ok(array) => Some(array),
1053+
Err(_) => {
1054+
log::warn!(
1055+
"Failed to convert min values to array for column {}",
1056+
column.name()
1057+
);
1058+
None
1059+
}
1060+
}
1061+
}
1062+
1063+
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1064+
self.min_values(column)
1065+
}
1066+
1067+
fn num_containers(&self) -> usize {
1068+
self.partition_values.len()
1069+
}
1070+
1071+
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1072+
None
1073+
}
1074+
1075+
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1076+
None
1077+
}
1078+
1079+
fn contained(
1080+
&self,
1081+
column: &Column,
1082+
values: &HashSet<ScalarValue>,
1083+
) -> Option<BooleanArray> {
1084+
let index = self.partition_schema.index_of(column.name()).ok()?;
1085+
let partition_values = self.partition_values.get(index)?;
1086+
let mut contained = Vec::with_capacity(self.partition_values.len());
1087+
for partition_value in partition_values {
1088+
let contained_value = if values.contains(partition_value) {
1089+
Some(true)
1090+
} else {
1091+
Some(false)
1092+
};
1093+
contained.push(contained_value);
1094+
}
1095+
let array = BooleanArray::from(contained);
1096+
Some(array)
1097+
}
1098+
}
1099+
9991100
/// Prune a set of containers represented by their statistics.
10001101
/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
10011102
pub struct PrunableStatistics {
1103+
/// Statistics for each container.
10021104
statistics: Vec<Arc<Statistics>>,
1105+
/// The schema of the file these statistics are for.
10031106
schema: SchemaRef,
10041107
}
10051108

@@ -1080,7 +1183,7 @@ impl PruningStatistics for PrunableStatistics {
10801183
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
10811184
/// and [`Self::row_counts`].
10821185
fn num_containers(&self) -> usize {
1083-
1
1186+
self.statistics.len()
10841187
}
10851188

10861189
/// Return the number of null values for the named column as an
@@ -1174,6 +1277,72 @@ impl PruningStatistics for PrunableStatistics {
11741277
}
11751278
}
11761279

1280+
pub struct CompositePruningStatistics {
1281+
pub statistics: Vec<Box<dyn PruningStatistics>>,
1282+
}
1283+
1284+
impl CompositePruningStatistics {
1285+
pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
1286+
assert!(!statistics.is_empty());
1287+
Self { statistics }
1288+
}
1289+
}
1290+
1291+
impl PruningStatistics for CompositePruningStatistics {
1292+
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1293+
for stats in &self.statistics {
1294+
if let Some(array) = stats.min_values(column) {
1295+
return Some(array);
1296+
}
1297+
}
1298+
None
1299+
}
1300+
1301+
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1302+
for stats in &self.statistics {
1303+
if let Some(array) = stats.max_values(column) {
1304+
return Some(array);
1305+
}
1306+
}
1307+
None
1308+
}
1309+
1310+
fn num_containers(&self) -> usize {
1311+
self.statistics[0].num_containers()
1312+
}
1313+
1314+
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
1315+
for stats in &self.statistics {
1316+
if let Some(array) = stats.null_counts(column) {
1317+
return Some(array);
1318+
}
1319+
}
1320+
None
1321+
}
1322+
1323+
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
1324+
for stats in &self.statistics {
1325+
if let Some(array) = stats.row_counts(column) {
1326+
return Some(array);
1327+
}
1328+
}
1329+
None
1330+
}
1331+
1332+
fn contained(
1333+
&self,
1334+
column: &Column,
1335+
values: &HashSet<ScalarValue>,
1336+
) -> Option<BooleanArray> {
1337+
for stats in &self.statistics {
1338+
if let Some(array) = stats.contained(column, values) {
1339+
return Some(array);
1340+
}
1341+
}
1342+
None
1343+
}
1344+
}
1345+
11771346
struct PruningExpressionBuilder<'a> {
11781347
column: phys_expr::Column,
11791348
column_expr: Arc<dyn PhysicalExpr>,

0 commit comments

Comments
 (0)