Skip to content

Commit 133b2c0

Browse files
authored
Merge pull request #376 from altvod/issue-375-add-resolved-timestamps
#375: Added resolvedTimestampsInterval to changefeed description
2 parents 217b9fb + 6bfa66c commit 133b2c0

File tree

4 files changed

+65
-6
lines changed

4 files changed

+65
-6
lines changed

table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java

+37-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.table.description;
22

3+
import java.time.Duration;
34
import java.util.Objects;
45

56
import tech.ydb.table.settings.Changefeed;
@@ -30,14 +31,36 @@ public enum State {
3031
private final Changefeed.Format format;
3132
private final State state;
3233
private final boolean virtualTimestamps;
34+
private final Duration resolvedTimestampsInterval;
3335

34-
public ChangefeedDescription(String name, Changefeed.Mode mode, Changefeed.Format format, State state,
35-
boolean virtualTimestamps) {
36+
public ChangefeedDescription(
37+
String name,
38+
Changefeed.Mode mode,
39+
Changefeed.Format format,
40+
State state,
41+
boolean virtualTimestamps,
42+
Duration resolvedTimestampsInterval
43+
) {
3644
this.name = name;
3745
this.mode = mode;
3846
this.format = format;
3947
this.state = state;
4048
this.virtualTimestamps = virtualTimestamps;
49+
this.resolvedTimestampsInterval = resolvedTimestampsInterval;
50+
}
51+
52+
/**
53+
* @deprecated use constructor with resolvedTimestampsInterval instead
54+
*/
55+
@Deprecated
56+
public ChangefeedDescription(
57+
String name,
58+
Changefeed.Mode mode,
59+
Changefeed.Format format,
60+
State state,
61+
boolean virtualTimestamps
62+
) {
63+
this(name, mode, format, state, virtualTimestamps, null);
4164
}
4265

4366
/**
@@ -75,9 +98,16 @@ public boolean hasVirtualTimestamps() {
7598
return virtualTimestamps;
7699
}
77100

101+
/**
102+
* @return Heartbeat interval
103+
*/
104+
public Duration getResolvedTimestampsInterval() {
105+
return resolvedTimestampsInterval;
106+
}
107+
78108
@Override
79109
public int hashCode() {
80-
return Objects.hash(name, mode, format, state, virtualTimestamps);
110+
return Objects.hash(name, mode, format, state, virtualTimestamps, resolvedTimestampsInterval);
81111
}
82112

83113
@Override
@@ -94,7 +124,8 @@ public boolean equals(Object o) {
94124
&& mode == cd.mode
95125
&& format == cd.format
96126
&& state == cd.state
97-
&& virtualTimestamps == cd.virtualTimestamps;
127+
&& virtualTimestamps == cd.virtualTimestamps
128+
&& Objects.equals(resolvedTimestampsInterval, cd.resolvedTimestampsInterval);
98129
}
99130

100131
@Override
@@ -104,6 +135,8 @@ public String toString() {
104135
.append(", format=").append(format)
105136
.append(", mode=").append(mode)
106137
.append(", virtual timestamps=").append(virtualTimestamps)
138+
.append(", resolved timestamps=").append(
139+
resolvedTimestampsInterval != null ? resolvedTimestampsInterval : "null")
107140
.append("}").toString();
108141
}
109142
}

table/src/main/java/tech/ydb/table/impl/BaseSession.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import tech.ydb.core.grpc.YdbHeaders;
3333
import tech.ydb.core.impl.call.ProxyReadStream;
3434
import tech.ydb.core.operation.Operation;
35+
import tech.ydb.core.utils.ProtobufUtils;
3536
import tech.ydb.core.utils.URITools;
3637
import tech.ydb.proto.StatusCodesProtos.StatusIds;
3738
import tech.ydb.proto.ValueProtos;
@@ -261,6 +262,14 @@ public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
261262
.build());
262263
}
263264

265+
Duration resolvedTimestampsInterval = changefeed.getResolvedTimestampsInterval();
266+
if (resolvedTimestampsInterval != null) {
267+
builder.setResolvedTimestampsInterval(com.google.protobuf.Duration.newBuilder()
268+
.setSeconds(resolvedTimestampsInterval.getSeconds())
269+
.setNanos(resolvedTimestampsInterval.getNano())
270+
.build());
271+
}
272+
264273
return builder.build();
265274
}
266275

