Skip to content

Commit ef1e63a

Browse files
committed
Fix kafka offset from option
1 parent e3c50f6 commit ef1e63a

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public void run() {
290290
}
291291
}
292292
if (!isReplayThread) {
293-
kafkaSourceState.getTopicOffsetMap().get(record.topic()).put(record.partition(),
293+
kafkaSourceState.getTopicOffsetMap().get(record.topic()).putIfAbsent(record.partition(),
294294
record.offset());
295295
}
296296
if (endReplay(record)) {
@@ -300,7 +300,7 @@ public void run() {
300300
}
301301
} else {
302302
if (!isReplayThread) {
303-
kafkaSourceState.getTopicOffsetMap().get(record.topic()).put(record.partition(),
303+
kafkaSourceState.getTopicOffsetMap().get(record.topic()).putIfAbsent(record.partition(),
304304
record.offset());
305305
}
306306
if (metrics != null) {
@@ -351,7 +351,13 @@ public void run() {
351351
void seekToRequiredOffset() {}
352352

353353
boolean isRecordAfterStartOffset(ConsumerRecord record) {
354-
return true;
354+
Map<Integer, Long> partitionMap = kafkaSourceState.getTopicOffsetMap().get(record.topic());
355+
if (partitionMap == null) {
356+
return true;
357+
}
358+
359+
Long offsetThreshold = partitionMap.get(record.partition());
360+
return offsetThreshold == null || record.offset() > offsetThreshold;
355361
}
356362

357363
boolean endReplay(ConsumerRecord record) {

component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public StateFactory<KafkaSourceState> init(SourceEventListener sourceEventListen
314314
"true"));
315315
topicOffsetMapConfig = optionHolder.validateAndGetStaticValue(TOPIC_OFFSET_MAP, null);
316316
}
317-
partitions = (partitionList != null) ? partitionList.split(KafkaIOUtils.HEADER_SEPARATOR) : null;
317+
partitions = (partitionList != null) ? partitionList.split(KafkaIOUtils.HEADER_SEPARATOR) : new String[]{"0"};
318318
topics = topicList.split(KafkaIOUtils.HEADER_SEPARATOR);
319319
seqEnabled = optionHolder.validateAndGetStaticValue(SEQ_ENABLED, "false").equalsIgnoreCase("true");
320320
optionalConfigs = optionHolder.validateAndGetStaticValue(ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, null);

0 commit comments

Comments
 (0)