3 changes: 3 additions & 0 deletions crates/core/src/error.rs
@@ -84,6 +84,9 @@ pub enum CoreError {
#[error("{0}")]
Unsupported(String),

#[error("Invalid input: {0}")]
InvalidInput(String),

#[error(transparent)]
Utf8Error(#[from] std::str::Utf8Error),
}
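
The new variant slots into the existing error derive, so its Display output comes straight from the attribute string. A minimal sketch of constructing and rendering it — the message text is arbitrary, and this assumes `error` is a public module of `hudi_core` and that the derive is thiserror, as the `#[error(...)]`/`#[from]` attribute syntax suggests:

```rust
use hudi_core::error::CoreError;

fn main() {
    // `#[error("Invalid input: {0}")]` prepends the fixed prefix to the payload.
    let err = CoreError::InvalidInput("start timestamp must not be empty".to_string());
    assert_eq!(err.to_string(), "Invalid input: start timestamp must not be empty");
}
```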
2 changes: 2 additions & 0 deletions crates/core/src/lib.rs
@@ -58,3 +58,5 @@ pub mod timeline;
pub mod util;

use error::Result;

pub use crate::table::query::QueryType;
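
The crate-root re-export lets callers skip the full module path. A small sketch of the shorter import in use (the timestamp is a made-up value; `requires_timestamp` is the property method exercised in the tests below):

```rust
// Equivalent to `use hudi_core::table::query::QueryType;`.
use hudi_core::QueryType;

fn main() {
    let qt = QueryType::time_travel("20231201120000");
    assert!(qt.requires_timestamp());
}
```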
254 changes: 254 additions & 0 deletions crates/core/src/table/mod.rs
@@ -91,6 +91,7 @@ pub mod builder;
mod fs_view;
mod listing;
pub mod partition;
pub mod query;
mod validation;

use crate::config::read::HudiReadConfig;
@@ -104,6 +105,7 @@ use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::query::QueryType;
use crate::timeline::util::format_timestamp;
use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP};
use crate::util::collection::split_into_chunks;
@@ -611,6 +613,120 @@ impl Table {
)
}

    /// Executes a query using the standardized [`QueryType`] enum.
    ///
    /// # Arguments
    /// * `query_type` - The type of query to execute
    /// * `filters` - Partition filters to apply, as `(field, operator, value)` tuples
    ///
    /// # Example
    /// ```rust,no_run
    /// use hudi_core::table::Table;
    /// use hudi_core::table::query::QueryType;
    ///
    /// # async fn example() -> hudi_core::error::Result<()> {
    /// let table = Table::new("/tmp/hudi_table").await?;
    ///
    /// // Snapshot query
    /// let results = table.query(QueryType::SNAPSHOT, [("city", "=", "sf")]).await?;
    ///
    /// // Time travel query
    /// let results = table
    ///     .query(QueryType::time_travel("20231201120000"), [("city", "=", "sf")])
    ///     .await?;
    ///
    /// // Incremental query
    /// let results = table
    ///     .query(
    ///         QueryType::incremental("20231201000000", Some("20231201235959")),
    ///         [("city", "=", "sf")],
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
pub async fn query<I, S>(&self, query_type: QueryType, filters: I) -> Result<Vec<RecordBatch>>
where
I: IntoIterator<Item = (S, S, S)>,
S: AsRef<str>,
{
match query_type {
QueryType::Snapshot => self.read_snapshot(filters).await,
QueryType::TimeTravel { timestamp } => {
self.read_snapshot_as_of(&timestamp, filters).await
}
QueryType::Incremental {
start_timestamp,
end_timestamp,
} => {
let end_ts = end_timestamp
.as_deref()
.or_else(|| self.timeline.get_latest_commit_timestamp_as_option())
.ok_or_else(|| {
crate::error::CoreError::InvalidInput(
"No end timestamp available for incremental query".to_string(),
)
})?;

self.read_incremental_records(&start_timestamp, Some(end_ts))
.await
}
            QueryType::ReadOptimized => {
                // Create a file group reader with read-optimized mode enabled.
                // Note: the reader is not yet threaded through the read path, so
                // this arm currently falls back to the regular snapshot logic.
                let _reader = self.create_file_group_reader_with_options([(
                    HudiReadConfig::UseReadOptimizedMode.as_ref(),
                    "true",
                )])?;

                self.read_snapshot(filters).await
            }
}
}
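
The new `crates/core/src/table/query.rs` does not appear in this hunk; judging from the call sites above and the tests below, the enum presumably looks roughly like the following sketch. Variant and method names are taken from usage; the bodies are inferred, not the actual file contents:

```rust
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueryType {
    Snapshot,
    TimeTravel { timestamp: String },
    Incremental {
        start_timestamp: String,
        end_timestamp: Option<String>,
    },
    ReadOptimized,
}

impl QueryType {
    // Shorthand aliases used by the tests (`QueryType::SNAPSHOT == QueryType::Snapshot`).
    pub const SNAPSHOT: QueryType = QueryType::Snapshot;
    pub const READ_OPTIMIZED: QueryType = QueryType::ReadOptimized;

    pub fn time_travel(timestamp: impl Into<String>) -> Self {
        QueryType::TimeTravel { timestamp: timestamp.into() }
    }

    pub fn incremental(start: impl Into<String>, end: Option<impl Into<String>>) -> Self {
        QueryType::Incremental {
            start_timestamp: start.into(),
            end_timestamp: end.map(Into::into),
        }
    }

    // True for the variants that carry a user-supplied timestamp.
    pub fn requires_timestamp(&self) -> bool {
        matches!(self, QueryType::TimeTravel { .. } | QueryType::Incremental { .. })
    }
}
```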

    /// Gets the file slices selected by the given [`QueryType`], applying the partition `filters`.
pub async fn get_file_slices_by_query_type<I, S>(
&self,
query_type: QueryType,
filters: I,
) -> Result<Vec<FileSlice>>
where
I: IntoIterator<Item = (S, S, S)>,
S: AsRef<str>,
{
let filters = from_str_tuples(filters)?;

match query_type {
QueryType::Snapshot => {
if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() {
self.get_file_slices_internal(timestamp, &filters).await
} else {
Ok(Vec::new())
}
}
QueryType::TimeTravel { timestamp } => {
self.get_file_slices_internal(&timestamp, &filters).await
}
QueryType::Incremental {
start_timestamp,
end_timestamp,
} => {
let end_ts = end_timestamp
.as_deref()
.or_else(|| self.timeline.get_latest_commit_timestamp_as_option())
.ok_or_else(|| {
crate::error::CoreError::InvalidInput(
"No end timestamp available".to_string(),
)
})?;

self.get_file_slices_between_internal(&start_timestamp, end_ts)
.await
}
QueryType::ReadOptimized => {
                // Same as the snapshot path; a future enhancement could exclude
                // file slices that carry log files.
if let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() {
self.get_file_slices_internal(timestamp, &filters).await
} else {
Ok(Vec::new())
}
}
}
}
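
For reference, a sketch of calling the new file-slice API end to end. The helper name, table path, and partition filter are hypothetical values for illustration:

```rust
use hudi_core::error::Result;
use hudi_core::table::Table;
use hudi_core::QueryType;

// Hypothetical helper: counts the file slices visible as of a given commit.
async fn count_slices_as_of(ts: &str) -> Result<usize> {
    let table = Table::new("/tmp/hudi_table").await?;
    let slices = table
        .get_file_slices_by_query_type(QueryType::time_travel(ts), [("city", "=", "sf")])
        .await?;
    Ok(slices.len())
}
```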

/// Get all the latest records in the table.
///
/// # Arguments
@@ -774,6 +890,7 @@ mod tests {
use crate::metadata::meta_field::MetaField;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
use crate::table::query::QueryType;
use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, SampleTable};
use std::collections::HashSet;
use std::fs::canonicalize;
@@ -1789,4 +1906,141 @@
Ok(())
}
}

#[tokio::test]
async fn test_query_interface() {
let table_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(table_url.path()).await.unwrap();
let timestamp = hudi_table.timeline.get_latest_commit_timestamp().unwrap();

let snapshot = hudi_table
.query(QueryType::SNAPSHOT, empty_filters())
.await
.unwrap();
assert!(!snapshot.is_empty());

let time_travel = hudi_table
.query(QueryType::time_travel(&timestamp), empty_filters())
.await
.unwrap();
assert!(!time_travel.is_empty());

let read_optimized = hudi_table
.query(QueryType::READ_OPTIMIZED, empty_filters())
.await
.unwrap();
assert!(!read_optimized.is_empty());

let incremental_table_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
let incremental_table = Table::new(incremental_table_url.path()).await.unwrap();
let commits = incremental_table
.timeline
.get_completed_commits(false)
.await
.unwrap();
if commits.len() >= 2 {
let start = &commits[0].timestamp;
let end = &commits[1].timestamp;
let incremental = incremental_table
.query(QueryType::incremental(start, Some(end)), empty_filters())
.await
.unwrap();
assert!(!incremental.is_empty());
}
}

#[tokio::test]
async fn test_get_file_slices_by_query_type() {
let table_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(table_url.path()).await.unwrap();
let timestamp = hudi_table.timeline.get_latest_commit_timestamp().unwrap();

let snapshot_slices = hudi_table
.get_file_slices_by_query_type(QueryType::SNAPSHOT, empty_filters())
.await
.unwrap();
assert!(!snapshot_slices.is_empty());

let time_travel_slices = hudi_table
.get_file_slices_by_query_type(QueryType::time_travel(&timestamp), empty_filters())
.await
.unwrap();
assert!(!time_travel_slices.is_empty());
}

#[test]
fn test_query_type_builder_methods() {
assert_eq!(QueryType::SNAPSHOT, QueryType::Snapshot);
assert_eq!(QueryType::READ_OPTIMIZED, QueryType::ReadOptimized);

let time_travel = QueryType::time_travel("20231201120000");
match time_travel {
QueryType::TimeTravel { timestamp } => {
assert_eq!(timestamp, "20231201120000");
}
_ => panic!("Expected TimeTravel variant"),
}

let incremental = QueryType::incremental("start", Some("end"));
match incremental {
QueryType::Incremental {
start_timestamp,
end_timestamp,
} => {
assert_eq!(start_timestamp, "start");
assert_eq!(end_timestamp, Some("end".to_string()));
}
_ => panic!("Expected Incremental variant"),
}

let incremental_no_end = QueryType::incremental("start", None::<String>);
match incremental_no_end {
QueryType::Incremental {
start_timestamp,
end_timestamp,
} => {
assert_eq!(start_timestamp, "start");
assert_eq!(end_timestamp, None);
}
_ => panic!("Expected Incremental variant"),
}
}

#[test]
fn test_query_type_property_methods() {
assert!(!QueryType::SNAPSHOT.requires_timestamp());
assert!(!QueryType::READ_OPTIMIZED.requires_timestamp());
assert!(QueryType::time_travel("ts").requires_timestamp());
assert!(QueryType::incremental("start", None::<String>).requires_timestamp());

assert!(!QueryType::SNAPSHOT.is_incremental());
assert!(!QueryType::READ_OPTIMIZED.is_incremental());
assert!(!QueryType::time_travel("ts").is_incremental());
assert!(QueryType::incremental("start", None::<String>).is_incremental());

assert!(!QueryType::SNAPSHOT.is_read_optimized());
assert!(QueryType::READ_OPTIMIZED.is_read_optimized());
assert!(!QueryType::time_travel("ts").is_read_optimized());
assert!(!QueryType::incremental("start", None::<String>).is_read_optimized());

assert_eq!(QueryType::SNAPSHOT.start_timestamp(), None);
assert_eq!(QueryType::READ_OPTIMIZED.start_timestamp(), None);
assert_eq!(QueryType::time_travel("ts").start_timestamp(), None);
assert_eq!(
QueryType::incremental("start", None::<String>).start_timestamp(),
Some("start")
);

assert_eq!(QueryType::SNAPSHOT.end_timestamp(), None);
assert_eq!(QueryType::READ_OPTIMIZED.end_timestamp(), None);
assert_eq!(QueryType::time_travel("ts").end_timestamp(), Some("ts"));
assert_eq!(
QueryType::incremental("start", None::<String>).end_timestamp(),
None
);
assert_eq!(
QueryType::incremental("start", Some("end")).end_timestamp(),
Some("end")
);
}
}