Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions e2e_test/iceberg/test_case/pure_slt/iceberg_datafusion_engine.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ statement ok
FLUSH;

sleep 5s

query ??
select * from t;
----
Expand All @@ -43,6 +44,44 @@ select * from t for system_time as of '2222-12-10 11:48:06';
----
1 xxx

statement ok
insert into t select i, repeat('x', i + 2) from generate_series(2,5) as s(i);

statement ok
FLUSH;

sleep 5s

statement ok
delete from t where id = 1;

statement ok
FLUSH;

sleep 5s

query ?? rowsort
select * from t;
----
2 xxxx
3 xxxxx
4 xxxxxx
5 xxxxxxx

statement ok
delete from t where id >= 4;

statement ok
FLUSH;

sleep 5s

query ?? rowsort
select * from t;
----
2 xxxx
3 xxxxx

statement ok
DROP TABLE t;

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl IcebergSplitEnumerator {
/// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
/// The space complexity is O(k), where k is the number of splits.
/// The algorithm is stable, so the order of the file scan tasks will be preserved.
fn split_n_vecs(
pub fn split_n_vecs(
file_scan_tasks: Vec<FileScanTask>,
split_num: usize,
) -> Vec<Vec<FileScanTask>> {
Expand Down
19 changes: 14 additions & 5 deletions src/frontend/src/datafusion/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion::prelude::{SessionConfig as DFSessionConfig, SessionContext as DFSessionContext};
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
Expand Down Expand Up @@ -47,13 +47,11 @@ pub async fn execute_datafusion_plan(
plan: DfBatchQueryPlanResult,
formats: Vec<Format>,
) -> RwResult<RwPgResponse> {
let ctx = SessionContext::new();
let df_config = create_config(session.as_ref());
let ctx = DFSessionContext::new_with_config(df_config);
let state = ctx.state();

// TODO: update datafusion context with risingwave session info

let pg_descs: Vec<PgFieldDescriptor> = plan.schema.fields().iter().map(to_pg_field).collect();

let column_types = plan.schema.fields().iter().map(|f| f.data_type()).collect();

// avoid optimizing by datafusion
Expand Down Expand Up @@ -182,3 +180,14 @@ impl CastExecutor {
Ok(DataChunk::new(arrays, chunk.visibility().clone()))
}
}

/// Derive a DataFusion `SessionConfig` from the RisingWave session settings,
/// carrying over the batch parallelism (as target partition count) and the
/// batch chunk size.
fn create_config(session: &SessionImpl) -> DFSessionConfig {
    // Start from DataFusion defaults and overlay RisingWave's batch chunk size.
    let mut cfg =
        DFSessionConfig::new().with_batch_size(session.env().batch_config().developer.chunk_size);

    // Only override target partitions when the session pins a batch parallelism;
    // otherwise keep DataFusion's default.
    if let Some(parallelism) = session.config().batch_parallelism().0 {
        cfg = cfg.with_target_partitions(parallelism.get().try_into().unwrap());
    }

    cfg
}
Loading
Loading