diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 2cff4359..418a0a79 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -84,6 +84,9 @@ pub enum CoreError {
     #[error("{0}")]
     Unsupported(String),
 
+    #[error("Invalid input: {0}")]
+    InvalidInput(String),
+
     #[error(transparent)]
     Utf8Error(#[from] std::str::Utf8Error),
 }
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 3ba206a6..71397dd6 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -58,3 +58,5 @@ pub mod timeline;
 pub mod util;
 
 use error::Result;
+
+pub use crate::table::query::QueryType;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 42e9053a..c247916a 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -91,6 +91,7 @@ pub mod builder;
 mod fs_view;
 mod listing;
 pub mod partition;
+pub mod query;
 mod validation;
 
 use crate::config::read::HudiReadConfig;
@@ -104,6 +105,7 @@ use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
+use crate::table::query::QueryType;
 use crate::timeline::util::format_timestamp;
 use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP};
 use crate::util::collection::split_into_chunks;
@@ -611,6 +613,120 @@ impl Table {
         )
     }
 
+    /// Execute a query using the standardized query type enum
+    ///
+    /// # Arguments
+    /// * `query_type` - The type of query to execute
+    /// * `filters` - Partition filters to apply (not used by incremental queries)
+    ///
+    /// # Example
+    /// ```rust,ignore
+    /// use hudi_core::table::query::QueryType;
+    ///
+    /// // Snapshot query
+    /// let results = table.query(QueryType::SNAPSHOT, filters).await?;
+    ///
+    /// // Time travel query
+    /// let results = table.query(
+    ///     QueryType::time_travel("20231201120000"),
+    ///     filters
+    /// ).await?;
+    ///
+    /// // Incremental query
+    /// let results = table.query(
+    ///     QueryType::incremental("20231201000000", Some("20231201235959")),
+    ///     filters
+    /// ).await?;
+    /// ```
+    pub async fn query<I, S>(&self, query_type: QueryType, filters: I) -> Result<Vec<RecordBatch>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        match query_type {
+            QueryType::Snapshot => self.read_snapshot(filters).await,
+            QueryType::TimeTravel { timestamp } => {
+                self.read_snapshot_as_of(&timestamp, filters).await
+            }
+            QueryType::Incremental {
+                start_timestamp,
+                end_timestamp,
+            } => {
+                let end_ts = end_timestamp
+                    .as_deref()
+                    .or_else(|| self.timeline.get_latest_commit_timestamp_as_option())
+                    .ok_or_else(|| {
+                        crate::error::CoreError::InvalidInput(
+                            "No end timestamp available for incremental query".to_string(),
+                        )
+                    })?;
+
+                self.read_incremental_records(&start_timestamp, Some(end_ts))
+                    .await
+            }
+            QueryType::ReadOptimized => {
+                // NOTE(review): `_reader` is never used; only its construction error propagates.
+                let _reader = self.create_file_group_reader_with_options([(
+                    HudiReadConfig::UseReadOptimizedMode.as_ref(),
+                    "true",
+                )])?;
+
+                // TODO: honor read-optimized mode here; this is currently a plain snapshot read.
+                self.read_snapshot(filters).await
+            }
+        }
+    }
+
+    /// Get file slices using the standardized query type enum (incremental ignores `filters`)
+    pub async fn get_file_slices_by_query_type<I, S>(
+        &self,
+        query_type: QueryType,
+        filters: I,
+    ) -> Result<Vec<FileSlice>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        let filters = from_str_tuples(filters)?;
+
+        match query_type {
+            QueryType::Snapshot => {
+                if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() {
+                    self.get_file_slices_internal(timestamp, &filters).await
+                } else {
+                    Ok(Vec::new())
+                }
+            }
+            QueryType::TimeTravel { timestamp } => {
+                self.get_file_slices_internal(&timestamp, &filters).await
+            }
+            QueryType::Incremental {
+                start_timestamp,
+                end_timestamp,
+            } => {
+                let end_ts = end_timestamp
+                    .as_deref()
+                    .or_else(|| self.timeline.get_latest_commit_timestamp_as_option())
+                    .ok_or_else(|| {
+                        crate::error::CoreError::InvalidInput(
+                            "No end timestamp available".to_string(),
+                        )
+                    })?;
+
+                self.get_file_slices_between_internal(&start_timestamp, end_ts)
+                    .await
+            }
+            QueryType::ReadOptimized => {
+                // Same as snapshot but could be enhanced to filter out log files
+                if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() {
+                    self.get_file_slices_internal(timestamp, &filters).await
+                } else {
+                    Ok(Vec::new())
+                }
+            }
+        }
+    }
+
     /// Get all the latest records in the table.
     ///
     /// # Arguments
@@ -774,6 +890,7 @@ mod tests {
     use crate::metadata::meta_field::MetaField;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
+    use crate::table::query::QueryType;
     use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, SampleTable};
     use std::collections::HashSet;
     use std::fs::canonicalize;
