6 changes: 5 additions & 1 deletion e2e_test/ddl/table/watermark.slt.part
@@ -1,6 +1,10 @@
# Create a table with watermark.
# Create an append-only table with watermark.
statement ok
create table t1 (v1 int, v2 timestamp with time zone, watermark for v2 as v2 - INTERVAL '10 MINUTES') append only;

# Creating a normal table with watermark should fail.
statement error Defining watermarks on table requires the table to be append only
create table t2 (v1 int, v2 timestamp with time zone, watermark for v2 as v2 - INTERVAL '10 MINUTES');

statement ok
drop table t1;
13 changes: 13 additions & 0 deletions e2e_test/source_inline/kafka/upsert_source.slt
@@ -23,6 +23,19 @@ WITH (
topic = 'test_include_key')
FORMAT UPSERT ENCODE JSON

statement error Defining watermarks on source requires the source connector to be append only
CREATE SOURCE upsert_students_with_watermark (
primary key (rw_key),
ts TIMESTAMP,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
INCLUDE KEY AS rw_key
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_include_key')
FORMAT UPSERT ENCODE JSON


query TT
SHOW CREATE SOURCE upsert_students
----
2 changes: 2 additions & 0 deletions proto/catalog.proto
@@ -25,6 +25,8 @@ message WatermarkDesc {
uint32 watermark_idx = 1;
// The expression to calculate the watermark value.
expr.ExprNode expr = 2;
// Whether this watermark is used for TTL (`WITH TTL`).
bool with_ttl = 3;
}

enum SchemaRegistryNameStrategy {
9 changes: 9 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/watermark_ttl.yaml
@@ -0,0 +1,9 @@
- id: watermark ttl basic
sql: |
explain create table t (
id int primary key,
ts timestamp,
watermark for ts as ts - interval '1 minute' with ttl
);
expected_outputs:
- explain_output
16 changes: 16 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/watermark_ttl.yaml
@@ -0,0 +1,16 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- id: watermark ttl basic
sql: |
explain create table t (
id int primary key,
ts timestamp,
watermark for ts as ts - interval '1 minute' with ttl
);
explain_output: |
StreamMaterialize { columns: [id, ts], stream_key: [id, ts], pk_columns: [id], pk_conflict: Overwrite, watermark_columns: [ts] }
└─StreamWatermarkFilter [upsert] { watermark_descs: [Desc { column: ts, expr: (ts - '00:01:00':Interval) }], output_watermarks: [[ts]] }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(id) }
│ └─StreamDml { columns: [id, ts] }
│ └─StreamSource
└─StreamUpstreamSinkUnion
20 changes: 19 additions & 1 deletion src/frontend/src/handler/create_source.rs
@@ -697,6 +697,7 @@ pub(super) fn bind_source_watermark(
Ok::<_, RwError>(WatermarkDesc {
watermark_idx: watermark_idx as u32,
expr: Some(expr_proto),
with_ttl: source_watermark.with_ttl,
})
}
})
@@ -1014,6 +1015,23 @@ HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<c
bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
// TODO(yuhao): allow multiple watermark on source.
assert!(watermark_descs.len() <= 1);
if is_create_source && watermark_descs.iter().any(|d| d.with_ttl) {
return Err(ErrorCode::NotSupported(
"WITH TTL is not supported in WATERMARK clause for CREATE SOURCE.".to_owned(),
"Use `CREATE TABLE ... WATERMARK ... WITH TTL` instead.".to_owned(),
)
.into());
}

let append_only = row_id_index.is_some();
if is_create_source && !append_only && !watermark_descs.is_empty() {
return Err(ErrorCode::NotSupported(
"Defining watermarks on source requires the source connector to be append only."
.to_owned(),
"Use the key words `FORMAT PLAIN`".to_owned(),
)
.into());
}

bind_sql_column_constraints(
session,
@@ -1039,7 +1057,7 @@ HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<c
database_id,
columns,
pk_col_ids,
append_only: row_id_index.is_some(),
append_only,
owner: session.user_id(),
info: source_info,
row_id_index,
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_table.rs
@@ -596,7 +596,7 @@ pub(crate) fn gen_create_table_plan_without_source(
let (mut columns, pk_column_ids, row_id_index) =
bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
let watermark_descs = bind_source_watermark(
context.session_ctx(),
table_name.real_value(),
source_watermarks,
@@ -775,7 +775,9 @@ fn gen_table_plan_inner(
vec![],
);

if !append_only && !watermark_descs.is_empty() {
let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

if !append_only && has_non_ttl_watermark {
return Err(ErrorCode::NotSupported(
"Defining watermarks on table requires the table to be append only.".to_owned(),
"Use the key words `APPEND ONLY`".to_owned(),
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/mod.rs
@@ -892,6 +892,12 @@ impl LogicalPlanRoot {
)
.into();

let ttl_watermark_indices = watermark_descs
.iter()
.filter(|d| d.with_ttl)
.map(|d| d.watermark_idx as usize)
.collect_vec();

// Add WatermarkFilter node.
if !watermark_descs.is_empty() {
stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
@@ -978,6 +984,7 @@ impl LogicalPlanRoot {
conflict_behavior,
version_column_indices,
pk_column_indices,
ttl_watermark_indices,
row_id_index,
version,
retention_seconds,
13 changes: 11 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_filter.rs
@@ -12,16 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::FilterNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::stream::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic};
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::DistillUnit as _;
use crate::optimizer::plan_node::utils::{Distill, plan_node_name};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::Condition;

@@ -82,7 +84,7 @@ impl PlanTreeNodeUnary<Stream> for StreamFilter {
}

impl_plan_tree_node_for_unary! { Stream, StreamFilter }
impl_distill_by_unit!(StreamFilter, core, "StreamFilter");

impl Distill for StreamFilter {
fn distill<'a>(&self) -> XmlNode<'a> {
self.core.distill_with_name(plan_node_name!("StreamFilter",
{ "upsert", self.input().stream_kind().is_upsert() }
))
}
}

impl StreamNode for StreamFilter {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
18 changes: 14 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -158,6 +158,7 @@ impl StreamMaterialize {
conflict_behavior,
vec![],
None,
vec![],
None,
table_type,
None,
@@ -190,6 +191,7 @@
conflict_behavior: ConflictBehavior,
version_column_indices: Vec<usize>,
pk_column_indices: Vec<usize>,
ttl_watermark_indices: Vec<usize>,
row_id_index: Option<usize>,
version: TableVersion,
retention_seconds: Option<NonZeroU32>,
@@ -211,6 +213,7 @@
conflict_behavior,
version_column_indices,
Some(pk_column_indices),
ttl_watermark_indices,
row_id_index,
TableType::Table,
Some(version),
@@ -312,6 +315,7 @@
conflict_behavior: ConflictBehavior,
version_column_indices: Vec<usize>,
pk_column_indices: Option<Vec<usize>>, // Is some when create table
ttl_watermark_indices: Vec<usize>,
row_id_index: Option<usize>,
table_type: TableType,
version: Option<TableVersion>,
@@ -331,7 +335,7 @@
// We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
let watermark_columns = input.watermark_columns().indices().collect();

let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
let (table_pk, mut stream_key) = if let Some(pk_column_indices) = pk_column_indices {
let table_pk = pk_column_indices
.iter()
.map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
@@ -341,7 +345,13 @@
} else {
derive_pk(input, user_distributed_by, user_order_by, &columns)
};
// assert: `stream_key` is a subset of `table_pk`

// Add ttl watermark column to stream key.
for idx in ttl_watermark_indices.iter().copied() {
if !stream_key.contains(&idx) {
stream_key.push(idx);
}
}

let read_prefix_len_hint = table_pk.len();
Ok(TableCatalog {
@@ -372,7 +382,7 @@
cardinality,
created_at_epoch: None,
initialized_at_epoch: None,
cleaned_by_watermark: false,
cleaned_by_watermark: !ttl_watermark_indices.is_empty(),
create_type,
stream_job_status: StreamJobStatus::Creating,
description: None,
@@ -394,7 +404,7 @@
}
},
clean_watermark_index_in_pk: None, // TODO: fill this field
clean_watermark_indices: vec![], // TODO: fill this field
clean_watermark_indices: ttl_watermark_indices,
refreshable,
vector_index_info: None,
cdc_table_type: None,
16 changes: 15 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
@@ -25,6 +25,7 @@ use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanR
use crate::TableCatalog;
use crate::expr::{ExprDisplay, ExprImpl};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_node_name;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -36,6 +37,14 @@ pub struct StreamWatermarkFilter {

impl StreamWatermarkFilter {
pub fn new(input: PlanRef, watermark_descs: Vec<WatermarkDesc>) -> Self {
if watermark_descs.iter().any(|d| !d.with_ttl) {
assert!(
input.append_only(),
"StreamWatermarkFilter on non-TTL watermark only supports append-only input, got {}",
input.stream_kind()
);
}

let ctx = input.ctx();
let mut watermark_columns = input.watermark_columns().clone();
for i in &watermark_descs {
@@ -100,7 +109,12 @@ impl Distill for StreamWatermarkFilter {
("watermark_descs", Pretty::Array(display_watermark_descs)),
("output_watermarks", display_output_watermark_groups),
];
childless_record("StreamWatermarkFilter", fields)
childless_record(
plan_node_name!("StreamWatermarkFilter",
{ "upsert", self.input().stream_kind().is_upsert() }
),
fields,
)
}
}

5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/property/stream_kind.rs
@@ -69,6 +69,11 @@ impl StreamKind {
matches!(self, Self::AppendOnly)
}

/// Returns `true` if it's [`StreamKind::Upsert`].
pub fn is_upsert(self) -> bool {
matches!(self, Self::Upsert)
}

/// Returns the stream kind representing the merge (union) of the two.
///
/// Note that there should be no conflict on the stream key between the two streams,
8 changes: 7 additions & 1 deletion src/sqlparser/src/ast/ddl.rs
@@ -745,11 +745,17 @@ impl fmt::Display for AlterFragmentOperation {
pub struct SourceWatermark {
pub column: Ident,
pub expr: Expr,
/// Whether `WITH TTL` is specified.
pub with_ttl: bool,
}

impl fmt::Display for SourceWatermark {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)
write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
if self.with_ttl {
write!(f, " WITH TTL")?;
}
Ok(())
}
}

1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
@@ -530,6 +530,7 @@
TRUE,
TRUNCATE,
TRY_CAST,
TTL,
TYPE,
UESCAPE,
UNBOUNDED,
7 changes: 6 additions & 1 deletion src/sqlparser/src/parser.rs
@@ -2908,7 +2908,12 @@ impl Parser<'_> {
let column = self.parse_identifier_non_reserved()?;
self.expect_keyword(Keyword::AS)?;
let expr = self.parse_expr()?;
Ok(Some(SourceWatermark { column, expr }))
let with_ttl = self.parse_keywords(&[Keyword::WITH, Keyword::TTL]);
Ok(Some(SourceWatermark {
column,
expr,
with_ttl,
}))
} else {
Ok(None)
}
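
Taken together, the parser, frontend, and optimizer changes above let a watermark marked `WITH TTL` be defined on a non-append-only table, while plain watermarks on such tables and any `WITH TTL` watermark on a source are still rejected. A minimal usage sketch in the slt style of the tests above, based on the planner test and the error messages in this diff; the table names `t3` and `t4` are illustrative and not part of the change:

statement ok
create table t3 (v1 int primary key, v2 timestamp, watermark for v2 as v2 - INTERVAL '1 minute' with ttl);

statement error Defining watermarks on table requires the table to be append only
create table t4 (v1 int primary key, v2 timestamp, watermark for v2 as v2 - INTERVAL '1 minute');

statement ok
drop table t3;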