perf: evolve promql execution engine #5691

Open · wants to merge 19 commits into base: main
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -129,7 +129,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a7274ddce299f33d23dbe8af5bbe6219f07c559a" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "33592e8814ccd0b93081d215d937ca68195a44e4" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
4 changes: 2 additions & 2 deletions src/cmd/tests/load_config_test.rs
@@ -168,8 +168,8 @@ fn test_load_metasrv_example_config() {
             tracing_sample_ratio: Some(Default::default()),
             slow_query: SlowQueryOptions {
                 enable: false,
-                threshold: Some(Duration::from_secs(10)),
-                sample_ratio: Some(1.0),
+                threshold: None,
+                sample_ratio: None,
             },
             ..Default::default()
         },
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan/histogram_fold.rs
@@ -301,7 +301,7 @@ impl ExecutionPlan for HistogramFoldExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition; self.children().len()]
+        self.input.required_input_distribution()
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
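Reviewer note on the recurring `required_input_distribution` change (repeated below in `range_manipulate.rs` and `scalar_calculate.rs`): returning `Distribution::SinglePartition` makes DataFusion merge every input partition into one before this operator runs, serializing the plan at that point, whereas delegating to the input's own requirement lets the operator run once per partition. A minimal sketch of the idea with toy types (not the actual DataFusion API):

```rust
// Stand-ins for DataFusion's Distribution and ExecutionPlan; column names
// take the place of physical expressions.
#[derive(Debug, Clone)]
enum Distribution {
    SinglePartition,
    HashPartitioned(Vec<String>),
}

struct Operator {
    child_requirement: Distribution,
}

impl Operator {
    // Before this PR: always demand one partition, forcing a merge upstream.
    fn required_input_distribution_before(&self) -> Distribution {
        Distribution::SinglePartition
    }

    // After: inherit the child's requirement, so partitioned (per-series)
    // execution flows through this node untouched.
    fn required_input_distribution_after(&self) -> Distribution {
        self.child_requirement.clone()
    }
}

fn main() {
    let op = Operator {
        child_requirement: Distribution::HashPartitioned(vec!["path".into()]),
    };
    println!("before: {:?}", op.required_input_distribution_before());
    println!("after:  {:?}", op.required_input_distribution_after());
}
```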
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan/instant_manipulate.rs
@@ -352,9 +352,9 @@ impl Stream for InstantManipulateStream {
     type Item = DataFusionResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let timer = std::time::Instant::now();
         let poll = match ready!(self.input.poll_next_unpin(cx)) {
             Some(Ok(batch)) => {
+                let timer = std::time::Instant::now();
                 self.num_series.add(1);
                 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
                 self.metric.elapsed_compute().add_elapsed(timer);
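The one-line move here (mirrored in `normalize.rs` below) changes what `elapsed_compute` reports: starting the timer before `poll_next_unpin` charged time spent waiting on the upstream operator to this node's compute metric; starting it once a batch is in hand measures only the actual manipulation. A runnable sketch of the difference, with sleeps standing in for upstream latency (all names here are illustrative, not from the PR):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

fn main() {
    // "Before": the clock starts before the (simulated) wait for input,
    // so upstream latency inflates the compute metric.
    let timer = Instant::now();
    sleep(Duration::from_millis(50)); // stand-in for awaiting the input stream
    let _work: u64 = (0..8192u64).sum(); // stand-in for manipulate(batch)
    let with_wait = timer.elapsed();

    // "After": the clock starts only once a batch has arrived.
    sleep(Duration::from_millis(50)); // upstream wait, no longer measured
    let timer = Instant::now();
    let _work: u64 = (0..8192u64).sum();
    let compute_only = timer.elapsed();

    println!("with wait: {with_wait:?}, compute only: {compute_only:?}");
}
```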
57 changes: 34 additions & 23 deletions src/promql/src/extension_plan/normalize.rs
@@ -23,6 +23,7 @@ use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Stat
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+use datafusion::physical_plan::expressions::Column as ColumnExpr;
 use datafusion::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
 };
@@ -32,7 +33,6 @@ use datafusion::physical_plan::{
 };
 use datatypes::arrow::array::TimestampMillisecondArray;
 use datatypes::arrow::datatypes::SchemaRef;
-use datatypes::arrow::error::Result as ArrowResult;
 use datatypes::arrow::record_batch::RecordBatch;
 use futures::{ready, Stream, StreamExt};
 use greptime_proto::substrait_extension as pb;
@@ -55,6 +55,7 @@ pub struct SeriesNormalize {
     offset: Millisecond,
     time_index_column_name: String,
     need_filter_out_nan: bool,
+    tag_columns: Vec<String>,
 
     input: LogicalPlan,
 }
Expand Down Expand Up @@ -100,6 +101,7 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: inputs.into_iter().next().unwrap(),
tag_columns: self.tag_columns.clone(),
})
}
}
@@ -109,12 +111,14 @@ impl SeriesNormalize {
         offset: Millisecond,
         time_index_column_name: N,
         need_filter_out_nan: bool,
+        tag_columns: Vec<String>,
         input: LogicalPlan,
     ) -> Self {
         Self {
             offset,
             time_index_column_name: time_index_column_name.as_ref().to_string(),
             need_filter_out_nan,
+            tag_columns,
             input,
         }
     }
@@ -129,6 +133,7 @@ impl SeriesNormalize {
             time_index_column_name: self.time_index_column_name.clone(),
             need_filter_out_nan: self.need_filter_out_nan,
             input: exec_input,
+            tag_columns: self.tag_columns.clone(),
             metric: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -138,6 +143,7 @@ impl SeriesNormalize {
             offset: self.offset,
             time_index: self.time_index_column_name.clone(),
             filter_nan: self.need_filter_out_nan,
+            tag_columns: self.tag_columns.clone(),
         }
         .encode_to_vec()
     }
@@ -152,6 +158,7 @@ impl SeriesNormalize {
             pb_normalize.offset,
             pb_normalize.time_index,
             pb_normalize.filter_nan,
+            pb_normalize.tag_columns,
             placeholder_plan,
         ))
     }
@@ -162,6 +169,7 @@ pub struct SeriesNormalizeExec {
     offset: Millisecond,
     time_index_column_name: String,
     need_filter_out_nan: bool,
+    tag_columns: Vec<String>,
 
     input: Arc<dyn ExecutionPlan>,
     metric: ExecutionPlanMetricsSet,
@@ -177,7 +185,14 @@ impl ExecutionPlan for SeriesNormalizeExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+        let schema = self.input.schema();
+        vec![Distribution::HashPartitioned(
+            self.tag_columns
+                .iter()
+                // Safety: the tag column names is verified in the planning phase
+                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
+                .collect(),
+        )]
     }
 
     fn properties(&self) -> &PlanProperties {
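This hunk is the core of the partitioning change: instead of merging everything into one partition, `SeriesNormalizeExec` now asks for input hash-partitioned on the tag columns, so every row of a given series lands in the same partition and normalization can run in parallel across partitions. A self-contained sketch of why that is safe, using `std` hashing as a stand-in for DataFusion's repartitioner:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Rows with equal tag values hash identically, so a series is never split
// across partitions; a per-series operator can process each partition
// independently.
fn partition_for(tags: &[&str], num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    tags.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    for (path, ts) in [("foo", 0_i64), ("bar", 0), ("foo", 30_000), ("bar", 30_000)] {
        println!("path={path} ts={ts} -> partition {}", partition_for(&[path], 4));
    }
}
```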
@@ -198,6 +213,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
             time_index_column_name: self.time_index_column_name.clone(),
             need_filter_out_nan: self.need_filter_out_nan,
             input: children[0].clone(),
+            tag_columns: self.tag_columns.clone(),
             metric: self.metric.clone(),
         }))
     }
@@ -288,31 +304,24 @@ impl SeriesNormalizeStream {
 
         // bias the timestamp column by offset
         let ts_column_biased = if self.offset == 0 {
-            ts_column.clone()
+            Arc::new(ts_column.clone()) as _
         } else {
-            TimestampMillisecondArray::from_iter(
+            Arc::new(TimestampMillisecondArray::from_iter(
                 ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
-            )
+            ))
         };
         let mut columns = input.columns().to_vec();
-        columns[self.time_index] = Arc::new(ts_column_biased);
+        columns[self.time_index] = ts_column_biased;
 
-        // sort the record batch
-        let ordered_indices = compute::sort_to_indices(&columns[self.time_index], None, None)?;
-        let ordered_columns = columns
-            .iter()
-            .map(|array| compute::take(array, &ordered_indices, None))
-            .collect::<ArrowResult<Vec<_>>>()?;
-        let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?;
-
+        let result_batch = RecordBatch::try_new(input.schema(), columns)?;
         if !self.need_filter_out_nan {
-            return Ok(ordered_batch);
+            return Ok(result_batch);
        }
 
         // TODO(ruihang): consider the "special NaN"
         // filter out NaN
         let mut filter = vec![true; input.num_rows()];
-        for column in ordered_batch.columns() {
+        for column in result_batch.columns() {
             if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
                 for (i, flag) in filter.iter_mut().enumerate() {
                     if float_column.value(i).is_nan() {
@@ -322,7 +331,7 @@ impl SeriesNormalizeStream {
             }
         }
 
-        let result = compute::filter_record_batch(&ordered_batch, &BooleanArray::from(filter))
+        let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
             .map_err(|e| DataFusionError::ArrowError(e, None))?;
         Ok(result)
     }
@@ -338,10 +347,10 @@ impl Stream for SeriesNormalizeStream {
     type Item = DataFusionResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let timer = std::time::Instant::now();
         let poll = match ready!(self.input.poll_next_unpin(cx)) {
             Some(Ok(batch)) => {
                 self.num_series.add(1);
+                let timer = std::time::Instant::now();
                 let result = Ok(batch).and_then(|batch| self.normalize(batch));
                 self.metric.elapsed_compute().add_elapsed(timer);
                 Poll::Ready(Some(result))
@@ -399,6 +408,7 @@ mod test {
             time_index_column_name: TIME_INDEX_COLUMN.to_string(),
             need_filter_out_nan: true,
             input: memory_exec,
+            tag_columns: vec!["path".to_string()],
             metric: ExecutionPlanMetricsSet::new(),
         });
         let session_context = SessionContext::default();
@@ -413,11 +423,11 @@
         "+---------------------+--------+------+\
         \n| timestamp           | value  | path |\
         \n+---------------------+--------+------+\
+        \n| 1970-01-01T00:01:00 | 0.0    | foo  |\
+        \n| 1970-01-01T00:02:00 | 1.0    | foo  |\
         \n| 1970-01-01T00:00:00 | 10.0   | foo  |\
         \n| 1970-01-01T00:00:30 | 100.0  | foo  |\
-        \n| 1970-01-01T00:01:00 | 0.0    | foo  |\
         \n| 1970-01-01T00:01:30 | 1000.0 | foo  |\
-        \n| 1970-01-01T00:02:00 | 1.0    | foo  |\
         \n+---------------------+--------+------+",
     );
 
@@ -428,11 +438,12 @@
     async fn test_offset_record_batch() {
         let memory_exec = Arc::new(prepare_test_data());
         let normalize_exec = Arc::new(SeriesNormalizeExec {
-            offset: 1_000, // offset 1s
+            offset: 1_000,
             time_index_column_name: TIME_INDEX_COLUMN.to_string(),
             need_filter_out_nan: true,
             input: memory_exec,
             metric: ExecutionPlanMetricsSet::new(),
+            tag_columns: vec!["path".to_string()],
         });
         let session_context = SessionContext::default();
         let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
@@ -446,11 +457,11 @@
         "+---------------------+--------+------+\
         \n| timestamp           | value  | path |\
         \n+---------------------+--------+------+\
+        \n| 1970-01-01T00:01:01 | 0.0    | foo  |\
+        \n| 1970-01-01T00:02:01 | 1.0    | foo  |\
         \n| 1970-01-01T00:00:01 | 10.0   | foo  |\
         \n| 1970-01-01T00:00:31 | 100.0  | foo  |\
-        \n| 1970-01-01T00:01:01 | 0.0    | foo  |\
         \n| 1970-01-01T00:01:31 | 1000.0 | foo  |\
-        \n| 1970-01-01T00:02:01 | 1.0    | foo  |\
         \n+---------------------+--------+------+",
     );
 
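Worth spelling out: `normalize` no longer re-sorts each batch (`sort_to_indices` + `take` are gone, along with the `ArrowResult` import); it only biases the time index by `offset` and filters NaN rows, relying on input that is already ordered per series. That is also why the expected row order in the tests above changes from globally sorted to input order. A minimal sketch of the remaining per-batch work, with plain vectors standing in for Arrow arrays:

```rust
// Offset-bias the timestamps, then drop NaN rows; note there is no sort.
fn normalize(timestamps: &[i64], values: &[f64], offset: i64) -> (Vec<i64>, Vec<f64>) {
    timestamps
        .iter()
        .zip(values)
        .filter(|(_, v)| !v.is_nan()) // the need_filter_out_nan = true path
        .map(|(ts, v)| (ts + offset, *v))
        .unzip()
}

fn main() {
    let (ts, vals) = normalize(&[60_000, 120_000, 0], &[0.0, f64::NAN, 10.0], 1_000);
    assert_eq!(ts, vec![61_000, 1_000]); // input order preserved
    assert_eq!(vals, vec![0.0, 10.0]);
}
```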
16 changes: 11 additions & 5 deletions src/promql/src/extension_plan/range_manipulate.rs
@@ -327,7 +327,7 @@ impl ExecutionPlan for RangeManipulateExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+        self.input.required_input_distribution()
     }
 
     fn with_new_children(
@@ -564,18 +564,24 @@ impl RangeManipulateStream {
         let mut ranges = vec![];
 
         // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
+        let mut range_start_index = 0usize;
         for curr_ts in (self.start..=self.end).step_by(self.interval as _) {
             let mut range_start = ts_column.len();
             let mut range_end = 0;
-            for (index, ts) in ts_column.values().iter().enumerate() {
+            let mut cursor = range_start_index;
+            while cursor < ts_column.len() {
+                let ts = ts_column.value(cursor);
                 if ts + self.range >= curr_ts {
-                    range_start = range_start.min(index);
+                    range_start = range_start.min(cursor);
+                    range_start_index = range_start;
                 }
-                if *ts <= curr_ts {
-                    range_end = range_end.max(index);
+                if ts <= curr_ts {
+                    range_end = range_end.max(cursor);
+                } else {
+                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
+                    break;
                 }
+                cursor += 1;
             }
             if range_start > range_end {
                 ranges.push((0, 0));
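The old inner loop rescanned the entire timestamp column for every aligned `curr_ts`, making range calculation O(aligned steps × rows); the new `range_start_index` cursor resumes each scan where the previous window began, turning it into a sliding window with near-linear total cost over a sorted column. A simplified, runnable version of the cursor logic (plain slices instead of Arrow arrays; `saturating_sub` replaces the PR's `checked_sub(...).unwrap_or_default()`, which is equivalent here):

```rust
/// For each aligned timestamp in [start, end] stepped by `interval`, find the
/// inclusive (start, end) row range whose timestamps fall within
/// [curr_ts - range, curr_ts]. `ts` must be sorted ascending.
fn calc_ranges(ts: &[i64], start: i64, end: i64, interval: i64, range: i64) -> Vec<(usize, usize)> {
    let mut ranges = vec![];
    let mut range_start_index = 0usize; // cursor: where the previous window began
    let mut curr_ts = start;
    while curr_ts <= end {
        let mut range_start = ts.len();
        let mut range_end = 0;
        let mut cursor = range_start_index;
        while cursor < ts.len() {
            if ts[cursor] + range >= curr_ts {
                range_start = range_start.min(cursor);
                range_start_index = range_start;
            }
            if ts[cursor] <= curr_ts {
                range_end = range_end.max(cursor);
            } else {
                // Overshot this window; later windows may still need this row,
                // so back the cursor up one step before the next curr_ts.
                range_start_index = range_start_index.saturating_sub(1);
                break;
            }
            cursor += 1;
        }
        ranges.push(if range_start > range_end { (0, 0) } else { (range_start, range_end) });
        curr_ts += interval;
    }
    ranges
}

fn main() {
    // Rows at 0..=4s, a 2s lookback window aligned every 1s.
    let ranges = calc_ranges(&[0, 1_000, 2_000, 3_000, 4_000], 0, 4_000, 1_000, 2_000);
    println!("{ranges:?}"); // [(0, 0), (0, 1), (0, 2), (1, 3), (2, 4)]
}
```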
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan/scalar_calculate.rs
@@ -301,7 +301,7 @@ impl ExecutionPlan for ScalarCalculateExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+        self.input.required_input_distribution()
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {