diff --git a/e2e_test/iceberg/test_case/pure_slt/iceberg_datafusion_engine.slt b/e2e_test/iceberg/test_case/pure_slt/iceberg_datafusion_engine.slt
index 2d29b4ec8b963..019ed1eb36926 100644
--- a/e2e_test/iceberg/test_case/pure_slt/iceberg_datafusion_engine.slt
+++ b/e2e_test/iceberg/test_case/pure_slt/iceberg_datafusion_engine.slt
@@ -33,6 +33,7 @@ statement ok
 FLUSH;
 
 sleep 5s
+
 query ??
 select * from t;
 ----
@@ -43,6 +44,44 @@ select * from t for system_time as of '2222-12-10 11:48:06';
 ----
 1 xxx
 
+statement ok
+insert into t select i, repeat('x', i + 2) from generate_series(2,5) as s(i);
+
+statement ok
+FLUSH;
+
+sleep 5s
+
+statement ok
+delete from t where id = 1;
+
+statement ok
+FLUSH;
+
+sleep 5s
+
+query ?? rowsort
+select * from t;
+----
+2 xxxx
+3 xxxxx
+4 xxxxxx
+5 xxxxxxx
+
+statement ok
+delete from t where id >= 4;
+
+statement ok
+FLUSH;
+
+sleep 5s
+
+query ?? rowsort
+select * from t;
+----
+2 xxxx
+3 xxxxx
+
 statement ok
 DROP TABLE t;
 
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index c4f126bc72252..b0b15217aa7b1 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -592,7 +592,7 @@ impl IcebergSplitEnumerator {
     /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
     /// The space complexity is O(k), where k is the number of splits.
     /// The algorithm is stable, so the order of the file scan tasks will be preserved.
-    fn split_n_vecs(
+    pub fn split_n_vecs(
         file_scan_tasks: Vec<FileScanTask>,
         split_num: usize,
     ) -> Vec<Vec<FileScanTask>> {
diff --git a/src/frontend/src/datafusion/execute.rs b/src/frontend/src/datafusion/execute.rs
index aa37dc5d14454..c21ee7a5f8f55 100644
--- a/src/frontend/src/datafusion/execute.rs
+++ b/src/frontend/src/datafusion/execute.rs
@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use datafusion::physical_plan::execute_stream;
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{SessionConfig as DFSessionConfig, SessionContext as DFSessionContext};
 use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::{PgResponse, StatementType};
 use pgwire::types::Format;
@@ -47,13 +47,11 @@ pub async fn execute_datafusion_plan(
     plan: DfBatchQueryPlanResult,
     formats: Vec<Format>,
 ) -> RwResult<PgResponse<PgResponseStream>> {
-    let ctx = SessionContext::new();
+    let df_config = create_config(session.as_ref());
+    let ctx = DFSessionContext::new_with_config(df_config);
     let state = ctx.state();
 
-    // TODO: update datafusion context with risingwave session info
-
     let pg_descs: Vec<PgFieldDescriptor> = plan.schema.fields().iter().map(to_pg_field).collect();
-
     let column_types = plan.schema.fields().iter().map(|f| f.data_type()).collect();
 
     // avoid optimizing by datafusion
@@ -182,3 +180,14 @@ impl CastExecutor {
         Ok(DataChunk::new(arrays, chunk.visibility().clone()))
     }
 }
+
+fn create_config(session: &SessionImpl) -> DFSessionConfig {
+    let rw_config = session.config();
+
+    let mut df_config = DFSessionConfig::new();
+    if let Some(batch_parallelism) = rw_config.batch_parallelism().0 {
+        df_config = df_config.with_target_partitions(batch_parallelism.get().try_into().unwrap());
+    }
+    df_config = df_config.with_batch_size(session.env().batch_config().developer.chunk_size);
+    df_config
+}
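Note on how the pieces above fit together: `create_config` propagates the session's `batch_parallelism` into DataFusion's `target_partitions`, and `split_n_vecs` is made `pub` so the frontend can reuse it to spread Iceberg file scan tasks across exactly that many partitions (see the new `IcebergScan` below). A minimal sketch of that hand-off, standalone and not part of the patch (the helper name `fan_out` is illustrative):

    use iceberg::scan::FileScanTask;
    use risingwave_connector::source::prelude::IcebergSplitEnumerator;

    /// Illustrative only: bucket planned file scan tasks so that DataFusion
    /// partition `i` later reads exactly `buckets[i]`.
    fn fan_out(tasks: Vec<FileScanTask>, target_partitions: usize) -> Vec<Vec<FileScanTask>> {
        // Per the doc comment above, `split_n_vecs` runs in O(n log k) time,
        // uses O(k) extra space, and keeps the relative order of tasks stable.
        IcebergSplitEnumerator::split_n_vecs(tasks, target_partitions)
    }
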
diff --git a/src/frontend/src/datafusion/iceberg_executor.rs b/src/frontend/src/datafusion/iceberg_executor.rs
index 41b985add764a..ee421d5d41411 100644
--- a/src/frontend/src/datafusion/iceberg_executor.rs
+++ b/src/frontend/src/datafusion/iceberg_executor.rs
@@ -13,11 +13,11 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::collections::HashSet;
 use std::sync::Arc;
 
-use datafusion::arrow::array::RecordBatch;
-use datafusion::arrow::compute::concat_batches;
-use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::catalog::TableProvider;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -26,10 +26,17 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
 use datafusion::prelude::Expr;
-use datafusion_common::DataFusionError;
+use datafusion_common::{DataFusionError, internal_err, not_impl_err};
+use futures::{StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::DataContentType;
+use iceberg::table::Table;
+use risingwave_common::catalog::{
+    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
+};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::source::iceberg::IcebergProperties;
+use risingwave_connector::source::prelude::IcebergSplitEnumerator;
 use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
 
 use super::{IcebergTableProvider, to_datafusion_error};
@@ -42,17 +49,17 @@ pub struct IcebergScan {
     inner: Arc<IcebergScanInner>,
 }
 
-#[derive(Debug)]
+#[derive(educe::Educe)]
+#[educe(Debug)]
 struct IcebergScanInner {
-    iceberg_properties: Arc<IcebergProperties>,
+    #[educe(Debug(ignore))]
+    table: Table,
     snapshot_id: Option<i64>,
-    #[allow(dead_code)]
+    tasks: Vec<Vec<FileScanTask>>,
     iceberg_scan_type: IcebergScanType,
     arrow_schema: SchemaRef,
-    column_names: Option<Vec<String>>,
-    #[allow(dead_code)]
+    column_names: Vec<String>,
     need_seq_num: bool,
-    #[allow(dead_code)]
    need_file_path_and_pos: bool,
     plan_properties: PlanProperties,
 }
@@ -97,14 +104,14 @@ impl ExecutionPlan for IcebergScan {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> datafusion_common::Result<SendableRecordBatchStream> {
-        if partition != 0 {
+        if partition >= self.inner.tasks.len() {
             return Err(DataFusionError::Internal(
-                "IcebergScan only supports single partition".to_owned(),
+                "IcebergScan: partition out of bounds".to_owned(),
             ));
         }
 
         let chunk_size = context.session_config().batch_size();
-        let stream = self.inner.clone().execute_inner(chunk_size);
+        let stream = self.inner.clone().execute_inner(chunk_size, partition);
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
             stream,
@@ -113,19 +120,14 @@
 }
 
 impl IcebergScan {
-    pub fn new(
+    pub async fn new(
         provider: &IcebergTableProvider,
         // TODO: handle these params
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        batch_parallelism: usize,
     ) -> DFResult<Self> {
-        if provider.iceberg_scan_type != IcebergScanType::DataScan {
-            return Err(DataFusionError::NotImplemented(
-                "Only DataScan is supported currently".to_owned(),
-            ));
-        }
-
         let plan_properties = PlanProperties::new(
             EquivalenceProperties::new(provider.schema()),
             // TODO: determine partitioning
@@ -133,23 +135,48 @@
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
-        let column_names = provider
+        let mut column_names: Vec<String> = provider
             .arrow_schema
             .fields()
             .iter()
             .map(|f| f.name().clone())
             .collect();
+        let table = provider
+            .iceberg_properties
+            .load_table()
+            .await
+            .map_err(to_datafusion_error)?;
+        let need_seq_num = column_names
+            .iter()
+            .any(|name| name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
+        let need_file_path_and_pos = column_names
+            .iter()
+            .any(|name| name == ICEBERG_FILE_PATH_COLUMN_NAME)
+            && matches!(provider.iceberg_scan_type, IcebergScanType::DataScan);
+        column_names.retain(|name| {
+            ![
+                ICEBERG_FILE_PATH_COLUMN_NAME,
+                ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
+                ICEBERG_FILE_POS_COLUMN_NAME,
+            ]
+            .contains(&name.as_str())
+        });
 
-        let inner = IcebergScanInner {
-            iceberg_properties: provider.iceberg_properties.clone(),
+        let mut inner = IcebergScanInner {
+            table,
             snapshot_id: provider.snapshot_id,
+            tasks: vec![],
             iceberg_scan_type: provider.iceberg_scan_type,
             arrow_schema: provider.arrow_schema.clone(),
-            column_names: Some(column_names),
-            need_seq_num: false,
-            need_file_path_and_pos: false,
+            column_names,
+            need_seq_num,
+            need_file_path_and_pos,
             plan_properties,
         };
+        let scan_tasks = inner.list_iceberg_scan_task().try_collect().await?;
+        inner.tasks = IcebergSplitEnumerator::split_n_vecs(scan_tasks, batch_parallelism);
+        inner.plan_properties.partitioning = Partitioning::UnknownPartitioning(inner.tasks.len());
+
         Ok(Self {
             inner: Arc::new(inner),
         })
@@ -157,91 +184,142 @@ impl IcebergScan {
 }
 
 impl IcebergScanInner {
-    #[try_stream(ok = RecordBatch, error = DataFusionError)]
-    pub async fn execute_inner(self: Arc<Self>, chunk_size: usize) {
-        let mut buffer = RecordBatchBuffer::new(chunk_size);
-        let table = self
-            .iceberg_properties
-            .load_table()
-            .await
-            .map_err(to_datafusion_error)?;
-        let mut scan_builder = table.scan().with_batch_size(Some(chunk_size));
-        if let Some(column_names) = &self.column_names {
-            scan_builder = scan_builder.select(column_names);
-        }
+    #[try_stream(ok = FileScanTask, error = DataFusionError)]
+    async fn list_iceberg_scan_task(&self) {
+        let mut scan_builder = self.table.scan().select(&self.column_names);
         if let Some(snapshot_id) = self.snapshot_id {
             scan_builder = scan_builder.snapshot_id(snapshot_id);
         }
+        if matches!(
+            self.iceberg_scan_type,
+            IcebergScanType::EqualityDeleteScan | IcebergScanType::PositionDeleteScan
+        ) {
+            scan_builder = scan_builder.with_delete_file_processing_enabled(true);
+        }
         let scan = scan_builder.build().map_err(to_datafusion_error)?;
-        let stream = scan.to_arrow().await.map_err(to_datafusion_error)?;
+
+        let mut position_delete_files_set = HashSet::new();
+        let mut equality_delete_files_set = HashSet::new();
 
         #[for_await]
-        for batch in stream {
-            let batch = batch.map_err(to_datafusion_error)?;
-            let batch = cast_batch(self.arrow_schema.clone(), batch)?;
-            if let Some(batch) = buffer.add(batch)? {
-                yield batch;
+        for scan_task in scan.plan_files().await.map_err(to_datafusion_error)? {
+            let scan_task = scan_task.map_err(to_datafusion_error)?;
+            match self.iceberg_scan_type {
+                IcebergScanType::DataScan => {
+                    if scan_task.data_file_content != DataContentType::Data {
+                        return internal_err!(
+                            "Files of type {:?} should not be in the data files",
+                            scan_task.data_file_content
+                        );
+                    }
+                    yield scan_task;
+                }
+                IcebergScanType::EqualityDeleteScan => {
+                    for delete_file in scan_task.deletes {
+                        if delete_file.data_file_content == DataContentType::EqualityDeletes
+                            && equality_delete_files_set.insert(delete_file.data_file_path.clone())
+                        {
+                            yield delete_file.as_ref().clone()
+                        }
+                    }
+                }
+                IcebergScanType::PositionDeleteScan => {
+                    for delete_file in scan_task.deletes {
+                        if delete_file.data_file_content == DataContentType::PositionDeletes
+                            && position_delete_files_set.insert(delete_file.data_file_path.clone())
+                        {
+                            let mut task = delete_file.as_ref().clone();
+                            task.project_field_ids = Vec::new();
+                            yield task;
+                        }
+                    }
+                }
+                _ => {
+                    return not_impl_err!(
+                        "Iceberg scan type {:?} is not supported",
+                        self.iceberg_scan_type
+                    );
+                }
             }
         }
-        if let Some(batch) = buffer.finish()? {
-            yield batch;
-        }
     }
-}
 
-struct RecordBatchBuffer {
-    buffer: Vec<RecordBatch>,
-    current_rows: usize,
-    max_record_batch_rows: usize,
-}
+    #[try_stream(ok = RecordBatch, error = DataFusionError)]
+    pub async fn execute_inner(self: Arc<Self>, chunk_size: usize, partition: usize) {
+        let reader = self
+            .table
+            .reader_builder()
+            .with_batch_size(chunk_size)
+            .build();
 
-impl RecordBatchBuffer {
-    fn new(max_record_batch_rows: usize) -> Self {
-        Self {
-            buffer: vec![],
-            current_rows: 0,
-            max_record_batch_rows,
-        }
-    }
+        for task in &self.tasks[partition] {
+            let stream = reader
+                .clone()
+                .read(tokio_stream::once(Ok(task.clone())).boxed())
+                .await
+                .map_err(to_datafusion_error)?;
+            let mut pos_start: i64 = 0;
 
-    fn add(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>, DataFusionError> {
-        // Case 1: New batch itself is large enough and buffer is empty or too small to be significant
-        if batch.num_rows() >= self.max_record_batch_rows && self.buffer.is_empty() {
-            // Buffer was empty, yield current large batch directly
-            return Ok(Some(batch));
+            #[for_await]
+            for batch in stream {
+                let batch = batch.map_err(to_datafusion_error)?;
+                let batch = append_metadata(
+                    batch,
+                    self.need_seq_num,
+                    self.need_file_path_and_pos,
+                    task,
+                    pos_start,
+                )?;
+                let batch = cast_batch(self.arrow_schema.clone(), batch)?;
+                pos_start += i64::try_from(batch.num_rows()).unwrap();
+                yield batch;
+            }
         }
+    }
+}
 
-        // Case 2: Buffer will overflow with the new batch
-        if !self.buffer.is_empty()
-            && (self.current_rows + batch.num_rows() > self.max_record_batch_rows)
-        {
-            let combined = self.finish_internal()?; // Drain and combine buffer
-            self.current_rows = batch.num_rows();
-            self.buffer.push(batch); // Add current batch to now-empty buffer
-            return Ok(combined); // Return the combined batch from buffer
-        }
+fn append_metadata(
+    batch: RecordBatch,
+    need_seq_num: bool,
+    need_file_path_and_pos: bool,
+    task: &FileScanTask,
+    pos_start: i64,
+) -> Result<RecordBatch, DataFusionError> {
+    let mut columns = batch.columns().to_vec();
+    let mut fields = batch.schema().fields().to_vec();
 
-        // Case 3: Buffer has space
-        self.current_rows += batch.num_rows();
-        self.buffer.push(batch);
-        Ok(None)
+    if need_seq_num {
+        let seq_num_array = Int64Array::from_value(task.sequence_number, batch.num_rows());
+        columns.push(Arc::new(seq_num_array));
+        fields.push(Arc::new(Field::new(
+            ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
+            DataType::Int64,
+            false,
+        )));
     }
 
-    // Helper to drain and combine buffer, used by add and finish
-    fn finish_internal(&mut self) -> Result<Option<RecordBatch>, DataFusionError> {
-        if self.buffer.is_empty() {
-            return Ok(None);
-        }
-        let schema_to_use = self.buffer[0].schema();
-        let batches_to_combine: Vec<_> = std::mem::take(&mut self.buffer);
-        let combined = concat_batches(&schema_to_use, &batches_to_combine)?;
-        self.current_rows = 0;
-        Ok(Some(combined))
+    if need_file_path_and_pos {
+        let file_path_array = StringArray::from_iter_values(std::iter::repeat_n(
+            task.data_file_path(),
+            batch.num_rows(),
+        ));
+        let file_pos_array = Int64Array::from_iter((pos_start..).take(batch.num_rows()));
+        columns.push(Arc::new(file_path_array));
+        columns.push(Arc::new(file_pos_array));
+        fields.push(Arc::new(Field::new(
+            ICEBERG_FILE_PATH_COLUMN_NAME,
+            DataType::Utf8,
+            false,
+        )));
+        fields.push(Arc::new(Field::new(
+            ICEBERG_FILE_POS_COLUMN_NAME,
+            DataType::Int64,
+            false,
+        )));
     }
 
-    fn finish(mut self) -> Result<Option<RecordBatch>, DataFusionError> {
-        self.finish_internal()
-    }
+    let new_schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(new_schema, columns).map_err(to_datafusion_error)
 }
 
 fn cast_batch(
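A note on the position bookkeeping in `execute_inner`/`append_metadata` above: `pos_start` carries the running row offset across the Arrow batches of a single data file, so the column named by `ICEBERG_FILE_POS_COLUMN_NAME` stays contiguous per file. A standalone sketch of just that bookkeeping (illustrative only, not part of the patch; `file_positions` is a made-up helper):

    /// For batches of the given sizes read from one data file, produce the
    /// per-batch position values the same way `execute_inner` advances `pos_start`.
    fn file_positions(batch_sizes: &[usize]) -> Vec<Vec<i64>> {
        let mut pos_start: i64 = 0;
        batch_sizes
            .iter()
            .map(|&n| {
                let positions: Vec<i64> = (pos_start..).take(n).collect();
                pos_start += n as i64;
                positions
            })
            .collect()
    }

    // e.g. file_positions(&[3, 2]) == [[0, 1, 2], [3, 4]]
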
diff --git a/src/frontend/src/datafusion/iceberg_table_provider.rs b/src/frontend/src/datafusion/iceberg_table_provider.rs
index ec0ab00331831..ca19e1682b77d 100644
--- a/src/frontend/src/datafusion/iceberg_table_provider.rs
+++ b/src/frontend/src/datafusion/iceberg_table_provider.rs
@@ -60,14 +60,14 @@
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(IcebergScan::new(
-            self, projection, filters, limit,
-        )?))
+        let batch_parallelism = state.config().target_partitions();
+        let scan = IcebergScan::new(self, projection, filters, limit, batch_parallelism).await?;
+        Ok(Arc::new(scan))
     }
 
     fn supports_filters_pushdown(
@@ -97,7 +97,6 @@
             .core
             .column_catalog
             .iter()
-            .filter(|column| !column.is_hidden())
             .map(|column| {
                 let column_desc = &column.column_desc;
                 let field = IcebergArrowConvert