table.exec.sink.upsert-materialize | Streaming
    Because of the disorder of changelog data caused by shuffle in a distributed system, the data received by the sink may not arrive in the global upsert order, so an upsert materialize operator is added before the upsert sink. It receives the upstream changelog records and generates an upsert view for the downstream. By default, the materialize operator is added when a distributed disorder occurs on unique keys. You can also choose no materialization (NONE) or forced materialization (FORCE).

table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high | Streaming | (none) | Long
    When using strategy=ADAPTIVE, defines the number of entries per key at which the implementation switches from VALUE to MAP. If not specified, Flink uses state-backend-specific defaults (400 for the hashmap state backend, 50 for RocksDB and the rest). The option takes effect when the job is (re)started.

table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low | Streaming | (none) | Long
    When using strategy=ADAPTIVE, defines the number of entries per key at which the implementation switches from MAP back to VALUE. If not specified, Flink uses state-backend-specific defaults (300 for the hashmap state backend, 40 for RocksDB and the rest). The option takes effect when the job is (re)started.

table.exec.sink.upsert-materialize-strategy.type | Streaming | LEGACY | Enum
    Which SinkUpsertMaterializer strategy to use. Supported strategies:
    - LEGACY: simple implementation based on ValueState<List> (the original implementation).
    - MAP: OrderedMultiSetState-based implementation built from a combination of several MapStates maintaining ordering and fast-lookup properties.
    - VALUE: similar to LEGACY, but compatible with MAP, and therefore allows switching to ADAPTIVE.
    - ADAPTIVE: alternates between MAP and VALUE depending on the number of entries for the given key, starting with VALUE and switching to MAP upon reaching the threshold.high value (and back to VALUE upon reaching threshold.low).
    The default is LEGACY. The option takes effect during planning / compiled plan generation; existing jobs won't be affected by this option.
    Possible values: "LEGACY", "MAP", "VALUE", "ADAPTIVE"

table.exec.sort.async-merge-enabled | Batch | true
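
For orientation before the code changes, a minimal hedged sketch of setting these options via the Table API; the option keys are the ones documented above, while the concrete threshold values are invented for illustration:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertMaterializeStrategyExample {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Switch the materializer from the default LEGACY to ADAPTIVE.
            env.getConfig().set("table.exec.sink.upsert-materialize-strategy.type", "ADAPTIVE");
            // Invented thresholds: switch VALUE -> MAP at 500 entries per key, back at 400.
            env.getConfig()
                    .set("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high", "500");
            env.getConfig()
                    .set("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low", "400");
        }
    }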
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 1e0fb43ebfec0..b259d65d9cef8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -159,6 +159,68 @@ public class ExecutionConfigOptions {
+ "or force materialization(FORCE).")
.build());
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+ key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+ + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+ .linebreak()
+ .text("The option takes effect during job (re)starting")
+ .linebreak()
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+ key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+ + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+ .linebreak()
+ .text("The option takes effect during job (re)starting")
+ .linebreak()
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<SinkUpsertMaterializeStrategy>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+ key("table.exec.sink.upsert-materialize-strategy.type")
+ .enumType(SinkUpsertMaterializeStrategy.class)
+ .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")
+ .linebreak()
+ .text(
+ "LEGACY: Simple implementation based on ValueState (the original implementation).")
+ .linebreak()
+ .text(
+ "MAP: OrderedMultiSetState-based implementation based on a combination of several MapState maintaining ordering and fast lookup properties.")
+ .linebreak()
+ .text(
+ "VALUE: Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.")
+ .linebreak()
+ .text(
+ "ADAPTIVE: Alternate between MAP and VALUE depending on the number of entries for the given key starting with VALUE and switching to MAP upon reaching threshold.high value (and back to VALUE, when reaching low).")
+ .linebreak()
+ .text("The default is LEGACY")
+ .linebreak()
+ .text(
+ "The option takes effect during planning / compile plan generation. Existing jobs won't be affected by this option.")
+ .linebreak()
+ .build());
+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<SinkKeyedShuffle> TABLE_EXEC_SINK_KEYED_SHUFFLE =
key("table.exec.sink.keyed-shuffle")
@@ -883,6 +945,43 @@ public enum RetryStrategy {
FIXED_DELAY
}
+ /** SinkUpsertMaterializer strategy. */
+ @PublicEvolving
+ public enum SinkUpsertMaterializeStrategy {
+ /**
+ * Simple implementation based on {@code ValueState} (the original implementation).
+ *
+ * <ul>
+ *   <li>optimal for cases with history under approx. 100 elements
+ *   <li>limited TTL support (per-key granularity, i.e. no expiration for old history
+ *       elements)
+ * </ul>
+ */
+ LEGACY,
+ /**
+ * OrderedMultiSetState-based implementation built from a combination of several MapStates
+ * maintaining ordering and fast-lookup properties.
+ *
+ * <ul>
+ *   <li>faster and more memory-efficient on long histories
+ *   <li>slower on short histories
+ *   <li>currently, no TTL support (to be added in the future)
+ *   <li>requires more space
+ * </ul>
+ */
+ MAP,
+ /**
+ * Similar to LEGACY, but compatible with MAP, and therefore allows switching to ADAPTIVE.
+ */
+ VALUE,
+ /**
+ * Alternates between MAP and VALUE depending on the number of entries for the given key,
+ * starting with VALUE and switching to MAP upon reaching the threshold.high value (and back
+ * to VALUE upon reaching threshold.low).
+ */
+ ADAPTIVE
+ }
+
/** Determine if CAST operates using the legacy behaviour or the new one. */
@Deprecated
public enum LegacyCastBehaviour implements DescribedEnum {
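
To make the ADAPTIVE contract above concrete, here is an illustrative, self-contained sketch of the threshold hysteresis. All names are hypothetical; the real switching logic lives in the SequencedMultiSetState classes further down in this diff.

    /** Hypothetical sketch of the VALUE <-> MAP hysteresis described for ADAPTIVE. */
    public class AdaptiveHysteresisSketch {
        private boolean useMap = false; // ADAPTIVE starts with the VALUE representation

        /** Decides the representation for a key given its current entry count. */
        public boolean useMapState(long entriesForKey, long high, long low) {
            if (!useMap && entriesForKey >= high) {
                useMap = true; // long history: MAP is faster and more memory-efficient
            } else if (useMap && entriesForKey <= low) {
                useMap = false; // history shrank again: VALUE avoids the MAP overhead
            }
            return useMap;
        }
    }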
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
index 0916ea6d37d51..b4d34993649f2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
@@ -45,11 +45,17 @@
public class ProjectedRowData implements RowData {
private final int[] indexMapping;
+ private final boolean isNullAtNonProjected;
private RowData row;
private ProjectedRowData(int[] indexMapping) {
+ this(indexMapping, false);
+ }
+
+ protected ProjectedRowData(int[] indexMapping, boolean isNullAtNonProjected) {
this.indexMapping = indexMapping;
+ this.isNullAtNonProjected = isNullAtNonProjected;
}
/**
@@ -82,7 +88,8 @@ public void setRowKind(RowKind kind) {
@Override
public boolean isNullAt(int pos) {
- return row.isNullAt(indexMapping[pos]);
+ return (pos >= indexMapping.length && isNullAtNonProjected)
+ || row.isNullAt(indexMapping[pos]);
}
@Override
@@ -183,9 +190,20 @@ public String toString() {
+ Arrays.toString(indexMapping)
+ ", mutableRow="
+ row
+ + ", isNullAtNonProjected="
+ + isNullAtNonProjected
+ '}';
}
+ /**
+ * Returns a new {@link ProjectedRowData} that, depending on {@code isNullAtNonProjected},
+ * either reports {@code true} from {@link #isNullAt} for positions outside the projection or
+ * throws {@link ArrayIndexOutOfBoundsException} for them.
+ */
+ public ProjectedRowData withNullAtNonProjected(boolean isNullAtNonProjected) {
+ return new ProjectedRowData(this.indexMapping, isNullAtNonProjected);
+ }
+
/**
* Like {@link #from(int[])}, but throws {@link IllegalArgumentException} if the provided {@code
* projection} array contains nested projections, which are not supported by {@link
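
A hedged usage sketch of the new flag; row contents and projection indexes are made up:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.utils.ProjectedRowData;

    public class NullAtNonProjectedExample {
        public static void main(String[] args) {
            // Project source fields [2, 0] out of a three-field row.
            ProjectedRowData projected =
                    ProjectedRowData.from(new int[] {2, 0})
                            .withNullAtNonProjected(true)
                            .replaceRow(GenericRowData.of(1L, 2L, 3L));
            System.out.println(projected.isNullAt(0)); // false: maps to source field 2
            System.out.println(projected.isNullAt(5)); // true instead of ArrayIndexOutOfBoundsException
        }
    }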
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/RowTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowTypeUtils.java
similarity index 95%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/RowTypeUtils.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowTypeUtils.java
index 4d9879d7b9980..d70c3ef1b98d9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/RowTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowTypeUtils.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.typeutils;
+package org.apache.flink.table.typeutils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -28,7 +29,8 @@
import java.util.Collections;
import java.util.List;
-/** Utils for deriving row types of {@link org.apache.calcite.rel.RelNode}s. */
+/** Utils for deriving row types of org.apache.calcite.rel.RelNode. */
+@Internal
public class RowTypeUtils {
public static String getUniqueName(String oldName, List<String> checklist) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/typeutils/RowTypeUtilsTest.java
similarity index 98%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java
rename to flink-table/flink-table-common/src/test/java/org/apache/flink/table/typeutils/RowTypeUtilsTest.java
index 143a21503e3d2..b1851c62aa514 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/typeutils/RowTypeUtilsTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.typeutils;
+package org.apache.flink.table.typeutils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.BigIntType;
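
With RowTypeUtils relocated to flink-table-common, runtime classes can call it directly. A small hedged sketch of the projectRowType helper that this patch uses to derive upsert-key types; the field names are made up:

    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.flink.table.typeutils.RowTypeUtils;

    public class ProjectRowTypeExample {
        public static void main(String[] args) {
            RowType physical =
                    RowType.of(
                            new LogicalType[] {new BigIntType(), new VarCharType(100)},
                            new String[] {"id", "name"});
            // Narrow the row type down to the upsert-key field(s), here just "id".
            RowType keyType = RowTypeUtils.projectRowType(physical, new int[] {0});
            System.out.println(keyType); // ROW<`id` BIGINT>
        }
    }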
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java
index 4f8496cc15de8..fa242100a1cc1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java
@@ -34,12 +34,12 @@
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.typeutils.RowTypeUtils;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.operators.runtimefilter.LocalRuntimeFilterBuilderOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.RowTypeUtils;
import java.util.List;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 8b6c5614ee0fb..1aa62eac0a7af 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -19,16 +19,22 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
+import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.HashCodeGenerator;
import org.apache.flink.table.planner.connectors.CollectDynamicSink;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -42,16 +48,18 @@
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.typeutils.RowTypeUtils;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
-import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.RowTypeUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
@@ -63,7 +71,12 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW;
/**
* Stream {@link ExecNode} to write data into an external sink defined by a {@link
@@ -92,6 +105,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<RowData> {
@JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) int[] inputUpsertKey,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@@ -166,6 +188,7 @@ public StreamExecSink(
this.upsertMaterialize = upsertMaterialize;
this.inputUpsertKey = inputUpsertKey;
this.stateMetadataList = stateMetadataList;
+ this.upsertMaterializeStrategy = sinkUpsertMaterializeStrategy;
}
@SuppressWarnings("unchecked")
@@ -231,9 +254,11 @@ protected Transformation<RowData> applyUpsertMaterialize(
ClassLoader classLoader,
RowType physicalRowType,
int[] inputUpsertKey) {
+
final GeneratedRecordEqualiser rowEqualiser =
new EqualiserCodeGenerator(physicalRowType, classLoader)
.generateRecordEqualiser("SinkMaterializeEqualiser");
+
final GeneratedRecordEqualiser upsertKeyEqualiser =
inputUpsertKey == null
? null
@@ -243,16 +268,37 @@ protected Transformation<RowData> applyUpsertMaterialize(
classLoader)
.generateRecordEqualiser("SinkMaterializeUpsertKeyEqualiser");
- final long stateRetentionTime =
- StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);
+ GeneratedHashFunction rowHashFunction =
+ HashCodeGenerator.generateRowHash(
+ new CodeGeneratorContext(config, classLoader),
+ physicalRowType,
+ "hashCode",
+ IntStream.range(0, physicalRowType.getFieldCount()).toArray());
- SinkUpsertMaterializer operator =
- new SinkUpsertMaterializer(
- StateConfigUtil.createTtlConfig(stateRetentionTime),
- InternalSerializers.create(physicalRowType),
- rowEqualiser,
+ final GeneratedHashFunction upsertKeyHashFunction =
+ inputUpsertKey == null
+ ? null
+ : HashCodeGenerator.generateRowHash(
+ new CodeGeneratorContext(config, classLoader),
+ RowTypeUtils.projectRowType(physicalRowType, inputUpsertKey),
+ "generated-hashcode-for-" + inputUpsertKey.length + "-keys",
+ IntStream.range(0, inputUpsertKey.length).toArray());
+
+ StateTtlConfig ttlConfig =
+ StateConfigUtil.createTtlConfig(
+ StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList));
+
+ final OneInputStreamOperator<RowData, RowData> operator =
+ createSumOperator(
+ config,
+ physicalRowType,
+ inputUpsertKey,
upsertKeyEqualiser,
- inputUpsertKey);
+ upsertKeyHashFunction,
+ ttlConfig,
+ rowEqualiser,
+ rowHashFunction);
+
final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
final List<String> pkFieldNames =
Arrays.stream(primaryKeys)
@@ -280,4 +326,60 @@ protected Transformation<RowData> applyUpsertMaterialize(
materializeTransform.setStateKeyType(keySelector.getProducedType());
return materializeTransform;
}
+
+ private OneInputStreamOperator<RowData, RowData> createSumOperator(
+ ExecNodeConfig config,
+ RowType physicalRowType,
+ int[] inputUpsertKey,
+ GeneratedRecordEqualiser upsertKeyEqualiser,
+ GeneratedHashFunction upsertKeyHashFunction,
+ StateTtlConfig ttlConfig,
+ GeneratedRecordEqualiser rowEqualiser,
+ GeneratedHashFunction rowHashFunction) {
+
+ SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
+ Optional.ofNullable(upsertMaterializeStrategy)
+ .orElse(SinkUpsertMaterializeStrategy.LEGACY);
+
+ return sinkUpsertMaterializeStrategy == SinkUpsertMaterializeStrategy.LEGACY
+ ? SinkUpsertMaterializer.create(
+ ttlConfig,
+ physicalRowType,
+ rowEqualiser,
+ upsertKeyEqualiser,
+ inputUpsertKey)
+ : SinkUpsertMaterializerV2.create(
+ physicalRowType,
+ rowEqualiser,
+ upsertKeyEqualiser,
+ rowHashFunction,
+ upsertKeyHashFunction,
+ inputUpsertKey,
+ createStateConfig(
+ sinkUpsertMaterializeStrategy,
+ TimeDomain.EVENT_TIME,
+ ttlConfig,
+ config));
+ }
+
+ private static SequencedMultiSetStateConfig createStateConfig(
+ SinkUpsertMaterializeStrategy strategy,
+ TimeDomain ttlTimeDomain,
+ StateTtlConfig ttlConfig,
+ ReadableConfig config) {
+ switch (strategy) {
+ case VALUE:
+ return SequencedMultiSetStateConfig.forValue(ttlTimeDomain, ttlConfig);
+ case MAP:
+ return SequencedMultiSetStateConfig.forMap(ttlTimeDomain, ttlConfig);
+ case ADAPTIVE:
+ return SequencedMultiSetStateConfig.adaptive(
+ ttlTimeDomain,
+ config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH),
+ config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW),
+ ttlConfig);
+ default:
+ throw new IllegalArgumentException("Unsupported strategy: " + strategy);
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
index c55c872b224d9..e790df7c9e04c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
@@ -25,7 +25,7 @@
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
-import org.apache.flink.table.planner.typeutils.RowTypeUtils;
+import org.apache.flink.table.typeutils.RowTypeUtils;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
index f51fddc005a82..075f7343d73ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
@@ -26,12 +26,12 @@ import org.apache.flink.table.functions.DeclarativeAggregateFunction
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CodeGenUtils, OperatorCodeGenerator, ProjectionCodeGenerator}
import org.apache.flink.table.planner.codegen.CodeGenUtils.ROW_DATA
import org.apache.flink.table.planner.plan.utils.AggregateInfoList
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.generated.GeneratedOperator
import org.apache.flink.table.runtime.operators.TableStreamOperator
import org.apache.flink.table.runtime.operators.aggregate.BytesHashMapSpillMemorySegmentPool
import org.apache.flink.table.runtime.util.collections.binary.BytesMap
import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.calcite.tools.RelBuilder
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala
index dc2605bb9305e..f68c302239a5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala
@@ -25,10 +25,10 @@ import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CodeGenUtils, ProjectionCodeGenerator}
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
import org.apache.flink.table.planner.plan.utils.AggregateInfoList
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.generated.GeneratedOperator
import org.apache.flink.table.runtime.operators.TableStreamOperator
import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.calcite.tools.RelBuilder
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
index 41bbe7d2beed3..66ea53c2b1e7b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
@@ -38,7 +38,6 @@ import org.apache.flink.table.planner.expressions.ExpressionBuilder._
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
import org.apache.flink.table.planner.plan.utils.{AggregateInfo, AggregateInfoList, AggregateUtil}
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
import org.apache.flink.table.runtime.operators.window.TimeWindow
@@ -46,6 +45,7 @@ import org.apache.flink.table.runtime.operators.window.grouping.{HeapWindowsGrou
import org.apache.flink.table.runtime.util.RowIterator
import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.flink.table.utils.DateTimeUtils
import org.apache.calcite.rel.core.AggregateCall
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala
index 8d2fd41844699..a036aeb420c48 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala
@@ -20,11 +20,11 @@ package org.apache.flink.table.planner.codegen.runtimefilter
import org.apache.flink.runtime.operators.util.BloomFilter
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator}
-import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM, ROW_DATA}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM}
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, INPUT_SELECTION}
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.flink.util.Preconditions
/** Operator code generator for runtime filter operator. */
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
index 7035fe7a419f1..3402703cf81a1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala
@@ -26,17 +26,17 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CodeGenUtil
import org.apache.flink.table.planner.codegen.CodeGenUtils.{getReuseRowFieldExprs, newName, newNames}
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator.genAdaptiveLocalHashAggValueProjectionExpr
import org.apache.flink.table.planner.codegen.agg.batch.{AggCodeGenHelper, HashAggCodeGenHelper}
-import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping, genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, genGetValueFromFlatAggregateBuffer, genInitFlatAggregateBuffer}
-import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenHelper.{buildAggregateAggBuffMapping, genAggregate, genCreateFallbackSorter, genHashAggValueExpr, genRetryAppendToMap, genReusableEmptyAggBuffer, prepareFallbackSorter}
+import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper._
+import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenHelper._
import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, OpFusionContext}
import org.apache.flink.table.planner.plan.fusion.FusionCodegenUtil.{constructDoConsumeCode, constructDoConsumeFunction, evaluateVariables}
import org.apache.flink.table.planner.plan.utils.AggregateInfoList
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala}
import org.apache.flink.table.runtime.operators.aggregate.BytesHashMapSpillMemorySegmentPool
import org.apache.flink.table.runtime.util.KeyValueIterator
import org.apache.flink.table.runtime.util.collections.binary.{BytesHashMap, BytesMap}
import org.apache.flink.table.types.logical.{LogicalType, RowType}
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.calcite.tools.RelBuilder
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
index a7e075b62177b..0f049a844aab9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
@@ -22,8 +22,8 @@ import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, newNames}
import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, OpFusionContext}
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.flink.util.Preconditions
import java.util
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala
index a7a61f378aab4..671e340125652 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala
@@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.nodes.calcite
import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.utils._
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange, RankType, VariableRankRange}
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index 40eea54b99116..9ff1a315ba30d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -102,13 +103,17 @@ class StreamPhysicalSink(
.reuseOrCreate(cluster.getMetadataQuery)
.getUpsertKeys(inputRel)
+ val config = unwrapTableConfig(this)
new StreamExecSink(
- unwrapTableConfig(this),
+ config,
tableSinkSpec,
inputChangelogMode,
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
upsertMaterialize,
+ // persist upsertMaterialize strategy separately in the compiled plan to make it immutable;
+ // later on, it can't be obtained from the node config because it is merged with the new environment
+ config.getOptional(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY).orElse(null),
UpsertKeyUtil.getSmallestKey(inputUpsertKeys),
getRelDetailedDescription)
}
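
The comment above means the chosen strategy is frozen into the compiled plan rather than re-read from the session configuration. A hedged sketch of the practical consequence; the tables are hypothetical:

    import org.apache.flink.table.api.CompiledPlan;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CompiledPlanStrategyExample {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.getConfig().set("table.exec.sink.upsert-materialize-strategy.type", "MAP");
            // The strategy is persisted into the compiled plan at this point ...
            CompiledPlan plan = env.compilePlanSql("INSERT INTO sink_t SELECT * FROM source_t");
            // ... so a later configuration change does not affect the existing plan.
            env.getConfig().set("table.exec.sink.upsert-materialize-strategy.type", "LEGACY");
            plan.execute(); // still runs with MAP
        }
    }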
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala
index 389c34a30e60c..d3e1128aa48a4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.JArrayList
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.spec.{OverSpec, PartitionSpec}
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec.GroupSpec
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.calcite.plan.RelOptCluster
import org.apache.calcite.rel.`type`.RelDataType
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 5be058832c604..9102f68528dcc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -29,13 +29,13 @@ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
-import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.groupwindow._
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.types.logical.TimestampType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
+import org.apache.flink.table.typeutils.RowTypeUtils
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java
new file mode 100644
index 0000000000000..88f14a727f079
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeInfo;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.RowTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An operator that maintains incoming records in state corresponding to the upsert keys and
+ * generates an upsert view for the downstream operator.
+ *
+ * <ul>
+ *   <li>Adds an insertion to state and emits it with updated {@link RowKind}.
+ *   <li>Applies a deletion to state.
+ *   <li>Emits a deletion with updated {@link RowKind} iff it affects the last record or the
+ *       state is empty afterward. A deletion to an already updated record is swallowed.
+ * </ul>
+ */
+@Internal
+public class SinkUpsertMaterializerV2 extends TableStreamOperator<RowData>
+ implements OneInputStreamOperator<RowData, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(SinkUpsertMaterializerV2.class);
+
+ private final SequencedMultiSetStateContext stateParameters;
+
+ // Buffer of emitted insertions on which deletions will be applied first.
+ // The row kind might be +I or +U and will be ignored when applying the deletion.
+ private transient SequencedMultiSetState orderedMultiSetState;
+
+ private transient TimestampedCollector<RowData> collector;
+ private final boolean hasUpsertKey;
+
+ public SinkUpsertMaterializerV2(
+ boolean hasUpsertKey, SequencedMultiSetStateContext stateParameters) {
+ this.hasUpsertKey = hasUpsertKey;
+ this.stateParameters = stateParameters;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ orderedMultiSetState =
+ SequencedMultiSetState.create(
+ stateParameters,
+ getRuntimeContext(),
+ getKeyedStateStore().getBackendTypeIdentifier());
+ collector = new TimestampedCollector<>(output);
+ LOG.info("Opened {} with upsert key: {}", this.getClass().getSimpleName(), hasUpsertKey);
+ }
+
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ final RowData row = element.getValue();
+ final long timestamp = element.getTimestamp();
+
+ switch (row.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ if (hasUpsertKey) {
+ collect(row, orderedMultiSetState.add(row, timestamp).wasEmpty());
+ } else {
+ collect(row, orderedMultiSetState.append(row, timestamp).wasEmpty());
+ }
+ break;
+
+ case UPDATE_BEFORE:
+ case DELETE:
+ StateChangeInfo removalResult = orderedMultiSetState.remove(row);
+ switch (removalResult.getChangeType()) {
+ case REMOVAL_OTHER:
+ // do nothing;
+ break;
+ case REMOVAL_NOT_FOUND:
+ LOG.warn("Not found record to retract"); // not logging the record due for
+ // security
+ break;
+ case REMOVAL_ALL:
+ collect(removalResult.getPayload().get(), RowKind.DELETE);
+ break;
+ case REMOVAL_LAST_ADDED:
+ collect(removalResult.getPayload().get(), RowKind.UPDATE_AFTER);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unexpected removal result type: " + removalResult.getChangeType());
+ }
+ }
+ }
+
+ private void collect(RowData row, boolean notExisted) {
+ collect(row, getRowKind(notExisted));
+ }
+
+ private RowKind getRowKind(boolean notExisted) {
+ return notExisted ? RowKind.INSERT : RowKind.UPDATE_AFTER;
+ }
+
+ private void collect(RowData row, RowKind withKind) {
+ RowKind orig = row.getRowKind();
+ row.setRowKind(withKind);
+ collector.collect(row);
+ row.setRowKind(orig);
+ }
+
+ public static SinkUpsertMaterializerV2 create(
+ RowType physicalRowType,
+ GeneratedRecordEqualiser rowEqualiser,
+ GeneratedRecordEqualiser upsertKeyEqualiser,
+ GeneratedHashFunction rowHashFunction,
+ GeneratedHashFunction upsertKeyHashFunction,
+ int[] inputUpsertKey,
+ SequencedMultiSetStateConfig stateSettings) {
+
+ boolean hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+
+ return new SinkUpsertMaterializerV2(
+ hasUpsertKey,
+ new SequencedMultiSetStateContext(
+ checkNotNull(
+ hasUpsertKey
+ ? InternalSerializers.create(
+ RowTypeUtils.projectRowType(
+ physicalRowType, inputUpsertKey))
+ : InternalSerializers.create(physicalRowType)),
+ checkNotNull(hasUpsertKey ? upsertKeyEqualiser : rowEqualiser),
+ checkNotNull(hasUpsertKey ? upsertKeyHashFunction : rowHashFunction),
+ InternalSerializers.create(physicalRowType),
+ row ->
+ hasUpsertKey
+ ? ProjectedRowData.from(inputUpsertKey)
+ .withNullAtNonProjected(true)
+ .replaceRow(row)
+ : row,
+ stateSettings));
+ }
+}
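
As a worked example of the emission rules in processElement above (hypothetical rows r1 and r2 under one key, without an upsert key, so append/remove semantics apply):

    input changelog        emitted upsert view
    +I r1              ->  +I r1        (state was empty)
    +U r2              ->  +U r2        (r2 becomes the last-added record)
    -D r1              ->  (nothing)    (REMOVAL_OTHER: r1 was not the last-added record)
    -D r2              ->  -D r2        (REMOVAL_ALL: state is empty afterwards)

Had r2 been removed while r1 was still present, the REMOVAL_LAST_ADDED branch would re-emit r1 as +U instead.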
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
index a6b16c343d860..1d55937abc7bf 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
@@ -20,24 +20,28 @@
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.Strategy;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Configuration for {@link SequencedMultiSetState}. */
-public class SequencedMultiSetStateConfig {
+public class SequencedMultiSetStateConfig implements Serializable {
- private final SequencedMultiSetState.Strategy strategy;
+ private static final long serialVersionUID = 1L;
+
+ private final Strategy strategy;
private final @Nullable Long adaptiveHighThresholdOverride;
private final @Nullable Long adaptiveLowThresholdOverride;
private final StateTtlConfig ttlConfig;
private final TimeSelector ttlTimeSelector;
public SequencedMultiSetStateConfig(
- SequencedMultiSetState.Strategy strategy,
+ Strategy strategy,
@Nullable Long adaptiveHighThresholdOverride,
@Nullable Long adaptiveLowThresholdOverride,
StateTtlConfig ttlConfig,
@@ -51,7 +55,7 @@ public SequencedMultiSetStateConfig(
}
public SequencedMultiSetStateConfig(
- SequencedMultiSetState.Strategy strategy,
+ Strategy strategy,
@Nullable Long adaptiveHighThresholdOverride,
@Nullable Long adaptiveLowThresholdOverride,
StateTtlConfig ttlConfig,
@@ -74,22 +78,22 @@ public static SequencedMultiSetStateConfig defaults(
public static SequencedMultiSetStateConfig forMap(
TimeDomain ttlTimeDomain, StateTtlConfig ttlConfig) {
return new SequencedMultiSetStateConfig(
- SequencedMultiSetState.Strategy.MAP_STATE, null, null, ttlConfig, ttlTimeDomain);
+ Strategy.MAP_STATE, null, null, ttlConfig, ttlTimeDomain);
}
public static SequencedMultiSetStateConfig forValue(
TimeDomain ttlTimeDomain, StateTtlConfig ttl) {
return new SequencedMultiSetStateConfig(
- SequencedMultiSetState.Strategy.VALUE_STATE, null, null, ttl, ttlTimeDomain);
+ Strategy.VALUE_STATE, null, null, ttl, ttlTimeDomain);
}
public static SequencedMultiSetStateConfig adaptive(
TimeDomain ttlTimeDomain,
- long adaptiveHighThresholdOverride,
- long adaptiveLowThresholdOverride,
+ @Nullable Long adaptiveHighThresholdOverride,
+ @Nullable Long adaptiveLowThresholdOverride,
StateTtlConfig ttl) {
return new SequencedMultiSetStateConfig(
- SequencedMultiSetState.Strategy.ADAPTIVE,
+ Strategy.ADAPTIVE,
adaptiveHighThresholdOverride,
adaptiveLowThresholdOverride,
ttl,
@@ -100,7 +104,7 @@ public TimeSelector getTimeSelector() {
return ttlTimeSelector;
}
- public SequencedMultiSetState.Strategy getStrategy() {
+ public Strategy getStrategy() {
return strategy;
}
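
A hedged sketch of building the three configurations through the factory methods above, reusing the StateConfigUtil helper that the planner change also calls; the retention value is made up:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.streaming.api.TimeDomain;
    import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
    import org.apache.flink.table.runtime.util.StateConfigUtil;

    public class StateConfigFactoriesExample {
        public static void main(String[] args) {
            // One hour of state retention (a value of 0 disables TTL).
            StateTtlConfig ttl = StateConfigUtil.createTtlConfig(60 * 60 * 1000L);
            SequencedMultiSetStateConfig value =
                    SequencedMultiSetStateConfig.forValue(TimeDomain.EVENT_TIME, ttl);
            SequencedMultiSetStateConfig map =
                    SequencedMultiSetStateConfig.forMap(TimeDomain.EVENT_TIME, ttl);
            // adaptive(...) now accepts nullable thresholds; null means backend-specific defaults.
            SequencedMultiSetStateConfig adaptive =
                    SequencedMultiSetStateConfig.adaptive(TimeDomain.EVENT_TIME, null, null, ttl);
        }
    }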
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
index b7412d2abcd98..939eea59b953f 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
@@ -23,24 +23,30 @@
import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import java.io.Serializable;
import java.util.function.Function;
/** {@link SequencedMultiSetState} (creation) context. */
-public class SequencedMultiSetStateContext {
+public class SequencedMultiSetStateContext implements Serializable {
+
+ private static final long serialVersionUID = 1L;
public final SequencedMultiSetStateConfig config;
public final TypeSerializer<RowData> keySerializer;
public final GeneratedRecordEqualiser generatedKeyEqualiser;
public final GeneratedHashFunction generatedKeyHashFunction;
public final TypeSerializer<RowData> recordSerializer;
- public final Function<RowData, RowData> keyExtractor;
+ public final KeyExtractor keyExtractor;
+
+ /** Serializable function that extracts the multi-set state key from an input record. */
+ public interface KeyExtractor extends Function<RowData, RowData>, Serializable {}
public SequencedMultiSetStateContext(
TypeSerializer<RowData> keySerializer,
GeneratedRecordEqualiser generatedKeyEqualiser,
GeneratedHashFunction generatedKeyHashFunction,
TypeSerializer<RowData> recordSerializer,
- Function<RowData, RowData> keyExtractor,
+ KeyExtractor keyExtractor,
SequencedMultiSetStateConfig config) {
this.keySerializer = keySerializer;
this.generatedKeyEqualiser = generatedKeyEqualiser;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
index 40fe6f456e601..057a833b8a8ac 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
@@ -22,9 +22,11 @@
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.util.clock.SystemClock;
+import java.io.Serializable;
+
@Internal
@FunctionalInterface
-public interface TimeSelector {
+public interface TimeSelector extends Serializable {
long getTimestamp(long elementTimestamp);
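
Because TimeSelector now extends Serializable, a lambda implementation can be captured in the serialized operator configuration. A minimal hypothetical example:

    import org.apache.flink.table.runtime.sequencedmultisetstate.TimeSelector;
    import org.apache.flink.util.clock.SystemClock;

    public class TimeSelectorExample {
        public static void main(String[] args) {
            // A selector that ignores the element timestamp and uses processing time.
            TimeSelector processingTime =
                    elementTimestamp -> SystemClock.getInstance().absoluteTimeMillis();
            System.out.println(processingTime.getTimestamp(123L));
        }
    }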
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
index 1ab7ac4ae8468..b0ae3c268279d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
@@ -19,11 +19,14 @@
package org.apache.flink.table.runtime.operators.sink;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MigrationTest;
@@ -44,6 +47,8 @@
import static org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename;
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR;
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.GENERATED_HASH_FUNCTION;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.GENERATED_UPSERT_HASH_FUNCTION;
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES;
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG;
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY;
@@ -74,11 +79,14 @@ public static List