Skip to content

Commit d7a2dd5

Browse files
committed
Fix KinesisMDChA rewindIteratorOnError() for NPE
The error in the `KinesisMessageDrivenChannelAdapter.ShardConsumer.processTask()` might be also thrown just direct from the `amazonKinesis.getRecords(getRecordsRequest)`. If it happens first time, the `ShardCheckpointer` is not initialized with sequence numbers. Therefore, a condition `highestSequence.equals(lastCheckpoint)` may lead to NPE. * Rework the logic in the `KinesisMessageDrivenChannelAdapter.ShardConsumer.rewindIteratorOnError()` to deal with a `null` for `this.checkpointer.getHighestSequence()` and reuse the current `shardIterator` in the next request if no any commits happened. * Remove `ShardCheckpointer.firstSequenceInBatch` since this is exactly a meaning of `shardIterator` representation. Related to: #223
1 parent bfde46a commit d7a2dd5

File tree

2 files changed

+28
-34
lines changed

2 files changed

+28
-34
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import org.springframework.context.ApplicationEventPublisherAware;
6666
import org.springframework.core.AttributeAccessor;
6767
import org.springframework.core.convert.converter.Converter;
68-
import org.springframework.core.log.LogMessage;
6968
import org.springframework.core.serializer.support.DeserializingConverter;
7069
import org.springframework.integration.IntegrationMessageHeaderAccessor;
7170
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
@@ -1146,36 +1145,42 @@ private Runnable processTask() {
11461145
}
11471146

11481147
private void rewindIteratorOnError(Exception ex, GetRecordsResponse result) {
1149-
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
11501148
String lastCheckpoint = this.checkpointer.getLastCheckpointValue();
11511149
String highestSequence = this.checkpointer.getHighestSequence();
1152-
if (highestSequence.equals(lastCheckpoint)) {
1150+
1151+
if (highestSequence == null) {
1152+
// Haven't reached record process - reuse the current shard iterator.
1153+
logger.info(ex, "getRecords request has thrown exception. " +
1154+
"No checkpoints - re-request with the current shard iterator.");
1155+
}
1156+
else if (highestSequence.equals(lastCheckpoint)) {
11531157
logger.info(ex, "Record processor has thrown exception. " +
11541158
"Ignore since the highest sequence in batch was check-pointed.");
11551159
this.shardIterator = result.nextShardIterator();
1156-
return;
11571160
}
1158-
String newOffsetValue = lastCheckpoint;
1159-
if (lastCheckpoint != null) {
1160-
newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
1161+
else if (lastCheckpoint == null
1162+
|| new BigInteger(lastCheckpoint).compareTo(new BigInteger(this.shardIterator)) < 0) {
1163+
1164+
// No checkpoints for the shard - reuse the current shard iterator.
1165+
logger.info(ex, "Record processor has thrown exception. " +
1166+
"No checkpoints - re-request with the current shard iterator.");
11611167
}
11621168
else {
1163-
newOffsetValue = this.checkpointer.getFirstSequenceInBatch();
1164-
newOffset.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
1165-
}
1166-
1167-
logger.info(ex,
1168-
LogMessage.format("Record processor has thrown exception. " +
1169-
"Rewind shard iterator %s sequence number: %s",
1170-
(lastCheckpoint != null ? "after" : "at"), newOffsetValue));
1169+
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
1170+
newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
11711171

1172-
newOffset.setSequenceNumber(newOffsetValue);
1173-
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1174-
this.shardIterator =
1175-
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
1176-
.getShardIterator(shardIteratorRequest)
1177-
.join()
1178-
.shardIterator();
1172+
logger.info(ex, () ->
1173+
"Record processor has thrown exception. " +
1174+
"Rewind shard iterator after sequence number: " + lastCheckpoint);
1175+
1176+
newOffset.setSequenceNumber(lastCheckpoint);
1177+
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1178+
this.shardIterator =
1179+
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
1180+
.getShardIterator(shardIteratorRequest)
1181+
.join()
1182+
.shardIterator();
1183+
}
11791184
}
11801185

11811186
private void checkpointSwallowingProvisioningExceptions(String endingSequenceNumber) {
@@ -1235,7 +1240,6 @@ private void prepareSleepState() {
12351240
private void processRecords(List<Record> records) {
12361241
logger.trace(() -> "Processing records: " + records + " for [" + ShardConsumer.this + "]");
12371242

1238-
this.checkpointer.setFirstSequenceInBatch(records.get(0).sequenceNumber());
12391243
this.checkpointer.setHighestSequence(records.get(records.size() - 1).sequenceNumber());
12401244

12411245
if (ListenerMode.record.equals(KinesisMessageDrivenChannelAdapter.this.listenerMode)) {

src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ class ShardCheckpointer implements Checkpointer {
4343

4444
private final String key;
4545

46-
private volatile String firstSequenceInBatch;
47-
4846
private volatile String highestSequence;
4947

5048
private volatile String lastCheckpointValue;
@@ -88,19 +86,11 @@ public boolean checkpoint(String sequenceNumber) {
8886
return false;
8987
}
9088

91-
void setFirstSequenceInBatch(String firstSequenceInBatch) {
92-
this.firstSequenceInBatch = firstSequenceInBatch;
93-
}
94-
95-
@Nullable
96-
String getFirstSequenceInBatch() {
97-
return this.firstSequenceInBatch;
98-
}
99-
10089
void setHighestSequence(String highestSequence) {
10190
this.highestSequence = highestSequence;
10291
}
10392

93+
@Nullable
10494
String getHighestSequence() {
10595
return this.highestSequence;
10696
}

0 commit comments

Comments
 (0)