@@ -222,11 +222,11 @@ FLUSH;

sleep 5s

# query ??? rowsort
# SELECT id, price, date FROM price WHERE date <= DATE '2025-02-10' - interval '5' day;
# ----
# 1 100.5000000000 2025-02-01
# 2 200.7500000000 2025-02-05
query ??? rowsort
SELECT id, price, date FROM price WHERE date <= DATE '2025-02-10' - interval '5' day;
----
1 100.5000000000 2025-02-01
2 200.7500000000 2025-02-05

query ??? rowsort
SELECT id, price, date FROM price WHERE price < 180.20

83 changes: 74 additions & 9 deletions src/frontend/src/datafusion/convert.rs
@@ -12,18 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use datafusion::arrow::datatypes::DataType as DFDataType;
use datafusion::prelude::Expr as DFExpr;
use datafusion_common::{Column, DFSchema, JoinType as DFJoinType, ScalarValue};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::Schema as RwSchema;
use risingwave_common::types::{DataType as RwDataType, ScalarImpl};
use risingwave_pb::plan_common::JoinType as RwJoinType;

use crate::datafusion::convert_function_call;
use crate::error::Result as RwResult;
use crate::expr::{Expr, ExprImpl};

pub fn convert_expr(expr: &ExprImpl, input_schema: &impl ColumnTrait) -> RwResult<DFExpr> {
pub fn convert_expr(expr: &ExprImpl, input_columns: &impl ColumnTrait) -> RwResult<DFExpr> {
match expr {
ExprImpl::Literal(lit) => {
let scalar = match lit.get_data() {
@@ -32,8 +34,10 @@ pub fn convert_expr(expr: &ExprImpl, input_schema: &impl ColumnTrait) -> RwResul
};
Ok(DFExpr::Literal(scalar, None))
}
ExprImpl::InputRef(input_ref) => Ok(DFExpr::Column(input_schema.column(input_ref.index()))),
ExprImpl::FunctionCall(func_call) => convert_function_call(func_call, input_schema),
ExprImpl::InputRef(input_ref) => {
Ok(DFExpr::Column(input_columns.column(input_ref.index())))
}
ExprImpl::FunctionCall(func_call) => convert_function_call(func_call, input_columns),
_ => bail_not_implemented!("DataFusionPlanConverter: unsupported expression {:?}", expr),
}
}
@@ -90,24 +94,85 @@ pub trait ColumnTrait {
}
fn len(&self) -> usize;
fn column(&self, index: usize) -> Column;
fn rw_data_type(&self, index: usize) -> RwDataType;
fn df_data_type(&self, index: usize) -> DFDataType;
}

impl ColumnTrait for DFSchema {
impl<'a> ColumnTrait for (&'a DFSchema, &'a RwSchema) {
fn len(&self) -> usize {
self.fields().len()
self.0.fields().len()
}

fn column(&self, index: usize) -> Column {
Column::from(self.qualified_field(index))
Column::from(self.0.qualified_field(index))
}

fn rw_data_type(&self, index: usize) -> RwDataType {
let field = &self.1.fields[index];
field.data_type()
}

fn df_data_type(&self, index: usize) -> DFDataType {
self.0.field(index).data_type().clone()
}
}

pub struct ConcatColumns<'a> {
df_left: &'a DFSchema,
rw_left: &'a RwSchema,
df_right: &'a DFSchema,
rw_right: &'a RwSchema,
left_len: usize,
}

impl<'a> ConcatColumns<'a> {
pub fn new(
df_left: &'a DFSchema,
rw_left: &'a RwSchema,
df_right: &'a DFSchema,
rw_right: &'a RwSchema,
) -> Self {
Self {
df_left,
rw_left,
df_right,
rw_right,
left_len: df_left.fields().len(),
}
}
}

impl ColumnTrait for Vec<Column> {
impl ColumnTrait for ConcatColumns<'_> {
fn len(&self) -> usize {
Vec::len(self)
self.df_left.fields().len() + self.df_right.fields().len()
}

fn column(&self, index: usize) -> Column {
self[index].clone()
if index < self.left_len {
Column::from(self.df_left.qualified_field(index))
} else {
Column::from(self.df_right.qualified_field(index - self.left_len))
}
}

fn rw_data_type(&self, index: usize) -> RwDataType {
if index < self.left_len {
let field = &self.rw_left.fields[index];
field.data_type()
} else {
let field = &self.rw_right.fields[index - self.left_len];
field.data_type()
}
}

fn df_data_type(&self, index: usize) -> DFDataType {
if index < self.left_len {
self.df_left.field(index).data_type().clone()
} else {
self.df_right
.field(index - self.left_len)
.data_type()
.clone()
}
}
}
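
A note on the new `ConcatColumns`: RisingWave expressions address columns through a flat `InputRef` index, and for a join condition that index runs over the left input's columns followed by the right's, so `convert_expr` needs a column source that spans both sides. A minimal sketch of the presumed call-site shape (hypothetical: the actual join conversion is not shown in this diff):

```rust
// Hypothetical call site, not part of this diff. The types and functions
// used here are the ones defined in convert.rs above.
fn convert_join_on(
    on: &ExprImpl,
    df_left: &DFSchema,
    rw_left: &RwSchema,
    df_right: &DFSchema,
    rw_right: &RwSchema,
) -> RwResult<DFExpr> {
    let columns = ConcatColumns::new(df_left, rw_left, df_right, rw_right);
    // An InputRef whose index is >= df_left.fields().len() resolves against
    // the right side, offset by left_len.
    convert_expr(on, &columns)
}
```
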
152 changes: 98 additions & 54 deletions src/frontend/src/datafusion/execute.rs
@@ -14,16 +14,21 @@

use std::sync::Arc;

use datafusion::arrow::datatypes::Field;
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::{SessionConfig as DFSessionConfig, SessionContext as DFSessionContext};
use datafusion_common::DFSchema;
use datafusion_common::arrow::datatypes::DataType as DFDataType;
use futures_async_stream::for_await;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_common::array::DataChunk;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::Schema as RwSchema;
use risingwave_common::error::BoxedError;
use risingwave_common::types::DataType;
use risingwave_common::types::DataType as RwDataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::{BoxedExpression, build_from_prost};
use tokio::sync::mpsc;
@@ -38,7 +43,7 @@ use crate::session::SessionImpl;

pub struct DfBatchQueryPlanResult {
pub(crate) plan: Arc<datafusion::logical_expr::LogicalPlan>,
pub(crate) schema: Schema,
pub(crate) schema: RwSchema,
pub(crate) stmt_type: StatementType,
}

@@ -54,30 +59,36 @@ pub async fn execute_datafusion_plan(
let pg_descs: Vec<PgFieldDescriptor> = plan.schema.fields().iter().map(to_pg_field).collect();
let column_types = plan.schema.fields().iter().map(|f| f.data_type()).collect();

// avoid optimizing by datafusion
// TODO: some optimizing rules will cause inconsistency, need to investigate later
// Currently we disable all optimizing rules to ensure correctness
let df_plan = state.analyzer().execute_and_check(
plan.plan.as_ref().clone(),
&ConfigOptions::default(),
|_, _| {},
)?;
Comment on lines +62 to +68 (Copilot AI, Dec 15, 2025):

The comment states "Currently we disable all optimizing rules" but the code calls state.analyzer().execute_and_check(), which may still run analysis passes. If the intent is to completely disable optimization, consider clarifying what this analyzer call does and whether it truly disables all optimization rules, or update the comment to accurately reflect what analysis/optimization is still being performed.
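
A point of reference on the analyzer/optimizer distinction raised above: in DataFusion, analyzer rules (type coercion, count-wildcard expansion, and similar) form a pipeline separate from the logical optimizer rules (filter pushdown, projection pruning, and so on). `state.analyzer().execute_and_check(...)` runs only the former, which a logical plan generally needs before physical planning; the rewrite rules the TODO worries about are skipped because `state.optimizer()` is never invoked. If that intent should be explicit in code, one option is to build the session state with an empty optimizer rule set. A minimal sketch, assuming a recent DataFusion with `SessionStateBuilder` (upstream API, not code from this PR):

```rust
use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion::prelude::SessionConfig;

// Sketch: a SessionState whose logical optimizer has no rules, so optimizing
// the plan becomes a no-op, while analyzer rules (set up by the default
// features) still run.
fn build_unoptimized_state() -> SessionState {
    SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_default_features()
        .with_optimizer_rules(vec![]) // clear all logical optimizer rules
        .build()
}
```
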
let physical_plan = state
.query_planner()
.create_physical_plan(&plan.plan, &state)
.create_physical_plan(&df_plan, &state)
.await?;
let data_stream = execute_stream(physical_plan, ctx.task_ctx())?;

let compute_runtime = session.env().compute_runtime();
let (sender1, receiver) = mpsc::channel(10);
let shutdown_rx = session.reset_cancel_query_flag();
let sender2 = sender1.clone();
let cast_executor = build_cast_executor(&plan.schema)?;
let cast_executor = CastExecutor::new(plan.plan.schema().as_ref(), &plan.schema)?;
let exec = async move {
#[futures_async_stream::for_await]
#[for_await]
for record in data_stream {
let res: std::result::Result<DataChunk, BoxedError> = async {
let res: Result<DataChunk, BoxedError> = async {
let record = record?;
if shutdown_rx.is_cancelled() {
Err(SchedulerError::QueryCancelled(
"Cancelled by user".to_owned(),
))?;
}
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record)?;
let chunk = cast_executor.execute(&chunk).await?;
let chunk = cast_executor.execute(chunk).await?;
Ok(chunk)
}
.await;
@@ -94,21 +105,22 @@
};
if let Some(timeout) = timeout {
let exec = async move {
if tokio::time::timeout(timeout, exec).await.is_err() {
tracing::error!(
"Datafusion query execution timeout after {} seconds",
timeout.as_secs()
);
if sender1
.send(Err(Box::new(SchedulerError::QueryCancelled(format!(
"timeout after {} seconds",
timeout.as_secs(),
))) as BoxedError))
.await
.is_err()
{
tracing::info!("Receiver closed.");
}
if tokio::time::timeout(timeout, exec).await.is_ok() {
return;
}
tracing::error!(
"Datafusion query execution timeout after {} seconds",
timeout.as_secs()
);
if sender1
.send(Err(Box::new(SchedulerError::QueryCancelled(format!(
"timeout after {} seconds",
timeout.as_secs(),
))) as BoxedError))
.await
.is_err()
{
tracing::info!("Receiver closed.");
}
};
compute_runtime.spawn(exec);
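
Aside on the reworked timeout wrapper above: it is the flattened, early-return form of the previously nested `if`, and one behavioral detail worth keeping in mind is that `tokio::time::timeout` drops the `exec` future when the deadline fires, so the record stream stops being polled and only the cancellation error is delivered. A minimal standalone sketch of the pattern (`run_query` is a stand-in name, not from this PR):

```rust
use std::time::Duration;

// Stand-in for the actual execution future.
async fn run_query() { /* poll the data stream, forward chunks... */ }

async fn run_with_deadline(timeout: Duration) {
    if tokio::time::timeout(timeout, run_query()).await.is_ok() {
        return; // finished (successfully or not) before the deadline
    }
    // Deadline hit: run_query() has been dropped and will not be polled
    // again; all that remains is reporting the timeout downstream.
    eprintln!("query timed out after {} seconds", timeout.as_secs());
}
```
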
@@ -131,31 +143,77 @@
.into())
}

struct CastExecutor {
#[derive(Debug)]
pub struct CastExecutor {
executors: Vec<Option<BoxedExpression>>,
}

fn build_cast_executor(schema: &Schema) -> RwResult<CastExecutor> {
let mut executors = Vec::with_capacity(schema.fields().len());
for (i, field) in schema.fields().iter().enumerate() {
let target_type = field.data_type();
let source_type = IcebergArrowConvert
.type_from_field(&IcebergArrowConvert.to_arrow_field("", &target_type)?)?;

if source_type == target_type {
executors.push(None);
} else {
let cast_executor = build_single_cast_executor(i, source_type, target_type)?;
executors.push(Some(cast_executor));
impl CastExecutor {
pub fn new(df_schema: &DFSchema, rw_schema: &RwSchema) -> RwResult<Self> {
let mut executors = Vec::with_capacity(df_schema.fields().len());
for (i, (df_field, rw_field)) in df_schema
.fields()
.iter()
.zip_eq_fast(rw_schema.fields().iter())
.enumerate()
{
let target_type = rw_field.data_type();
let source_type = IcebergArrowConvert.type_from_field(df_field)?;

if source_type == target_type {
executors.push(None);
} else {
let cast_executor = build_single_cast_executor(i, source_type, target_type)?;
executors.push(Some(cast_executor));
}
}
Ok(CastExecutor { executors })
}

pub fn from_iter(
source_types: impl ExactSizeIterator<Item = DFDataType>,
target_types: impl ExactSizeIterator<Item = RwDataType>,
) -> RwResult<Self> {
if source_types.len() != target_types.len() {
return Err(ErrorCode::InternalError(format!(
"source types length {} not equal to target types length {}",
source_types.len(),
target_types.len()
))
.into());
}
let mut executors = Vec::with_capacity(target_types.len());
for (i, (source_type, target_type)) in source_types.zip_eq_fast(target_types).enumerate() {
let source_type =
IcebergArrowConvert.type_from_field(&Field::new("", source_type.clone(), true))?;

if source_type == target_type {
executors.push(None);
} else {
let cast_executor = build_single_cast_executor(i, source_type, target_type)?;
executors.push(Some(cast_executor));
}
}
Ok(CastExecutor { executors })
}

pub async fn execute(&self, chunk: DataChunk) -> RwResult<DataChunk> {
let mut arrays = Vec::with_capacity(chunk.columns().len());
for (exe, col) in self.executors.iter().zip_eq_fast(chunk.columns()) {
if let Some(exe) = exe {
arrays.push(exe.eval(&chunk).await?);
} else {
arrays.push(col.clone());
}
}
Ok(DataChunk::new(arrays, chunk.into_parts_v2().1))
}
Ok(CastExecutor { executors })
}

fn build_single_cast_executor(
idx: usize,
source_type: DataType,
target_type: DataType,
source_type: RwDataType,
target_type: RwDataType,
) -> RwResult<BoxedExpression> {
let expr: ExprImpl = InputRef::new(idx, source_type).into();
let expr = expr.cast_explicit(&target_type)?;
@@ -167,20 +225,6 @@ fn build_single_cast_executor(
Ok(res)
}

impl CastExecutor {
pub async fn execute(&self, chunk: &DataChunk) -> RwResult<DataChunk> {
let mut arrays = Vec::with_capacity(chunk.columns().len());
for (exe, col) in self.executors.iter().zip_eq_fast(chunk.columns()) {
if let Some(exe) = exe {
arrays.push(exe.eval(chunk).await?);
} else {
arrays.push(col.clone());
}
}
Ok(DataChunk::new(arrays, chunk.visibility().clone()))
}
}

fn create_config(session: &SessionImpl) -> DFSessionConfig {
let rw_config = session.config();

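
A closing note on `CastExecutor`: `execute` now takes the `DataChunk` by value, which lets it recover the visibility bitmap through `into_parts_v2()` instead of cloning it, while pass-through columns remain cheap `Arc` clones. The new `from_iter` constructor has no call site in this diff; a hypothetical usage sketch (the column types are invented for illustration):

```rust
use datafusion_common::arrow::datatypes::DataType as DFDataType;
use risingwave_common::types::DataType as RwDataType;

// Hypothetical: build a CastExecutor from bare type lists when no schema
// pair is at hand. Field names are irrelevant to the cast, which is why
// from_iter wraps each source type in Field::new("", ..., true).
fn build_example_cast() -> RwResult<CastExecutor> {
    let sources = vec![DFDataType::Int32, DFDataType::Utf8];
    let targets = vec![RwDataType::Int64, RwDataType::Varchar];
    CastExecutor::from_iter(sources.into_iter(), targets.into_iter())
}

// Per Arrow batch coming out of DataFusion, the call shape is then:
//   let chunk = IcebergArrowConvert.chunk_from_record_batch(&batch)?;
//   let chunk = cast.execute(chunk).await?;
```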