diff --git a/e2e_test/ddl/table/watermark.slt.part b/e2e_test/ddl/table/watermark.slt.part index 1fea6ed534cca..4a52c2ed34372 100644 --- a/e2e_test/ddl/table/watermark.slt.part +++ b/e2e_test/ddl/table/watermark.slt.part @@ -1,6 +1,10 @@ -# Create a table with watermark. +# Create an append-only table with watermark. statement ok create table t1 (v1 int, v2 timestamp with time zone, watermark for v2 as v2 - INTERVAL '10 MINUTES') append only; +# Creating a normal table with watermark should fail. +statement error Defining watermarks on table requires the table to be append only +create table t2 (v1 int, v2 timestamp with time zone, watermark for v2 as v2 - INTERVAL '10 MINUTES'); + statement ok drop table t1; diff --git a/e2e_test/source_inline/kafka/upsert_source.slt b/e2e_test/source_inline/kafka/upsert_source.slt index 3b4c8877e27e0..b47c97c4e54f2 100644 --- a/e2e_test/source_inline/kafka/upsert_source.slt +++ b/e2e_test/source_inline/kafka/upsert_source.slt @@ -23,6 +23,19 @@ WITH ( topic = 'test_include_key') FORMAT UPSERT ENCODE JSON +statement error Defining watermarks on source requires the source connector to be append only +CREATE SOURCE upsert_students_with_watermark ( + primary key (rw_key), + ts TIMESTAMP, + WATERMARK FOR ts AS ts - INTERVAL '5' SECOND +) +INCLUDE KEY AS rw_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_include_key') +FORMAT UPSERT ENCODE JSON + + query TT SHOW CREATE SOURCE upsert_students ---- diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 6bc8ebd896073..5af80a7097eb7 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1015,6 +1015,16 @@ HINT: use `CREATE SOURCE WITH (...)` instead of `CREATE SOURCE ( WITH (...)` instead of `CREATE SOURCE () -> Self { + assert!( + input.append_only(), + "StreamWatermarkFilter only supports append-only input, got {}", + input.stream_kind() + ); + let ctx = input.ctx(); let mut watermark_columns = input.watermark_columns().clone(); for i in &watermark_descs {