diff --git a/crates/core/src/expr/column_stats.rs b/crates/core/src/expr/column_stats.rs
new file mode 100644
index 00000000..42de5a2c
--- /dev/null
+++ b/crates/core/src/expr/column_stats.rs
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Column statistics for data skipping optimization.
+
+use crate::expr::filter::Filter;
+use crate::expr::ExprOperator;
+use crate::Result;
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::statistics::Statistics;
+use std::collections::HashMap;
+
+/// Column statistics for a single column in a file.
+///
+/// Contains min/max values that can be used to prune files
+/// when evaluating filters.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ColumnStats {
+    /// Column name
+    pub column_name: String,
+
+    /// Minimum value in the column (as string for generic comparison)
+    pub min_value: Option<String>,
+
+    /// Maximum value in the column (as string for generic comparison)
+    pub max_value: Option<String>,
+
+    /// Number of null values
+    pub null_count: Option<i64>,
+
+    /// Total number of values
+    pub value_count: Option<i64>,
+}
+
+impl ColumnStats {
+    /// Creates column statistics.
+    pub fn new(
+        column_name: String,
+        min_value: Option<String>,
+        max_value: Option<String>,
+        null_count: Option<i64>,
+        value_count: Option<i64>,
+    ) -> Self {
+        Self {
+            column_name,
+            min_value,
+            max_value,
+            null_count,
+            value_count,
+        }
+    }
+
+    /// Checks if this column's min/max range can satisfy a filter.
+    ///
+    /// Returns `None` if the stats are insufficient to decide, `Some(true)` if
+    /// the file might contain matching values, and `Some(false)` if the file
+    /// can be pruned.
+    pub fn can_satisfy_filter(&self, filter: &Filter) -> Option<bool> {
+        if filter.field_name != self.column_name {
+            return None;
+        }
+
+        let min = self.min_value.as_ref()?;
+        let max = self.max_value.as_ref()?;
+        let value = &filter.field_value;
+
+        // Try numeric comparison first; fall back to lexicographic string comparison
+        let (min_cmp, max_cmp, eq_cmp) = if let (Ok(min_num), Ok(max_num), Ok(val_num)) =
+            (min.parse::<f64>(), max.parse::<f64>(), value.parse::<f64>())
+        {
+            (
+                val_num > min_num,                        // value > min
+                val_num < max_num,                        // value < max
+                val_num >= min_num && val_num <= max_num, // value in [min, max]
+            )
+        } else {
+            (value > min, value < max, value >= min && value <= max)
+        };
+
+        match filter.operator {
+            ExprOperator::Eq => Some(eq_cmp),
+            ExprOperator::Ne => None, // Unsupported: min/max cannot prove all values != target
+            ExprOperator::Lt => Some(min_cmp),
+            ExprOperator::Lte => Some(eq_cmp || min_cmp),
+            ExprOperator::Gt => Some(max_cmp),
+            ExprOperator::Gte => Some(eq_cmp || max_cmp),
+        }
+    }
+}
+
+/// File-level column statistics for all columns.
+///
+/// Used to evaluate filters against a file's statistics
+/// to determine if the file should be read.
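+///
+/// # Example
+///
+/// A minimal sketch of the pruning flow (the column name and bounds are
+/// illustrative):
+///
+/// ```ignore
+/// let mut file_stats = FileColumnStats::new();
+/// file_stats.add_column_stats(ColumnStats::new(
+///     "fare".to_string(),
+///     Some("10".to_string()),  // min
+///     Some("100".to_string()), // max
+///     None,
+///     None,
+/// ));
+///
+/// // `fare > 200` is impossible when max = 100, so the file is pruned.
+/// let filter = Filter {
+///     field_name: "fare".to_string(),
+///     operator: ExprOperator::Gt,
+///     field_value: "200".to_string(),
+/// };
+/// assert!(!file_stats.should_include_file(&[filter]));
+/// ```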
+#[derive(Debug, Clone, Default)]
+pub struct FileColumnStats {
+    /// Statistics for each column, keyed by column name
+    stats: HashMap<String, ColumnStats>,
+}
+
+impl FileColumnStats {
+    /// Creates a new empty file column statistics.
+    pub fn new() -> Self {
+        Self {
+            stats: HashMap::new(),
+        }
+    }
+
+    /// Adds column statistics for a column.
+    pub fn add_column_stats(&mut self, stats: ColumnStats) {
+        self.stats.insert(stats.column_name.clone(), stats);
+    }
+
+    /// Gets statistics for a specific column.
+    pub fn get_column_stats(&self, column_name: &str) -> Option<&ColumnStats> {
+        self.stats.get(column_name)
+    }
+
+    /// Checks if the file can satisfy a filter based on column stats.
+    pub fn can_satisfy_filter(&self, filter: &Filter) -> Option<bool> {
+        self.stats
+            .get(&filter.field_name)
+            .and_then(|stats| stats.can_satisfy_filter(filter))
+    }
+
+    /// Checks if the file can satisfy all filters (AND logic). Returns `false` if the file can be pruned.
+    pub fn can_satisfy_filters(&self, filters: &[Filter]) -> bool {
+        for filter in filters {
+            match self.can_satisfy_filter(filter) {
+                Some(false) => return false,   // Definitely cannot satisfy this filter
+                Some(true) | None => continue, // Maybe can satisfy, or unknown
+            }
+        }
+        true // All filters might be satisfiable
+    }
+
+    /// Returns `true` if the file should be included. Conservative: includes the file when uncertain.
+    pub fn should_include_file(&self, filters: &[Filter]) -> bool {
+        self.can_satisfy_filters(filters)
+    }
+
+    /// Extracts column statistics from Parquet file metadata.
+    ///
+    /// Reads min/max values from Parquet column statistics and converts them to strings
+    /// for generic comparison.
+    ///
+    /// # Arguments
+    /// * `parquet_metadata` - Parquet file metadata containing column statistics
+    ///
+    /// # Returns
+    /// `FileColumnStats` with statistics for all columns that have min/max available
+    pub fn from_parquet_metadata(parquet_metadata: &ParquetMetaData) -> Result<Self> {
+        let mut file_stats = Self::new();
+
+        // Get the schema to map column indices to names
+        let schema = parquet_metadata.file_metadata().schema_descr();
+
+        // Iterate over all row groups to collect min/max across the file
+        for row_group in parquet_metadata.row_groups() {
+            for (col_idx, column_chunk) in row_group.columns().iter().enumerate() {
+                let column_descr = schema.column(col_idx);
+                let column_name = column_descr.name().to_string();
+
+                // Get statistics if available
+                if let Some(stats) = column_chunk.statistics() {
+                    let (min_val, max_val) = match stats {
+                        Statistics::Boolean(s) => (
+                            s.min_opt().map(|v| v.to_string()),
+                            s.max_opt().map(|v| v.to_string()),
+                        ),
+                        Statistics::Int32(s) => (
+                            s.min_opt().map(|v| v.to_string()),
+                            s.max_opt().map(|v| v.to_string()),
+                        ),
+                        Statistics::Int64(s) => (
+                            s.min_opt().map(|v| v.to_string()),
+                            s.max_opt().map(|v| v.to_string()),
+                        ),
+                        // Int96 has no natural string form; Debug output is best-effort
+                        Statistics::Int96(s) => (
+                            s.min_opt().map(|v| format!("{:?}", v)),
+                            s.max_opt().map(|v| format!("{:?}", v)),
+                        ),
+                        Statistics::Float(s) => (
+                            s.min_opt().map(|v| v.to_string()),
+                            s.max_opt().map(|v| v.to_string()),
+                        ),
+                        Statistics::Double(s) => (
+                            s.min_opt().map(|v| v.to_string()),
+                            s.max_opt().map(|v| v.to_string()),
+                        ),
+                        Statistics::ByteArray(s) => (
+                            s.min_opt()
+                                .and_then(|v| String::from_utf8(v.data().to_vec()).ok()),
+                            s.max_opt()
+                                .and_then(|v| String::from_utf8(v.data().to_vec()).ok()),
+                        ),
+                        Statistics::FixedLenByteArray(s) => (
+                            s.min_opt()
+                                .and_then(|v| String::from_utf8(v.data().to_vec()).ok()),
+                            s.max_opt()
+                                .and_then(|v| String::from_utf8(v.data().to_vec()).ok()),
+                        ),
+                    };
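+
+                    // Min/max are carried as strings so a single code path can
+                    // compare any column type; `can_satisfy_filter` re-parses
+                    // them as f64, so numeric columns still compare numerically.
+                    // Byte-array stats that are not valid UTF-8 become `None`,
+                    // which disables pruning on that column.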
+                    let null_count = stats.null_count_opt();
+                    let value_count = Some(column_chunk.num_values());
+
+                    // Update or insert stats (take min of mins, max of maxes across row groups).
+                    // Compare numerically when both sides parse as f64, mirroring
+                    // `can_satisfy_filter`; a plain string comparison would pick the
+                    // wrong bound for numeric columns (e.g. "100" < "20" as strings).
+                    fn stat_lt(a: &str, b: &str) -> bool {
+                        match (a.parse::<f64>(), b.parse::<f64>()) {
+                            (Ok(x), Ok(y)) => x < y,
+                            _ => a < b,
+                        }
+                    }
+                    if let Some(existing_stats) = file_stats.stats.get_mut(&column_name) {
+                        // Update with min of mins, max of maxes
+                        if let (Some(new_min), Some(existing_min)) =
+                            (&min_val, &existing_stats.min_value)
+                        {
+                            if stat_lt(new_min, existing_min) {
+                                existing_stats.min_value = min_val.clone();
+                            }
+                        }
+                        if let (Some(new_max), Some(existing_max)) =
+                            (&max_val, &existing_stats.max_value)
+                        {
+                            if stat_lt(existing_max, new_max) {
+                                existing_stats.max_value = max_val.clone();
+                            }
+                        }
+                        // Aggregate null and value counts across row groups
+                        if let (Some(new_null), Some(existing_null)) =
+                            (null_count, existing_stats.null_count)
+                        {
+                            existing_stats.null_count = Some(existing_null + new_null as i64);
+                        }
+                        if let (Some(new_count), Some(existing_count)) =
+                            (value_count, existing_stats.value_count)
+                        {
+                            existing_stats.value_count = Some(existing_count + new_count);
+                        }
+                    } else {
+                        // First time seeing this column
+                        file_stats.add_column_stats(ColumnStats::new(
+                            column_name.clone(),
+                            min_val,
+                            max_val,
+                            null_count.map(|n| n as i64),
+                            value_count,
+                        ));
+                    }
+                }
+            }
+        }
+
+        Ok(file_stats)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn create_filter(field: &str, op: ExprOperator, value: &str) -> Filter {
+        Filter {
+            field_name: field.to_string(),
+            operator: op,
+            field_value: value.to_string(),
+        }
+    }
+
+    #[test]
+    fn test_column_stats_eq() {
+        let stats = ColumnStats::new(
+            "age".to_string(),
+            Some("10".to_string()),
+            Some("50".to_string()),
+            None,
+            None,
+        );
+
+        // Value in range
+        let filter = create_filter("age", ExprOperator::Eq, "25");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(true));
+
+        // Value below range
+        let filter = create_filter("age", ExprOperator::Eq, "5");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(false));
+
+        // Value above range
+        let filter = create_filter("age", ExprOperator::Eq, "100");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(false));
+    }
+
+    #[test]
+    fn test_column_stats_gt() {
+        let stats = ColumnStats::new(
+            "age".to_string(),
+            Some("10".to_string()),
+            Some("50".to_string()),
+            None,
+            None,
+        );
+
+        // Need age > 5, max = 50, so possible
+        let filter = create_filter("age", ExprOperator::Gt, "5");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(true));
+
+        // Need age > 50, max = 50, not possible
+        let filter = create_filter("age", ExprOperator::Gt, "50");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(false));
+
+        // Need age > 100, max = 50, not possible
+        let filter = create_filter("age", ExprOperator::Gt, "100");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(false));
+    }
+
+    #[test]
+    fn test_column_stats_lt() {
+        let stats = ColumnStats::new(
+            "age".to_string(),
+            Some("10".to_string()),
+            Some("50".to_string()),
+            None,
+            None,
+        );
+
+        // Need age < 100, min = 10, so possible
+        let filter = create_filter("age", ExprOperator::Lt, "100");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(true));
+
+        // Need age < 10, min = 10, not possible
+        let filter = create_filter("age", ExprOperator::Lt, "10");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(false));
+
+        // Need age < 5, min = 10, not possible
+        let filter = create_filter("age", ExprOperator::Lt, "5");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(false));
+    }
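+
+    #[test]
+    fn test_column_stats_numeric_comparison() {
+        // Illustrative check of the numeric fast path: "20" sorts before "9"
+        // lexicographically, but 20 > 9 numerically.
+        let stats = ColumnStats::new(
+            "age".to_string(),
+            Some("9".to_string()),
+            Some("100".to_string()),
+            None,
+            None,
+        );
+
+        // A string-only comparison would wrongly prune this file;
+        // numeric parsing keeps it.
+        let filter = create_filter("age", ExprOperator::Eq, "20");
+        assert_eq!(stats.can_satisfy_filter(&filter), Some(true));
+    }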
"25"); + assert_eq!(stats.can_satisfy_filter(&filter), None); + } + + #[test] + fn test_column_stats_wrong_column() { + let stats = ColumnStats::new( + "age".to_string(), + Some("10".to_string()), + Some("50".to_string()), + None, + None, + ); + + let filter = create_filter("name", ExprOperator::Eq, "John"); + assert_eq!(stats.can_satisfy_filter(&filter), None); + } + + #[test] + fn test_file_column_stats_single_filter() { + let mut file_stats = FileColumnStats::new(); + file_stats.add_column_stats(ColumnStats::new( + "fare".to_string(), + Some("10".to_string()), + Some("100".to_string()), + None, + None, + )); + + // Should include: fare might be > 50 + let filter = create_filter("fare", ExprOperator::Gt, "50"); + assert!(file_stats.should_include_file(&[filter])); + + // Should prune: fare cannot be > 200 + let filter = create_filter("fare", ExprOperator::Gt, "200"); + assert!(!file_stats.should_include_file(&[filter])); + } + + #[test] + fn test_file_column_stats_multiple_filters() { + let mut file_stats = FileColumnStats::new(); + file_stats.add_column_stats(ColumnStats::new( + "fare".to_string(), + Some("10".to_string()), + Some("100".to_string()), + None, + None, + )); + file_stats.add_column_stats(ColumnStats::new( + "passengers".to_string(), + Some("1".to_string()), + Some("4".to_string()), + None, + None, + )); + + // Both conditions can be satisfied (AND logic) + let filters = vec![ + create_filter("fare", ExprOperator::Gt, "20"), + create_filter("passengers", ExprOperator::Eq, "2"), + ]; + assert!(file_stats.should_include_file(&filters)); + + // First condition fails (AND logic - all must pass) + let filters = vec![ + create_filter("fare", ExprOperator::Gt, "200"), + create_filter("passengers", ExprOperator::Eq, "2"), + ]; + assert!(!file_stats.should_include_file(&filters)); + } + + #[test] + fn test_file_column_stats_conservative() { + let file_stats = FileColumnStats::new(); // No stats + + // Unknown column - conservative, include file + let filter = create_filter("unknown", ExprOperator::Gt, "100"); + assert!(file_stats.should_include_file(&[filter])); + } +} diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs index b9cb7a6b..fab45487 100644 --- a/crates/core/src/expr/filter.rs +++ b/crates/core/src/expr/filter.rs @@ -26,7 +26,7 @@ use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{DataType, Field, Schema}; use std::str::FromStr; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Filter { pub field_name: String, pub operator: ExprOperator, diff --git a/crates/core/src/expr/mod.rs b/crates/core/src/expr/mod.rs index d592c3f6..c77cb0ad 100644 --- a/crates/core/src/expr/mod.rs +++ b/crates/core/src/expr/mod.rs @@ -17,6 +17,7 @@ * under the License. */ +pub mod column_stats; pub mod filter; use crate::error::CoreError; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index b0d6f4d9..938ea74b 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -294,7 +294,7 @@ impl Table { /// /// # Arguments /// * `num_splits` - The number of chunks to split the file slices into. - /// * `filters` - Partition filters to apply. + /// * `filters` - Filters to apply. Automatically separated into partition and data filters. 
     pub async fn get_file_slices_splits<I, K, V>(
         &self,
         num_splits: usize,
@@ -306,8 +306,14 @@ impl Table {
     {
         if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
-            self.get_file_slices_splits_internal(num_splits, timestamp, &filters)
-                .await
+            let (partition_filters, data_filters) = self.separate_filters(&filters).await?;
+            self.get_file_slices_splits_internal(
+                num_splits,
+                timestamp,
+                &partition_filters,
+                &data_filters,
+            )
+            .await
         } else {
             Ok(Vec::new())
         }
@@ -334,7 +340,7 @@ impl Table {
     /// # Arguments
     /// * `num_splits` - The number of chunks to split the file slices into.
     /// * `timestamp` - The timestamp which file slices associated with.
-    /// * `filters` - Partition filters to apply.
+    /// * `filters` - Filters to apply. Automatically separated into partition and data filters.
     pub async fn get_file_slices_splits_as_of<I, K, V>(
         &self,
         num_splits: usize,
@@ -347,8 +353,14 @@ impl Table {
     {
         let timestamp = format_timestamp(timestamp, &self.timezone())?;
         let filters = from_str_tuples(filters)?;
-        self.get_file_slices_splits_internal(num_splits, &timestamp, &filters)
-            .await
+        let (partition_filters, data_filters) = self.separate_filters(&filters).await?;
+        self.get_file_slices_splits_internal(
+            num_splits,
+            &timestamp,
+            &partition_filters,
+            &data_filters,
+        )
+        .await
     }
 
     /// Same as [Table::get_file_slices_splits_as_of], but blocking.
@@ -371,16 +383,112 @@ impl Table {
         })
     }
 
+    /// Separates filters into partition filters and data filters based on the partition schema.
+    ///
+    /// Partition filters target columns in the partition schema (fast-path pruning).
+    /// Data filters target regular data columns (data skipping using Parquet statistics).
+    async fn separate_filters(&self, filters: &[Filter]) -> Result<(Vec<Filter>, Vec<Filter>)> {
+        let partition_schema = self.get_partition_schema().await?;
+        let partition_columns: HashSet<_> = partition_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+
+        let mut partition_filters = Vec::new();
+        let mut data_filters = Vec::new();
+
+        for filter in filters {
+            if partition_columns.contains(filter.field_name.as_str()) {
+                partition_filters.push(filter.clone());
+            } else {
+                data_filters.push(filter.clone());
+            }
+        }
+
+        Ok((partition_filters, data_filters))
+    }
+
     async fn get_file_slices_splits_internal(
         &self,
         num_splits: usize,
         timestamp: &str,
-        filters: &[Filter],
+        partition_filters: &[Filter],
+        data_filters: &[Filter],
     ) -> Result<Vec<Vec<FileSlice>>> {
-        let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
+        let file_slices = self
+            .get_file_slices_with_data_filters_internal(timestamp, partition_filters, data_filters)
+            .await?;
         Ok(split_into_chunks(file_slices, num_splits))
     }
 
+    async fn get_file_slices_with_data_filters_internal(
+        &self,
+        timestamp: &str,
+        partition_filters: &[Filter],
+        data_filters: &[Filter],
+    ) -> Result<Vec<FileSlice>> {
+        let file_slices = self
+            .get_file_slices_internal(timestamp, partition_filters)
+            .await?;
+
+        if data_filters.is_empty() {
+            return Ok(file_slices);
+        }
+
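+        // Prune per file using Parquet footer statistics. Every fallback path
+        // below (missing file metadata, unreadable footer, failed stats
+        // extraction) conservatively keeps the file, so data skipping only
+        // removes files that provably cannot match.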
+        let mut filtered_slices = Vec::new();
+        for file_slice in file_slices {
+            let base_file = &file_slice.base_file;
+
+            // Get the file metadata to construct the relative path
+            let Some(ref file_metadata) = base_file.file_metadata else {
+                // If no metadata, conservatively include the file slice
+                filtered_slices.push(file_slice);
+                continue;
+            };
+
+            let relative_path = if file_slice.partition_path.is_empty() {
+                file_metadata.name.clone()
+            } else {
+                format!("{}/{}", file_slice.partition_path, file_metadata.name)
+            };
+
+            let parquet_metadata = match self
+                .file_system_view
+                .storage
+                .get_parquet_file_metadata(&relative_path)
+                .await
+            {
+                Ok(metadata) => metadata,
+                Err(_) => {
+                    // If we can't read metadata, conservatively include the file
+                    filtered_slices.push(file_slice);
+                    continue;
+                }
+            };
+
+            // Extract column statistics from the Parquet metadata
+            let column_stats =
+                match crate::expr::column_stats::FileColumnStats::from_parquet_metadata(
+                    &parquet_metadata,
+                ) {
+                    Ok(stats) => stats,
+                    Err(_) => {
+                        // If we can't extract stats, conservatively include the file
+                        filtered_slices.push(file_slice);
+                        continue;
+                    }
+                };
+
+            // Check if the file should be included based on column statistics
+            if column_stats.should_include_file(data_filters) {
+                filtered_slices.push(file_slice);
+            }
+        }
+
+        Ok(filtered_slices)
+    }
+
     /// Get all the [FileSlice]s in the table.
     ///
     /// # Arguments
@@ -1148,6 +1256,136 @@ mod tests {
         assert_eq!(file_slices_splits[1].len(), 1);
     }
 
+    #[test]
+    fn hudi_table_get_file_slices_splits_with_filters() {
+        // Test partition-only filters
+        let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+        // byteField is a partition column - should prune partitions
+        let filters = vec![("byteField", ">=", "10"), ("byteField", "<", "30")];
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, filters)
+            .unwrap();
+
+        let all_slices: Vec<_> = file_slices_splits.iter().flatten().collect();
+        let partition_paths: Vec<_> = all_slices
+            .iter()
+            .map(|s| s.partition_path.as_str())
+            .collect();
+
+        assert_eq!(
+            partition_paths.len(),
+            2,
+            "Should include partitions 10 and 20"
+        );
+        assert!(partition_paths.contains(&"10"));
+        assert!(partition_paths.contains(&"20"));
+        assert!(!partition_paths.contains(&"30"));
+
+        // Test data-only filters on a non-partitioned table
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+        // byteField is a data column - should use Parquet statistics
+        // Data: [0, 1, 1, 1], so the range [0, 1] should include files
+        let filters = vec![("byteField", ">=", "0"), ("byteField", "<=", "1")];
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, filters)
+            .unwrap();
+
+        let total_slices: usize = file_slices_splits.iter().map(|s| s.len()).sum();
+        assert_eq!(
+            total_slices, 1,
+            "Should include 1 file with matching statistics"
+        );
+
+        // Test mixed partition and data filters
+        let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+        let filters = vec![
+            ("byteField", ">=", "10"), // partition filter
+            ("byteField", "<", "30"),  // partition filter
+            ("id", ">=", "1"),         // data filter
+        ];
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, filters)
+            .unwrap();
+
+        let all_slices: Vec<_> = file_slices_splits.iter().flatten().collect();
+        assert!(
+            !all_slices.is_empty(),
+            "Should have file slices after both partition and data filtering"
+        );
+
+        let partition_paths: Vec<_> = all_slices
+            .iter()
+            .map(|s| s.partition_path.as_str())
+            .collect();
+
+        // Verify partition pruning worked
+        assert!(
+            !partition_paths.contains(&"30"),
+            "Partition 30 should be pruned"
+        );
+        assert!(
+            partition_paths.contains(&"10") || partition_paths.contains(&"20"),
+            "Should contain partition 10 or 20"
+        );
+    }
+
+    #[test]
+    fn hudi_table_get_file_slices_splits_data_filter_excludes_files() {
+        // Test that data filters exclude files when statistics show no overlap
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+        // byteField data: [0, 1, 1, 1], so byteField > 100 should exclude all files
+        let filters = vec![("byteField", ">", "100")];
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, filters)
+            .unwrap();
+
+        let total_slices: usize = file_slices_splits.iter().map(|s| s.len()).sum();
+        assert_eq!(
+            total_slices, 0,
+            "Should exclude all files based on statistics"
+        );
+    }
+
+    #[test]
+    fn hudi_table_get_file_slices_splits_with_complex_mixed_filters() {
+        // Test a complex scenario with multiple partition and data filters
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+        // byteField and shortField are partition columns, id is a data column
+        let filters = vec![
+            ("byteField", ">=", "10"),   // partition filter
+            ("byteField", "<", "30"),    // partition filter
+            ("shortField", "!=", "100"), // partition filter
+            ("id", ">=", "1"),           // data filter
+        ];
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, filters)
+            .unwrap();
+
+        // Should apply partition pruning first, then data skipping
+        assert!(!file_slices_splits.is_empty());
+
+        let all_slices: Vec<_> = file_slices_splits.iter().flatten().collect();
+        let partition_paths: Vec<_> = all_slices
+            .iter()
+            .map(|s| s.partition_path.as_str())
+            .collect();
+
+        // Should only have byteField=10 with shortField=300 (not 100)
+        assert!(partition_paths.iter().all(|p| (p.contains("byteField=10")
+            || p.contains("byteField=20"))
+            && !p.contains("byteField=30")));
+    }
+
     #[test]
     fn hudi_table_get_file_slices_splits_as_of_timestamps() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();