@@ -1789,4 +1906,141 @@ mod tests {
             Ok(())
         }
     }
+
+    #[tokio::test]
+    async fn test_query_interface() {
+        let table_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(table_url.path()).await.unwrap();
+        let timestamp = hudi_table.timeline.get_latest_commit_timestamp().unwrap();
+
+        let snapshot = hudi_table
+            .query(QueryType::SNAPSHOT, empty_filters())
+            .await
+            .unwrap();
+        assert!(!snapshot.is_empty());
+
+        let time_travel = hudi_table
+            .query(QueryType::time_travel(&timestamp), empty_filters())
+            .await
+            .unwrap();
+        assert!(!time_travel.is_empty());
+
+        let read_optimized = hudi_table
+            .query(QueryType::READ_OPTIMIZED, empty_filters())
+            .await
+            .unwrap();
+        assert!(!read_optimized.is_empty());
+
+        let incremental_table_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
+        let incremental_table = Table::new(incremental_table_url.path()).await.unwrap();
+        let commits = incremental_table
+            .timeline
+            .get_completed_commits(false)
+            .await
+            .unwrap();
+        if commits.len() >= 2 {
+            let start = &commits[0].timestamp;
+            let end = &commits[1].timestamp;
+            let incremental = incremental_table
+                .query(QueryType::incremental(start, Some(end)), empty_filters())
+                .await
+                .unwrap();
+            assert!(!incremental.is_empty());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_file_slices_by_query_type() {
+        let table_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(table_url.path()).await.unwrap();
+        let timestamp = hudi_table.timeline.get_latest_commit_timestamp().unwrap();
+
+        let snapshot_slices = hudi_table
+            .get_file_slices_by_query_type(QueryType::SNAPSHOT, empty_filters())
+            .await
+            .unwrap();
+        assert!(!snapshot_slices.is_empty());
+
+        let time_travel_slices = hudi_table
+            .get_file_slices_by_query_type(QueryType::time_travel(&timestamp), empty_filters())
+            .await
+            .unwrap();
+        assert!(!time_travel_slices.is_empty());
+    }
+
+    #[test]
+    fn test_query_type_builder_methods() {
+        assert_eq!(QueryType::SNAPSHOT, QueryType::Snapshot);
+        assert_eq!(QueryType::READ_OPTIMIZED, QueryType::ReadOptimized);
+
+        let time_travel = QueryType::time_travel("20231201120000");
+        match time_travel {
+            QueryType::TimeTravel { timestamp } => {
+                assert_eq!(timestamp, "20231201120000");
+            }
+            _ => panic!("Expected TimeTravel variant"),
+        }
+
+        let incremental = QueryType::incremental("start", Some("end"));
+        match incremental {
+            QueryType::Incremental {
+                start_timestamp,
+                end_timestamp,
+            } => {
+                assert_eq!(start_timestamp, "start");
+                assert_eq!(end_timestamp, Some("end".to_string()));
+            }
+            _ => panic!("Expected Incremental variant"),
+        }
+
+        let incremental_no_end = QueryType::incremental("start", None::<String>);
+        match incremental_no_end {
+            QueryType::Incremental {
+                start_timestamp,
+                end_timestamp,
+            } => {
+                assert_eq!(start_timestamp, "start");
+                assert_eq!(end_timestamp, None);
+            }
+            _ => panic!("Expected Incremental variant"),
+        }
+    }
+
+    #[test]
+    fn test_query_type_property_methods() {
+        assert!(!QueryType::SNAPSHOT.requires_timestamp());
+        assert!(!QueryType::READ_OPTIMIZED.requires_timestamp());
+        assert!(QueryType::time_travel("ts").requires_timestamp());
+        assert!(QueryType::incremental("start", None::<String>).requires_timestamp());
+
+        assert!(!QueryType::SNAPSHOT.is_incremental());
+        assert!(!QueryType::READ_OPTIMIZED.is_incremental());
+        assert!(!QueryType::time_travel("ts").is_incremental());
+        assert!(QueryType::incremental("start", None::<String>).is_incremental());
+
+        assert!(!QueryType::SNAPSHOT.is_read_optimized());
+        assert!(QueryType::READ_OPTIMIZED.is_read_optimized());
+        assert!(!QueryType::time_travel("ts").is_read_optimized());
+        assert!(!QueryType::incremental("start", None::<String>).is_read_optimized());
+
+        assert_eq!(QueryType::SNAPSHOT.start_timestamp(), None);
+        assert_eq!(QueryType::READ_OPTIMIZED.start_timestamp(), None);
+        assert_eq!(QueryType::time_travel("ts").start_timestamp(), None);
+        assert_eq!(
+            QueryType::incremental("start", None::<String>).start_timestamp(),
+            Some("start")
+        );
+
+        assert_eq!(QueryType::SNAPSHOT.end_timestamp(), None);
+        assert_eq!(QueryType::READ_OPTIMIZED.end_timestamp(), None);
+        assert_eq!(QueryType::time_travel("ts").end_timestamp(), Some("ts"));
+        assert_eq!(
+            QueryType::incremental("start", None::<String>).end_timestamp(),
+            None
+        );
+        assert_eq!(
+            QueryType::incremental("start", Some("end")).end_timestamp(),
+            Some("end")
+        );
+    }
 }
diff --git a/crates/core/src/table/query.rs b/crates/core/src/table/query.rs
new file mode 100644
index 00000000..4464ff92
--- /dev/null
+++ b/crates/core/src/table/query.rs
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::fmt;
+
+/// Standardized query types for Hudi table operations
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum QueryType {
+    /// Snapshot query; carries no timestamp.
+    Snapshot,
+    /// Time travel query pinned to `timestamp`.
+    TimeTravel {
+        timestamp: String,
+    },
+    /// Incremental query; an `end_timestamp` of `None` is displayed as `latest`.
+    Incremental {
+        start_timestamp: String,
+        end_timestamp: Option<String>,
+    },
+    /// Read-optimized query; carries no timestamp.
+    ReadOptimized,
+}
+
+impl fmt::Display for QueryType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Snapshot => write!(f, "snapshot"),
+            Self::TimeTravel { timestamp } => write!(f, "time_travel({})", timestamp),
+            Self::Incremental {
+                start_timestamp,
+                end_timestamp,
+            } => match end_timestamp {
+                Some(end) => write!(f, "incremental({}, {})", start_timestamp, end),
+                None => write!(f, "incremental({}, latest)", start_timestamp),
+            },
+            Self::ReadOptimized => write!(f, "read_optimized"),
+        }
+    }
+}
+
+impl QueryType {
+    /// Returns `true` for the variants that carry a timestamp (`TimeTravel`, `Incremental`).
+    pub fn requires_timestamp(&self) -> bool {
+        matches!(self, Self::TimeTravel { .. } | Self::Incremental { .. })
+    }
+
+    /// Returns `true` only for `Incremental`.
+    pub fn is_incremental(&self) -> bool {
+        matches!(self, Self::Incremental { .. })
+    }
+
+    /// Returns `true` only for `ReadOptimized`.
+    pub fn is_read_optimized(&self) -> bool {
+        matches!(self, Self::ReadOptimized)
+    }
+
+    /// Returns the start timestamp of an `Incremental` query, `None` for every other variant.
+    pub fn start_timestamp(&self) -> Option<&str> {
+        match self {
+            Self::Incremental {
+                start_timestamp, ..
+            } => Some(start_timestamp),
+            Self::Snapshot | Self::TimeTravel { .. } | Self::ReadOptimized => None,
+        }
+    }
+
+    /// Returns the `TimeTravel` timestamp or the `Incremental` end timestamp, if any.
+    pub fn end_timestamp(&self) -> Option<&str> {
+        match self {
+            Self::TimeTravel { timestamp } => Some(timestamp),
+            Self::Incremental { end_timestamp, .. } => end_timestamp.as_deref(),
+            Self::Snapshot | Self::ReadOptimized => None,
+        }
+    }
+}
+
+/// Helper constants for commonly used query types
+impl QueryType {
+    /// Shorthand for [`QueryType::Snapshot`].
+    pub const SNAPSHOT: Self = Self::Snapshot;
+    /// Shorthand for [`QueryType::ReadOptimized`].
+    pub const READ_OPTIMIZED: Self = Self::ReadOptimized;
+    /// Builds a [`QueryType::TimeTravel`] from anything convertible into a `String`.
+    pub fn time_travel(timestamp: impl Into<String>) -> Self {
+        Self::TimeTravel {
+            timestamp: timestamp.into(),
+        }
+    }
+    /// Builds a [`QueryType::Incremental`]; pass `None::<String>` to leave the end open.
+    pub fn incremental(
+        start_timestamp: impl Into<String>,
+        end_timestamp: Option<impl Into<String>>,
+    ) -> Self {
+        Self::Incremental {
+            start_timestamp: start_timestamp.into(),
+            end_timestamp: end_timestamp.map(Into::into),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_query_type_display() {
+        assert_eq!(QueryType::SNAPSHOT.to_string(), "snapshot");
+        assert_eq!(
+            QueryType::time_travel("20231201120000").to_string(),
+            "time_travel(20231201120000)"
+        );
+        assert_eq!(
+            QueryType::incremental("20231201000000", Some("20231201235959")).to_string(),
+            "incremental(20231201000000, 20231201235959)"
+        );
+        assert_eq!(
+            QueryType::incremental("20231201000000", None::<String>).to_string(),
+            "incremental(20231201000000, latest)"
+        );
+    }
+
+    #[test]
+    fn test_query_type_properties() {
+        assert!(!QueryType::SNAPSHOT.requires_timestamp());
+        assert!(QueryType::time_travel("20231201120000").requires_timestamp());
+        assert!(QueryType::incremental("start", None::<String>).is_incremental());
+        assert!(QueryType::READ_OPTIMIZED.is_read_optimized());
+    }
+}