diff --git a/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java b/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java index 2039820e..efd79399 100644 --- a/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java +++ b/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java @@ -1,5 +1,6 @@ package tech.ydb.table.description; +import java.time.Duration; import java.util.Objects; import tech.ydb.table.settings.Changefeed; @@ -30,14 +31,36 @@ public enum State { private final Changefeed.Format format; private final State state; private final boolean virtualTimestamps; + private final Duration resolvedTimestampsInterval; - public ChangefeedDescription(String name, Changefeed.Mode mode, Changefeed.Format format, State state, - boolean virtualTimestamps) { + public ChangefeedDescription( + String name, + Changefeed.Mode mode, + Changefeed.Format format, + State state, + boolean virtualTimestamps, + Duration resolvedTimestampsInterval + ) { this.name = name; this.mode = mode; this.format = format; this.state = state; this.virtualTimestamps = virtualTimestamps; + this.resolvedTimestampsInterval = resolvedTimestampsInterval; + } + + /** + * @deprecated use constructor with resolvedTimestampsInterval instead + */ + @Deprecated + public ChangefeedDescription( + String name, + Changefeed.Mode mode, + Changefeed.Format format, + State state, + boolean virtualTimestamps + ) { + this(name, mode, format, state, virtualTimestamps, null); } /** @@ -75,9 +98,16 @@ public boolean hasVirtualTimestamps() { return virtualTimestamps; } + /** + * @return Heartbeat interval + */ + public Duration getResolvedTimestampsInterval() { + return resolvedTimestampsInterval; + } + @Override public int hashCode() { - return Objects.hash(name, mode, format, state, virtualTimestamps); + return Objects.hash(name, mode, format, state, virtualTimestamps, resolvedTimestampsInterval); } @Override @@ -94,7 +124,8 @@ public boolean equals(Object o) { && mode == cd.mode && format == cd.format && state == cd.state - && virtualTimestamps == cd.virtualTimestamps; + && virtualTimestamps == cd.virtualTimestamps + && Objects.equals(resolvedTimestampsInterval, cd.resolvedTimestampsInterval); } @Override @@ -104,6 +135,8 @@ public String toString() { .append(", format=").append(format) .append(", mode=").append(mode) .append(", virtual timestamps=").append(virtualTimestamps) + .append(", resolved timestamps=").append( + resolvedTimestampsInterval != null ? resolvedTimestampsInterval : "null") .append("}").toString(); } } diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index be681763..1e25b012 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -32,6 +32,7 @@ import tech.ydb.core.grpc.YdbHeaders; import tech.ydb.core.impl.call.ProxyReadStream; import tech.ydb.core.operation.Operation; +import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.core.utils.URITools; import tech.ydb.proto.StatusCodesProtos.StatusIds; import tech.ydb.proto.ValueProtos; @@ -261,6 +262,14 @@ public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) { .build()); } + Duration resolvedTimestampsInterval = changefeed.getResolvedTimestampsInterval(); + if (resolvedTimestampsInterval != null) { + builder.setResolvedTimestampsInterval(com.google.protobuf.Duration.newBuilder() + .setSeconds(resolvedTimestampsInterval.getSeconds()) + .setNanos(resolvedTimestampsInterval.getNano()) + .build()); + } + return builder.build(); } @@ -849,7 +858,8 @@ private static TableDescription mapDescribeTable( mapChangefeedMode(pb.getMode()), mapChangefeedFormat(pb.getFormat()), mapChangefeedState(pb.getState()), - pb.getVirtualTimestamps() + pb.getVirtualTimestamps(), + ProtobufUtils.protoToDuration(pb.getResolvedTimestampsInterval()) )); } diff --git a/table/src/main/java/tech/ydb/table/settings/Changefeed.java b/table/src/main/java/tech/ydb/table/settings/Changefeed.java index b1b7172a..ea12d5e1 100644 --- a/table/src/main/java/tech/ydb/table/settings/Changefeed.java +++ b/table/src/main/java/tech/ydb/table/settings/Changefeed.java @@ -44,6 +44,7 @@ public tech.ydb.proto.table.YdbTable.ChangefeedFormat.Format toProto() { private final boolean virtualTimestamps; private final Duration retentionPeriod; private final boolean initialScan; + private final Duration resolvedTimestampsInterval; private Changefeed(Builder builder) { this.name = builder.name; @@ -52,6 +53,7 @@ private Changefeed(Builder builder) { this.virtualTimestamps = builder.virtualTimestamps; this.retentionPeriod = builder.retentionPeriod; this.initialScan = builder.initialScan; + this.resolvedTimestampsInterval = builder.resolvedTimestampsInterval; } public String getName() { @@ -78,6 +80,10 @@ public Duration getRetentionPeriod() { return retentionPeriod; } + public Duration getResolvedTimestampsInterval() { + return resolvedTimestampsInterval; + } + @Deprecated public tech.ydb.proto.table.YdbTable.Changefeed toProto() { return BaseSession.buildChangefeed(this); @@ -98,12 +104,14 @@ public static class Builder { private boolean virtualTimestamps = false; private Duration retentionPeriod = null; private boolean initialScan = false; + private Duration resolvedTimestampsInterval = null; private Builder(ChangefeedDescription description) { this.name = description.getName(); this.mode = description.getMode(); this.format = description.getFormat(); this.virtualTimestamps = description.hasVirtualTimestamps(); + this.resolvedTimestampsInterval = description.getResolvedTimestampsInterval(); } private Builder(String name) { @@ -135,6 +143,11 @@ public Builder withRetentionPeriod(Duration retentionPeriod) { return this; } + public Builder withResolvedTimestampsInterval(Duration resolvedTimestampsInterval) { + this.resolvedTimestampsInterval = resolvedTimestampsInterval; + return this; + } + public Changefeed build() { return new Changefeed(this); } diff --git a/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java b/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java index 9a719d16..02bc2420 100644 --- a/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java +++ b/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java @@ -124,6 +124,7 @@ public void tableChangefeedsTest() { .withMode(Changefeed.Mode.NEW_AND_OLD_IMAGES) .withVirtualTimestamps(true) .withRetentionPeriod(Duration.ofDays(5)) + .withResolvedTimestampsInterval(Duration.ofSeconds(3)) .build()) )).join(); Assert.assertTrue("Alter table changefeed 2 " + alterStatus, alterStatus.isSuccess()); @@ -142,8 +143,9 @@ public void tableChangefeedsTest() { Assert.assertEquals(Changefeed.Format.JSON, ch1.getFormat()); Assert.assertEquals(ChangefeedDescription.State.ENABLED, ch1.getState()); Assert.assertFalse(ch1.hasVirtualTimestamps()); + Assert.assertEquals(ch1.getResolvedTimestampsInterval(), Duration.ZERO); // zero is default when not specified Assert.assertEquals( - "Changefeed['change1']{state=ENABLED, format=JSON, mode=KEYS_ONLY, virtual timestamps=false}", + "Changefeed['change1']{state=ENABLED, format=JSON, mode=KEYS_ONLY, virtual timestamps=false, resolved timestamps=PT0S}", ch1.toString() ); @@ -152,6 +154,7 @@ public void tableChangefeedsTest() { Assert.assertEquals(Changefeed.Mode.NEW_AND_OLD_IMAGES, ch2.getMode()); Assert.assertEquals(Changefeed.Format.JSON, ch2.getFormat()); Assert.assertTrue(ch2.hasVirtualTimestamps()); + Assert.assertEquals(ch2.getResolvedTimestampsInterval(), Duration.ofSeconds(3)); // State may be flaky Assert.assertTrue(ch2.getState() == ChangefeedDescription.State.INITIAL_SCAN ||