Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion e2e_test/ddl/table/watermark.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Create a table with watermark.
# Create an append-only table with watermark.
statement ok
create table t1 (v1 int, v2 timestamp with time zone, watermark for v2 as v2 - INTERVAL '10 MINUTES') append only;

# Creating a normal table with watermark should fail.
statement error Defining watermarks on table requires the table to be append only
create table t2 (v1 int, v2 timestamp with time zone, watermark for v2 as v2 - INTERVAL '10 MINUTES');

statement ok
drop table t1;
13 changes: 13 additions & 0 deletions e2e_test/source_inline/kafka/upsert_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ WITH (
topic = 'test_include_key')
FORMAT UPSERT ENCODE JSON

statement error Defining watermarks on source requires the source connector to be append only
CREATE SOURCE upsert_students_with_watermark (
primary key (rw_key),
ts TIMESTAMP,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
INCLUDE KEY AS rw_key
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_include_key')
FORMAT UPSERT ENCODE JSON


query TT
SHOW CREATE SOURCE upsert_students
----
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,16 @@ HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<c
// TODO(yuhao): allow multiple watermark on source.
assert!(watermark_descs.len() <= 1);

let append_only = row_id_index.is_some();
if is_create_source && !append_only && !watermark_descs.is_empty() {
return Err(ErrorCode::NotSupported(
"Defining watermarks on source requires the source connector to be append only."
.to_owned(),
"Use the key words `FORMAT PLAIN`".to_owned(),
)
.into());
}

bind_sql_column_constraints(
session,
source_name.clone(),
Expand All @@ -1039,7 +1049,7 @@ HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<c
database_id,
columns,
pk_col_ids,
append_only: row_id_index.is_some(),
append_only,
owner: session.user_id(),
info: source_info,
row_id_index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ pub struct StreamWatermarkFilter {

impl StreamWatermarkFilter {
pub fn new(input: PlanRef, watermark_descs: Vec<WatermarkDesc>) -> Self {
assert!(
input.append_only(),
"StreamWatermarkFilter only supports append-only input, got {}",
input.stream_kind()
);
Comment on lines +39 to +43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Handle non-append-only inputs in StreamWatermarkFilter

The new assert!(input.append_only()) in StreamWatermarkFilter::new will panic whenever a source carries watermarks but its stream kind is upsert/retract. Older clusters could already contain such sources (the preceding binder check only blocks new ones), and planning a new MV on top of them or re-planning during recovery would now crash the frontend instead of returning a user-facing error. Consider returning a structured ErrorCode here or rejecting the plan earlier so existing non-append-only sources with watermarks don’t bring down the service.

Useful? React with 👍 / 👎.


let ctx = input.ctx();
let mut watermark_columns = input.watermark_columns().clone();
for i in &watermark_descs {
Expand Down
Loading