@@ -849,7 +858,8 @@ private static TableDescription mapDescribeTable(
849858
mapChangefeedMode(pb.getMode()),
850859
mapChangefeedFormat(pb.getFormat()),
851860
mapChangefeedState(pb.getState()),
852-
pb.getVirtualTimestamps()
861+
pb.getVirtualTimestamps(),
862+
ProtobufUtils.protoToDuration(pb.getResolvedTimestampsInterval())
853863
));
854864
}
855865

table/src/main/java/tech/ydb/table/settings/Changefeed.java

+13
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public tech.ydb.proto.table.YdbTable.ChangefeedFormat.Format toProto() {
4444
private final boolean virtualTimestamps;
4545
private final Duration retentionPeriod;
4646
private final boolean initialScan;
47+
private final Duration resolvedTimestampsInterval;
4748

4849
private Changefeed(Builder builder) {
4950
this.name = builder.name;
@@ -52,6 +53,7 @@ private Changefeed(Builder builder) {
5253
this.virtualTimestamps = builder.virtualTimestamps;
5354
this.retentionPeriod = builder.retentionPeriod;
5455
this.initialScan = builder.initialScan;
56+
this.resolvedTimestampsInterval = builder.resolvedTimestampsInterval;
5557
}
5658

5759
public String getName() {
@@ -78,6 +80,10 @@ public Duration getRetentionPeriod() {
7880
return retentionPeriod;
7981
}
8082

83+
public Duration getResolvedTimestampsInterval() {
84+
return resolvedTimestampsInterval;
85+
}
86+
8187
@Deprecated
8288
public tech.ydb.proto.table.YdbTable.Changefeed toProto() {
8389
return BaseSession.buildChangefeed(this);
@@ -98,12 +104,14 @@ public static class Builder {
98104
private boolean virtualTimestamps = false;
99105
private Duration retentionPeriod = null;
100106
private boolean initialScan = false;
107+
private Duration resolvedTimestampsInterval = null;
101108

102109
private Builder(ChangefeedDescription description) {
103110
this.name = description.getName();
104111
this.mode = description.getMode();
105112
this.format = description.getFormat();
106113
this.virtualTimestamps = description.hasVirtualTimestamps();
114+
this.resolvedTimestampsInterval = description.getResolvedTimestampsInterval();
107115
}
108116

109117
private Builder(String name) {
@@ -135,6 +143,11 @@ public Builder withRetentionPeriod(Duration retentionPeriod) {
135143
return this;
136144
}
137145

146+
public Builder withResolvedTimestampsInterval(Duration resolvedTimestampsInterval) {
147+
this.resolvedTimestampsInterval = resolvedTimestampsInterval;
148+
return this;
149+
}
150+
138151
public Changefeed build() {
139152
return new Changefeed(this);
140153
}

table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public void tableChangefeedsTest() {
124124
.withMode(Changefeed.Mode.NEW_AND_OLD_IMAGES)
125125
.withVirtualTimestamps(true)
126126
.withRetentionPeriod(Duration.ofDays(5))
127+
.withResolvedTimestampsInterval(Duration.ofSeconds(3))
127128
.build())
128129
)).join();
129130
Assert.assertTrue("Alter table changefeed 2 " + alterStatus, alterStatus.isSuccess());
@@ -142,8 +143,9 @@ public void tableChangefeedsTest() {
142143
Assert.assertEquals(Changefeed.Format.JSON, ch1.getFormat());
143144
Assert.assertEquals(ChangefeedDescription.State.ENABLED, ch1.getState());
144145
Assert.assertFalse(ch1.hasVirtualTimestamps());
146+
Assert.assertEquals(ch1.getResolvedTimestampsInterval(), Duration.ZERO); // zero is default when not specified
145147
Assert.assertEquals(
146-
"Changefeed['change1']{state=ENABLED, format=JSON, mode=KEYS_ONLY, virtual timestamps=false}",
148+
"Changefeed['change1']{state=ENABLED, format=JSON, mode=KEYS_ONLY, virtual timestamps=false, resolved timestamps=PT0S}",
147149
ch1.toString()
148150
);
149151

@@ -152,6 +154,7 @@ public void tableChangefeedsTest() {
152154
Assert.assertEquals(Changefeed.Mode.NEW_AND_OLD_IMAGES, ch2.getMode());
153155
Assert.assertEquals(Changefeed.Format.JSON, ch2.getFormat());
154156
Assert.assertTrue(ch2.hasVirtualTimestamps());
157+
Assert.assertEquals(ch2.getResolvedTimestampsInterval(), Duration.ofSeconds(3));
155158

156159
// State may be flaky
157160
Assert.assertTrue(ch2.getState() == ChangefeedDescription.State.INITIAL_SCAN ||

0 commit comments

Comments
 (0)