Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit fbb8d17

Browse files
syhilyzouyunhe
and
zouyunhe
authored
feat: support start consume from timestamp (#466)
Co-authored-by: zouyunhe <[email protected]>
1 parent e07d89d commit fbb8d17

File tree

8 files changed

+73
-12
lines changed

8 files changed

+73
-12
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
@Slf4j
103103
public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T>
104104
implements ResultTypeQueryable<T>, CheckpointListener, CheckpointedFunction {
105+
private static final long serialVersionUID = -6080350107046202906L;
105106

106107
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
107108
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
@@ -235,6 +236,8 @@ public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T>
235236

236237
private transient int numParallelTasks;
237238

239+
private long startupOffsetsTimestamp = -1L;
240+
238241
public FlinkPulsarSource(
239242
String adminUrl,
240243
ClientConfigurationData clientConf,
@@ -448,6 +451,24 @@ public FlinkPulsarSource<T> setStartFromSubscription(
448451
return this;
449452
}
450453

454+
public FlinkPulsarSource<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
455+
checkArgument(
456+
startupOffsetsTimestamp >= 0,
457+
"The provided value for the startup offsets timestamp is invalid.");
458+
459+
long currentTimestamp = System.currentTimeMillis();
460+
checkArgument(
461+
startupOffsetsTimestamp <= currentTimestamp,
462+
"Startup time[%s] must be before current time[%s].",
463+
startupOffsetsTimestamp,
464+
currentTimestamp);
465+
466+
this.startupMode = StartupMode.TIMESTAMP;
467+
this.startupOffsetsTimestamp = startupOffsetsTimestamp;
468+
this.specificStartupOffsets = null;
469+
return this;
470+
}
471+
451472
// ------------------------------------------------------------------------
452473
// Work methods
453474
// ------------------------------------------------------------------------
@@ -631,9 +652,6 @@ protected PulsarFetcher<T> createFetcher(
631652
Set<TopicRange> excludeStartMessageIds)
632653
throws Exception {
633654

634-
// readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY,
635-
// getSubscriptionName());
636-
637655
return new PulsarFetcher<>(
638656
sourceContext,
639657
seedTopicsWithInitialOffsets,
@@ -650,7 +668,8 @@ protected PulsarFetcher<T> createFetcher(
650668
deserializer,
651669
metadataReader,
652670
streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP),
653-
useMetrics);
671+
useMetrics,
672+
startupOffsetsTimestamp);
654673
}
655674

656675
public void joinDiscoveryLoopThread() throws InterruptedException {
@@ -988,6 +1007,7 @@ public Map<TopicRange, MessageId> offsetForEachTopic(
9881007

9891008
switch (mode) {
9901009
case LATEST:
1010+
case TIMESTAMP:
9911011
return topics.stream().collect(Collectors.toMap(k -> k, k -> MessageId.latest));
9921012
case EARLIEST:
9931013
return topics.stream().collect(Collectors.toMap(k -> k, k -> MessageId.earliest));

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/config/StartupMode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,7 @@ public enum StartupMode {
2626

2727
SPECIFIC_OFFSETS,
2828

29-
EXTERNAL_SUBSCRIPTION
29+
EXTERNAL_SUBSCRIPTION,
30+
31+
TIMESTAMP
3032
}

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ public class PulsarFetcher<T> {
148148
/** The metric group which all metrics for the source should be registered to. */
149149
private final MetricGroup consumerMetricGroup;
150150

151+
private final long startConsumeTimestamp;
152+
151153
public PulsarFetcher(
152154
SourceContext<T> sourceContext,
153155
Map<TopicRange, MessageId> seedTopicsWithInitialOffsets,
@@ -180,7 +182,8 @@ public PulsarFetcher(
180182
deserializer,
181183
metadataReader,
182184
consumerMetricGroup,
183-
useMetrics);
185+
useMetrics,
186+
-1);
184187
}
185188

186189
public PulsarFetcher(
@@ -199,7 +202,8 @@ public PulsarFetcher(
199202
PulsarDeserializationSchema<T> deserializer,
200203
PulsarMetadataReader metadataReader,
201204
MetricGroup consumerMetricGroup,
202-
boolean useMetrics)
205+
boolean useMetrics,
206+
long startConsumeTimestamp)
203207
throws Exception {
204208

205209
this.sourceContext = sourceContext;
@@ -272,6 +276,8 @@ public PulsarFetcher(
272276

273277
periodicEmitter.start();
274278
}
279+
280+
this.startConsumeTimestamp = startConsumeTimestamp;
275281
}
276282

277283
public void runFetchLoop() throws Exception {
@@ -567,7 +573,8 @@ protected ReaderThread<T> createReaderThread(
567573
exceptionProxy,
568574
failOnDataLoss,
569575
useEarliestWhenDataLoss,
570-
excludeStartMessageIds.contains(state.getTopicRange()));
576+
excludeStartMessageIds.contains(state.getTopicRange()),
577+
startConsumeTimestamp);
571578
}
572579

573580
/**

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,17 @@ public class ReaderThread<T> extends Thread {
6262

6363
protected volatile Reader<T> reader = null;
6464

65+
private final long startConsumeTimestamp;
66+
6567
public ReaderThread(
6668
PulsarFetcher<T> owner,
6769
PulsarTopicState state,
6870
ClientConfigurationData clientConf,
6971
Map<String, Object> readerConf,
7072
PulsarDeserializationSchema<T> deserializer,
7173
int pollTimeoutMs,
72-
ExceptionProxy exceptionProxy) {
74+
ExceptionProxy exceptionProxy,
75+
long startConsumeTimestamp) {
7376
this.owner = owner;
7477
this.state = state;
7578
this.clientConf = clientConf;
@@ -80,6 +83,7 @@ public ReaderThread(
8083

8184
this.topicRange = state.getTopicRange();
8285
this.startMessageId = state.getOffset();
86+
this.startConsumeTimestamp = startConsumeTimestamp;
8387
}
8488

8589
public ReaderThread(
@@ -92,8 +96,17 @@ public ReaderThread(
9296
ExceptionProxy exceptionProxy,
9397
boolean failOnDataLoss,
9498
boolean useEarliestWhenDataLoss,
95-
boolean excludeMessageId) {
96-
this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy);
99+
boolean excludeMessageId,
100+
long startConsumeTimestamp) {
101+
this(
102+
owner,
103+
state,
104+
clientConf,
105+
readerConf,
106+
deserializer,
107+
pollTimeoutMs,
108+
exceptionProxy,
109+
startConsumeTimestamp);
97110
this.failOnDataLoss = failOnDataLoss;
98111
this.useEarliestWhenDataLoss = useEarliestWhenDataLoss;
99112
this.excludeMessageId = excludeMessageId;
@@ -111,6 +124,10 @@ public void run() {
111124
handleTooLargeCursor();
112125
createActualReader();
113126
log.info("Starting to read {} with reader thread {}", topicRange, getName());
127+
if (startConsumeTimestamp >= 0) {
128+
log.info("Reader seek to timestamp:{}", startConsumeTimestamp);
129+
reader.seek(startConsumeTimestamp);
130+
}
114131

115132
while (running) {
116133
Message<T> message = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS);

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
6363
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_NAME;
6464
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_START_OFFSET;
65+
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
6566
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL;
6667
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_MESSAGE_ROUTER;
6768
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC;
@@ -262,6 +263,7 @@ public Set<ConfigOption<?>> optionalOptions() {
262263
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
263264
options.add(SCAN_STARTUP_SUB_NAME);
264265
options.add(SCAN_STARTUP_SUB_START_OFFSET);
266+
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
265267
options.add(GENERIC);
266268

267269
options.add(PARTITION_DISCOVERY_INTERVAL_MILLIS);

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
236236
case SPECIFIC_OFFSETS:
237237
source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
238238
break;
239+
case TIMESTAMP:
240+
source.setStartFromTimestamp(startupOptions.startupTimestampMills);
241+
break;
239242
case EXTERNAL_SUBSCRIPTION:
240243
MessageId subscriptionPosition = MessageId.latest;
241244
if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(
@@ -244,6 +247,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
244247
}
245248
source.setStartFromSubscription(
246249
startupOptions.externalSubscriptionName, subscriptionPosition);
250+
break;
247251
}
248252
return SourceFunctionProvider.of(source, false);
249253
}

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,16 @@ public class PulsarTableOptions {
255255
public static final String SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION =
256256
"external-subscription";
257257
public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
258+
public static final String SCAN_STARTUP_MODE_TIMESTAMP = "timestamp";
258259

259260
private static final Set<String> SCAN_STARTUP_MODE_ENUMS =
260261
new HashSet<>(
261262
Arrays.asList(
262263
SCAN_STARTUP_MODE_VALUE_EARLIEST,
263264
SCAN_STARTUP_MODE_VALUE_LATEST,
264265
SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION,
265-
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
266+
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
267+
SCAN_STARTUP_MODE_TIMESTAMP));
266268

267269
// Sink partitioner.
268270
public static final String SINK_MESSAGE_ROUTER_VALUE_KEY_HASH = "key-hash";
@@ -555,6 +557,11 @@ public static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
555557
options.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
556558
break;
557559

560+
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_TIMESTAMP:
561+
options.startupMode = StartupMode.TIMESTAMP;
562+
options.startupTimestampMills = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
563+
break;
564+
558565
default:
559566
throw new TableException(
560567
"Unsupported startup mode. Validator should have checked that.");
@@ -734,6 +741,7 @@ public static class StartupOptions {
734741
public Map<String, MessageId> specificOffsets = new HashMap<>();
735742
public String externalSubscriptionName;
736743
public String externalSubStartOffset;
744+
public long startupTimestampMills;
737745
}
738746

739747
/** Strategies to derive the data type of a value format by considering a key format. */

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/descriptors/PulsarValidator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class PulsarValidator extends ConnectorDescriptorValidator {
4444
public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest";
4545
public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
4646
public static final String CONNECTOR_STARTUP_MODE_VALUE_EXTERNAL_SUB = "external-subscription";
47+
public static final String CONNECTOR_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
4748
public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
4849
public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
4950
public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";

0 commit comments

Comments
 (0)