32 changes: 32 additions & 0 deletions e2e_test/iceberg/test_case/pure_slt/iceberg_datafusion_engine.slt
@@ -43,6 +43,38 @@ select * from t for system_time as of '2222-12-10 11:48:06';
----
1 xxx

statement ok
insert into t select i, repeat('x', i) from generate_series(2,5) as s(i);

statement ok
FLUSH;

statement ok
delete from t where id = 1;

statement ok
FLUSH;

query ?? rowsort
select * from t;
----
2 xxxx
3 xxxxx
4 xxxxxx
5 xxxxxxx

statement ok
delete from t where id >= 4;

statement ok
FLUSH;

query ?? rowsort
select * from t;
----
2 xxxx
3 xxxxx

statement ok
DROP TABLE t;

208 changes: 171 additions & 37 deletions src/frontend/src/datafusion/iceberg_executor.rs
@@ -13,11 +13,12 @@
// limitations under the License.

use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::compute::concat_batches;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::TableProvider;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -26,10 +27,16 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use datafusion_common::DataFusionError;
use datafusion_common::{DataFusionError, internal_err, not_impl_err};
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use risingwave_common::catalog::{
ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::source::iceberg::IcebergProperties;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;

use super::{IcebergTableProvider, to_datafusion_error};
@@ -44,12 +51,13 @@ pub struct IcebergScan {

#[derive(Debug)]
struct IcebergScanInner {
iceberg_properties: Arc<IcebergProperties>,
table: Table,
snapshot_id: Option<i64>,
tasks: Vec<FileScanTask>,
#[allow(dead_code)]
iceberg_scan_type: IcebergScanType,
arrow_schema: SchemaRef,
column_names: Option<Vec<String>>,
column_names: Vec<String>,
#[allow(dead_code)]
need_seq_num: bool,
#[allow(dead_code)]
@@ -97,14 +105,14 @@ impl ExecutionPlan for IcebergScan {
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
if partition != 0 {
if partition >= self.inner.tasks.len() {
return Err(DataFusionError::Internal(
"IcebergScan only supports single partition".to_owned(),
"IcebergScan: partition out of bounds".to_owned(),
));
}

let chunk_size = context.session_config().batch_size();
let stream = self.inner.clone().execute_inner(chunk_size);
let stream = self.inner.clone().execute_inner(chunk_size, partition);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
stream,
@@ -113,76 +121,158 @@
}

impl IcebergScan {
pub fn new(
pub async fn new(
provider: &IcebergTableProvider,
// TODO: handle these params
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Self> {
if provider.iceberg_scan_type != IcebergScanType::DataScan {
return Err(DataFusionError::NotImplemented(
"Only DataScan is supported currently".to_owned(),
));
}

let plan_properties = PlanProperties::new(
EquivalenceProperties::new(provider.schema()),
// TODO: determine partitioning
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
let column_names = provider
let mut column_names: Vec<String> = provider
.arrow_schema
.fields()
.iter()
.map(|f| f.name().clone())
.collect();
let table = provider
.iceberg_properties
.load_table()
.await
.map_err(to_datafusion_error)?;
let need_seq_num = column_names
.iter()
.any(|name| name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
let need_file_path_and_pos = column_names
.iter()
.any(|name| name == ICEBERG_FILE_PATH_COLUMN_NAME)
&& matches!(provider.iceberg_scan_type, IcebergScanType::DataScan);
column_names.retain(|name| {
![
ICEBERG_FILE_PATH_COLUMN_NAME,
ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
ICEBERG_FILE_POS_COLUMN_NAME,
]
.contains(&name.as_str())
});

let inner = IcebergScanInner {
iceberg_properties: provider.iceberg_properties.clone(),
let mut inner = IcebergScanInner {
table,
snapshot_id: provider.snapshot_id,
tasks: vec![],
iceberg_scan_type: provider.iceberg_scan_type,
arrow_schema: provider.arrow_schema.clone(),
column_names: Some(column_names),
need_seq_num: false,
need_file_path_and_pos: false,
column_names,
need_seq_num,
need_file_path_and_pos,
plan_properties,
};
inner.tasks = inner.list_iceberg_scan_task().try_collect().await?;
inner.plan_properties.partitioning = Partitioning::UnknownPartitioning(inner.tasks.len());
Contributor: We have a session variable called batch_parallelism. I think we can use this value to control the parallelism instead of using the task number directly, because the task number could be very large.

Contributor (Author): Done in new commit.

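The reviewer's suggestion above concerns how output partitions are derived: rather than one partition per file scan task (Partitioning::UnknownPartitioning(inner.tasks.len())), the partition count could be capped by the batch_parallelism session variable. A minimal, hypothetical sketch of that idea (not the follow-up commit's actual code): cap the partition count and spread the scan tasks round-robin across partitions.

fn split_tasks_round_robin<T>(tasks: Vec<T>, batch_parallelism: usize) -> Vec<Vec<T>> {
    // Never more partitions than there are tasks or than batch_parallelism, but at least one.
    let partitions = tasks.len().min(batch_parallelism).max(1);
    let mut buckets: Vec<Vec<T>> = (0..partitions).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        // Round-robin assignment keeps the buckets roughly equal in size.
        buckets[i % partitions].push(task);
    }
    buckets
}

Under such a scheme, Partitioning::UnknownPartitioning would presumably be built from the bucket count, and each execute(partition, ...) call would read its bucket of tasks rather than a single task.
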
Ok(Self {
inner: Arc::new(inner),
})
}
}

impl IcebergScanInner {
#[try_stream(ok = RecordBatch, error = DataFusionError)]
pub async fn execute_inner(self: Arc<Self>, chunk_size: usize) {
let mut buffer = RecordBatchBuffer::new(chunk_size);
let table = self
.iceberg_properties
.load_table()
.await
.map_err(to_datafusion_error)?;
let mut scan_builder = table.scan().with_batch_size(Some(chunk_size));
if let Some(column_names) = &self.column_names {
scan_builder = scan_builder.select(column_names);
}
#[try_stream(ok = FileScanTask, error = DataFusionError)]
async fn list_iceberg_scan_task(&self) {
let mut scan_builder = self.table.scan().select(&self.column_names);
if let Some(snapshot_id) = self.snapshot_id {
scan_builder = scan_builder.snapshot_id(snapshot_id);
}
if matches!(
self.iceberg_scan_type,
IcebergScanType::EqualityDeleteScan | IcebergScanType::PositionDeleteScan
) {
scan_builder = scan_builder.with_delete_file_processing_enabled(true);
}
let scan = scan_builder.build().map_err(to_datafusion_error)?;
let stream = scan.to_arrow().await.map_err(to_datafusion_error)?;

let mut position_delete_files_set = HashSet::new();
let mut equality_delete_files_set = HashSet::new();

#[for_await]
for scan_task in scan.plan_files().await.map_err(to_datafusion_error)? {
let scan_task = scan_task.map_err(to_datafusion_error)?;
match self.iceberg_scan_type {
IcebergScanType::DataScan => {
if scan_task.data_file_content != DataContentType::Data {
return internal_err!(
"Files of type {:?} should not be in the data files",
scan_task.data_file_content
);
}
yield scan_task;
}
IcebergScanType::EqualityDeleteScan => {
for delete_file in scan_task.deletes {
if delete_file.data_file_content == DataContentType::EqualityDeletes
&& equality_delete_files_set.insert(delete_file.data_file_path.clone())
{
yield delete_file.as_ref().clone()
}
}
}
IcebergScanType::PositionDeleteScan => {
for delete_file in scan_task.deletes {
if delete_file.data_file_content == DataContentType::PositionDeletes
&& position_delete_files_set.insert(delete_file.data_file_path.clone())
{
let mut task = delete_file.as_ref().clone();
task.project_field_ids = Vec::new();
yield task;
}
}
}
_ => {
return not_impl_err!(
"Iceberg scan type {:?} is not supported",
self.iceberg_scan_type
);
}
}
}
}

#[try_stream(ok = RecordBatch, error = DataFusionError)]
pub async fn execute_inner(self: Arc<Self>, chunk_size: usize, partition: usize) {
let mut buffer = RecordBatchBuffer::new(chunk_size);
let reader = self
.table
.reader_builder()
.with_batch_size(chunk_size)
.build();
let task = self.tasks[partition].clone();
let stream = reader
.read(tokio_stream::once(Ok(task)).boxed())
.await
.map_err(to_datafusion_error)?;

#[for_await]
for batch in stream {
for (i, batch) in stream.enumerate() {
let batch = batch.map_err(to_datafusion_error)?;
let batch = append_metadata(
batch,
self.need_seq_num,
self.need_file_path_and_pos,
&self.tasks[partition],
(i * chunk_size).try_into().unwrap(),
)?;
let batch = cast_batch(self.arrow_schema.clone(), batch)?;
if let Some(batch) = buffer.add(batch)? {
yield batch;
}
}

if let Some(batch) = buffer.finish()? {
yield batch;
}
@@ -233,7 +323,7 @@ impl RecordBatchBuffer {
return Ok(None);
}
let schema_to_use = self.buffer[0].schema();
let batches_to_combine: Vec<_> = std::mem::take(&mut self.buffer);
let batches_to_combine = std::mem::take(&mut self.buffer);
let combined = concat_batches(&schema_to_use, &batches_to_combine)?;
self.current_rows = 0;
Ok(Some(combined))
@@ -244,6 +334,50 @@ impl RecordBatchBuffer {
}
}

fn append_metadata(
batch: RecordBatch,
need_seq_num: bool,
need_file_path_and_pos: bool,
task: &FileScanTask,
pos_start: i64,
) -> Result<RecordBatch, DataFusionError> {
let mut columns = batch.columns().to_vec();
let mut fields = batch.schema().fields().to_vec();

if need_seq_num {
let seq_num_array = Int64Array::from_value(task.sequence_number, batch.num_rows());
columns.push(Arc::new(seq_num_array));
fields.push(Arc::new(Field::new(
ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
DataType::Int64,
false,
)));
}

if need_file_path_and_pos {
let file_path_array = StringArray::from_iter_values(std::iter::repeat_n(
task.data_file_path(),
batch.num_rows(),
));
let file_pos_array = Int64Array::from_iter((pos_start..).take(batch.num_rows()));
columns.push(Arc::new(file_path_array));
columns.push(Arc::new(file_pos_array));
fields.push(Arc::new(Field::new(
ICEBERG_FILE_PATH_COLUMN_NAME,
DataType::Utf8,
false,
)));
fields.push(Arc::new(Field::new(
ICEBERG_FILE_POS_COLUMN_NAME,
DataType::Int64,
false,
)));
}

let new_schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(new_schema, columns).map_err(to_datafusion_error)
}

fn cast_batch(
target_schema: SchemaRef,
batch: RecordBatch,
6 changes: 2 additions & 4 deletions src/frontend/src/datafusion/iceberg_table_provider.rs
@@ -65,9 +65,8 @@ impl TableProvider for IcebergTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(IcebergScan::new(
self, projection, filters, limit,
)?))
let scan = IcebergScan::new(self, projection, filters, limit).await?;
Ok(Arc::new(scan))
}

fn supports_filters_pushdown(
@@ -97,7 +96,6 @@ impl IcebergTableProvider {
.core
.column_catalog
.iter()
.filter(|column| !column.is_hidden())
.map(|column| {
let column_desc = &column.column_desc;
let field = IcebergArrowConvert