diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 4e6aa1d2..7cfdc97b 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,13 +17,14 @@
  * under the License.
  */
 
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
-
 use crate::config::HudiConfigs;
 use crate::file_group::base_file::BaseFile;
 use crate::file_group::{FileGroup, FileSlice};
 use crate::storage::{get_leaf_dirs, Storage};
+use async_recursion::async_recursion;
+use std::collections::{HashMap, HashSet};
+use std::path::PathBuf;
+use std::sync::Arc;
 
 use crate::config::read::HudiReadConfig::ListingParallelism;
 use crate::error::CoreError;
@@ -33,6 +34,8 @@ use crate::Result;
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
 
+pub const HOODIE_PARTITION_METAFILE_NAME: &str = ".hoodie_partition_metadata";
+
 /// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and
 /// access the file groups and file slices.
 #[derive(Clone, Debug)]
@@ -57,6 +60,7 @@ impl FileSystemView {
         })
     }
 
+    #[allow(dead_code)]
     async fn list_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
         Self::list_partition_paths(storage, &PartitionPruner::empty()).await
     }
@@ -73,34 +77,84 @@ impl FileSystemView {
             .collect();
         let mut partition_paths = Vec::new();
         for dir in top_level_dirs {
-            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
+            let leaf_paths = if partition_pruner.is_empty() {
+                // full dirs listing
+                get_leaf_dirs(storage, Some(&dir)).await?
+            } else {
+                // leveled pruning
+                Self::list_partition_paths_with_leveled_pruning(storage, dir, partition_pruner, 0)
+                    .await?
+            };
+            partition_paths.extend(leaf_paths);
         }
 
         if partition_paths.is_empty() {
+            // TODO: reconsider whether it is reasonable to add an empty partition path here; for a partitioned table we should return an empty vec rather than a vec with an empty string
            partition_paths.push("".to_string())
         }
 
-        if partition_pruner.is_empty() {
-            return Ok(partition_paths);
-        }
-        Ok(partition_paths
-            .into_iter()
-            .filter(|path_str| partition_pruner.should_include(path_str))
-            .collect())
+        Ok(partition_paths.into_iter().collect())
+    }
+
+    #[async_recursion]
+    async fn list_partition_paths_with_leveled_pruning(
+        storage: &Storage,
+        path: String,
+        partition_pruner: &PartitionPruner,
+        current_level: usize,
+    ) -> Result<Vec<String>> {
+        // TODO: consider stopping the iteration when the `partition_metadata` file is visited
+        let mut leaf_matched_dirs = Vec::new();
+        // 1. Check if the current level path can be pruned
+        if !partition_pruner.should_include_with_level(path.as_str(), current_level) {
+            // if the current level path can be pruned, return an empty list
+            return Ok(leaf_matched_dirs);
+        }
+        // 2. Iterate over all child directories and keep listing with pruning
+        let child_dirs = storage.list_dirs(Some(&path)).await?;
+        if child_dirs.is_empty() {
+            // if there are no child directories, keep the current path as a leaf
+            leaf_matched_dirs.push(path);
+        } else {
+            for child_dir in child_dirs {
+                let mut child_full_path = PathBuf::new();
+                child_full_path.push(&path);
+                child_full_path.push(&child_dir);
+                leaf_matched_dirs.extend(
+                    Self::list_partition_paths_with_leveled_pruning(
+                        storage,
+                        child_full_path.to_str().unwrap().to_string(),
+                        partition_pruner,
+                        current_level + 1,
+                    )
+                    .await?,
+                );
+            }
+        }
+        Ok(leaf_matched_dirs)
     }
 
     async fn list_file_groups_for_partition(
         storage: &Storage,
         partition_path: &str,
-    ) -> Result<Vec<FileGroup>> {
-        let file_metadata: Vec<FileMetadata> = storage
-            .list_files(Some(partition_path))
-            .await?
+    ) -> Result<(
+        bool,           /*if valid partition dir*/
+        Vec<FileGroup>, /*file groups*/
+    )> {
+        let files = storage.list_files(Some(partition_path)).await?;
+        if !files
+            .iter()
+            .any(|f| f.name.eq(HOODIE_PARTITION_METAFILE_NAME))
+        {
+            // not a partition directory
+            return Ok((false, Vec::new()));
+        }
+        let data_files_metadata: Vec<FileMetadata> = files
             .into_iter()
             .filter(|f| f.name.ends_with(".parquet"))
             .collect();
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
-        for metadata in file_metadata {
+        for metadata in data_files_metadata {
             let base_file = BaseFile::try_from(metadata)?;
             let fg_id = &base_file.file_group_id;
             fg_id_to_base_files
@@ -117,16 +171,16 @@ impl FileSystemView {
             }
             file_groups.push(fg);
         }
-        Ok(file_groups)
+        Ok((true, file_groups))
     }
 
     async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> Result<()> {
-        let all_partition_paths = Self::list_all_partition_paths(&self.storage).await?;
+        let need_partition_paths =
+            Self::list_partition_paths(&self.storage, partition_pruner).await?;
 
-        let partition_paths_to_list = all_partition_paths
+        let partition_paths_to_list = need_partition_paths
             .into_iter()
             .filter(|p| !self.partition_to_file_groups.contains_key(p))
-            .filter(|p| partition_pruner.should_include(p))
             .collect::<Vec<_>>();
 
         let parallelism = self
@@ -140,8 +194,10 @@ impl FileSystemView {
                 Ok::<_, CoreError>((path, file_groups))
             })
             .buffer_unordered(parallelism)
-            .try_for_each(|(path, file_groups)| async move {
-                self.partition_to_file_groups.insert(path, file_groups);
+            .try_for_each(|(path, (valid_partition_path, file_groups))| async move {
+                if valid_partition_path {
+                    self.partition_to_file_groups.insert(path, file_groups);
+                }
                 Ok(())
             })
             .await
@@ -194,6 +250,7 @@ mod tests {
     use crate::table::partition::PartitionPruner;
     use crate::table::Table;
 
+    use arrow_schema::Schema;
     use hudi_tests::TestTable;
     use std::collections::{HashMap, HashSet};
     use std::sync::Arc;
@@ -334,4 +391,131 @@ mod tests {
             assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
         }
     }
+
+    #[tokio::test]
+    async fn fs_view_get_latest_file_slices_with_complex_partition_filters() {
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let partition_schema = hudi_table.get_partition_schema().await.unwrap();
+
+        fn create_filter(
+            filter_triples: Vec<(&str, &str, &str)>,
+            schema: &Schema,
+            config: &HudiConfigs,
+        ) -> PartitionPruner {
+            let filters: Vec<Filter> = filter_triples
+                .iter()
+                .map(|binary_expr_tuple| Filter::try_from(*binary_expr_tuple).unwrap())
+                .collect();
+            PartitionPruner::new(&filters, schema, config).unwrap()
+        }
+
+        verify_partition_pruning(
+            base_url.clone(),
+            create_filter(
+                vec![("byteField", "<", "20"), ("shortField", "=", "300")],
+                &partition_schema,
+                hudi_table.hudi_configs.as_ref(),
+            ),
+            1,
+            1,
+            vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"],
+        )
+        .await;
+
+        verify_partition_pruning(
+            base_url.clone(),
+            create_filter(
+                vec![("byteField", "<", "20"), ("shortField", "=", "100")],
+                &partition_schema,
+                hudi_table.hudi_configs.as_ref(),
+            ),
+            0,
+            0,
+            vec![],
+        )
+        .await;
+
+        verify_partition_pruning(
+            base_url.clone(),
+            create_filter(
+                vec![("byteField", "<=", "20")],
+                &partition_schema,
+                hudi_table.hudi_configs.as_ref(),
+            ),
+            2,
+            2,
+            vec![
+                "a22e8257-e249-45e9-ba46-115bc85adcba-0",
+                "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
+            ],
+        )
+        .await;
+
+        verify_partition_pruning(
+            base_url.clone(),
+            create_filter(
+                vec![("shortField", ">", "100")],
+                &partition_schema,
+                hudi_table.hudi_configs.as_ref(),
+            ),
+            1,
+            1,
+            vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"],
+        )
+        .await;
+
+        verify_partition_pruning(
+            base_url.clone(),
+            create_filter(
+                vec![("shortField", "=", "100")],
+                &partition_schema,
+                hudi_table.hudi_configs.as_ref(),
+            ),
+            2,
+            2,
+            vec![
+                "4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0",
+                "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
+            ],
+        )
+        .await;
+
+        verify_partition_pruning(
+            base_url.clone(),
+            PartitionPruner::empty(),
+            3,
+            3,
+            vec![
+                "4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0",
+                "a22e8257-e249-45e9-ba46-115bc85adcba-0",
+                "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
+            ],
+        )
+        .await;
+    }
+
+    async fn verify_partition_pruning(
+        base_url: Url,
+        partition_pruner: PartitionPruner,
+        expected_fg_num: usize,
+        expected_slices_num: usize,
+        expected_fg_ids: Vec<&str>,
+    ) {
+        let fs_view = create_test_fs_view(base_url).await;
+        assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+        let excludes = HashSet::new();
+        let file_slices = fs_view
+            .get_file_slices_as_of("20240418173235694", &partition_pruner, &excludes)
+            .await
+            .unwrap();
+        assert_eq!(fs_view.partition_to_file_groups.len(), expected_fg_num);
+        assert_eq!(file_slices.len(), expected_slices_num);
+        let mut fg_ids = file_slices
+            .iter()
+            .map(|fsl| fsl.file_group_id())
+            .collect::<Vec<_>>();
+        fg_ids.sort();
+        assert_eq!(fg_ids, expected_fg_ids);
+    }
 }
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 0386ad23..c6882e62 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -31,7 +31,7 @@ use std::sync::Arc;
 /// A partition pruner that filters partitions based on the partition path and its filters.
 #[derive(Debug, Clone)]
 pub struct PartitionPruner {
-    schema: Arc<Schema>,
+    pub schema: Arc<Schema>,
     is_hive_style: bool,
     is_url_encoded: bool,
     and_filters: Vec<SchemableFilter>,
@@ -98,6 +98,80 @@ impl PartitionPruner {
         })
     }
 
+    pub fn should_include_with_level(&self, partition_path: &str, level: usize) -> bool {
+        let (field, value) = match self.parse_segment_with_level(partition_path, level) {
+            Ok(s) => s,
+            Err(_) => return true,
+        };
+
+        self.and_filters.iter().all(|filter| {
+            if field.eq(filter.field.name()) {
+                match filter.apply_comparsion(&value) {
+                    Ok(scalar) => scalar.value(0),
+                    Err(_) => true,
+                }
+            } else {
+                true
+            }
+        })
+    }
+
+    fn parse_segment_with_level(
+        &self,
+        partition_path: &str,
+        level: usize,
+    ) -> Result<(String, Scalar<ArrayRef>)> {
+        let partition_path = if self.is_url_encoded {
+            percent_encoding::percent_decode(partition_path.as_bytes())
+                .decode_utf8()?
+                .into_owned()
+        } else {
+            partition_path.to_string()
+        };
+
+        if level >= self.schema.fields.len() {
+            return Err(InvalidPartitionPath(format!(
+                "Partition path should have {} part(s) but got {}",
+                self.schema.fields.len(),
+                level
+            )));
+        }
+
+        let parts: Vec<&str> = partition_path.split('/').collect();
+        if level >= parts.len() {
+            return Err(InvalidPartitionPath(format!(
+                "Partition path should have {} part(s) but got {}",
+                parts.len(),
+                level
+            )));
+        }
+
+        let need_part = parts[level];
+
+        let need_field = self.schema.fields.get(level).unwrap();
+
+        let value = if self.is_hive_style {
+            let (name, value) = need_part
+                .split_once('=')
+                .ok_or(InvalidPartitionPath(format!(
+                    "Partition path should be hive-style but got {}",
+                    need_part
+                )))?;
+            if name != need_field.name() {
+                return Err(InvalidPartitionPath(format!(
+                    "Partition path should contain {} but got {}",
+                    need_field.name(),
+                    name
+                )));
+            }
+            value
+        } else {
+            need_part
+        };
+        let scalar = SchemableFilter::cast_value(&[value], need_field.data_type())?;
+        Ok((need_field.name().to_string(), scalar))
+    }
+
     fn parse_segments(&self, partition_path: &str) -> Result<HashMap<String, Scalar<ArrayRef>>> {
         let partition_path = if self.is_url_encoded {
             percent_encoding::percent_decode(partition_path.as_bytes())
@@ -246,6 +320,77 @@ mod tests {
         assert!(segments.contains_key("count"));
     }
 
+    #[test]
+    fn test_partition_pruner_should_include_with_level() {
+        let schema = create_test_schema();
+        let configs = create_hudi_configs(true, false);
+
+        let filter_gt_date = Filter::try_from(("date", ">", "2023-01-01")).unwrap();
+        let filter_eq_a = Filter::try_from(("category", "=", "A")).unwrap();
+        let filter_lte_100 = Filter::try_from(("count", "<=", "100")).unwrap();
+
+        let pruner = PartitionPruner::new(
+            &[filter_gt_date, filter_eq_a, filter_lte_100],
+            &schema,
+            &configs,
+        )
+        .unwrap();
+
+        assert!(pruner.should_include_with_level("date=2023-02-01", 0));
+        assert_not!(pruner.should_include_with_level("date=2022-12-31", 0));
+        assert!(pruner.should_include_with_level("date=2023-02-01/category=A", 1));
+        assert_not!(pruner.should_include_with_level("date=2023-02-01/category=B", 1));
+        assert!(pruner.should_include_with_level("date=2022-12-32/category=A", 1));
+        assert!(pruner.should_include_with_level("date=2023-02-01/category=A/count=10", 2));
+        assert_not!(pruner.should_include_with_level("date=2023-02-01/category=A/count=200", 2));
+        assert!(pruner.should_include_with_level("date=2022-12-32/category=B/count=100", 2));
+    }
+
+    #[test]
+    fn test_partition_pruner_parse_segments_with_level() {
+        let schema = create_test_schema();
+        let configs = create_hudi_configs(true, false);
+        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
+
+        // 1. test with a valid level
+        let (mut field, _) = pruner
+            .parse_segment_with_level("date=2023-02-01/category=A/count=10", 0)
+            .unwrap();
+        assert_eq!(field, "date");
+        (field, _) = pruner
+            .parse_segment_with_level("date=2023-02-01/category=A/count=10", 1)
+            .unwrap();
+        assert_eq!(field, "category");
+        (field, _) = pruner
+            .parse_segment_with_level("date=2023-02-01/category=A/count=10", 2)
+            .unwrap();
+        assert_eq!(field, "count");
+        (field, _) = pruner
+            .parse_segment_with_level("date=2023-02-01/category=A", 1)
+            .unwrap();
+        assert_eq!(field, "category");
+        (field, _) = pruner
+            .parse_segment_with_level("date=2023-02-01/category=A", 0)
+            .unwrap();
+        assert_eq!(field, "date");
+
+        // 2. test with an invalid level
+        let result = pruner.parse_segment_with_level("date=2023-02-01/category=A/count=10", 3);
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+        // 3. test with a mismatched level
+        let result = pruner.parse_segment_with_level("date=2023-02-01/category=A", 2);
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+        // 4. test with a mismatched partition field name
+        let result = pruner.parse_segment_with_level("date=2023-02-01/count=10/category=A", 2);
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+        // 5. test with a non-existent partition field
+        let result = pruner.parse_segment_with_level("dt=2023-02-01/category=A/count=10", 0);
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+    }
+
     #[test]
     fn test_partition_pruner_url_encoded() {
         let schema = create_test_schema();