Skip to content

Commit bfde46a

Browse files
committed
GH-223: Fix errors in KinesisMDChA.getRecords()
Fixes #223 A `amazonKinesis.getRecords(getRecordsRequest).join()` throws `CompletionException` where the target reason is in a `cause` * Fix `KinesisMessageDrivenChannelAdapter.getRecords()` to `catch (CompletionException)` and then process its `cause` respectively * Upgrade to Spring Cloud AWS `3.0.0` GA * Upgrade other deps to the latest versions
1 parent bc9a082 commit bfde46a

File tree

3 files changed

+39
-25
lines changed

3 files changed

+39
-25
lines changed

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ repositories {
3232
ext {
3333
assertjVersion = '3.24.2'
3434
awaitilityVersion = '4.2.0'
35-
awsSdkVersion = '2.20.51'
35+
awsSdkVersion = '2.20.57'
3636
jacksonVersion = '2.15.0'
3737
junitVersion = '5.9.2'
3838
log4jVersion = '2.20.0'
3939
servletApiVersion = '6.0.0'
40-
springCloudAwsVersion = '3.0.0-RC2'
40+
springCloudAwsVersion = '3.0.0'
4141
springIntegrationVersion = '6.0.5'
4242
kinesisClientVersion = '2.4.8'
43-
kinesisProducerVersion = '0.15.6'
43+
kinesisProducerVersion = '0.15.7'
4444
testcontainersVersion = '1.18.0'
4545

4646
idPrefix = 'aws'

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,26 +1195,32 @@ private GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) {
11951195
try {
11961196
return KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest).join();
11971197
}
1198-
catch (ExpiredIteratorException e) {
1199-
// Iterator expired, but this does not mean that shard no longer contains
1200-
// records.
1201-
// Let's acquire iterator again (using checkpointer for iterator start
1202-
// sequence number).
1203-
logger.info(() ->
1204-
"Shard iterator for ["
1205-
+ ShardConsumer.this
1206-
+ "] expired.\n"
1207-
+ "A new one will be started from the check pointed sequence number.");
1208-
this.state = ConsumerState.EXPIRED;
1209-
}
1210-
catch (ProvisionedThroughputExceededException ex) {
1211-
logger.warn(() ->
1212-
"GetRecords request throttled for ["
1213-
+ ShardConsumer.this
1214-
+ "] with the reason: "
1215-
+ ex.getMessage());
1216-
// We are throttled, so let's sleep
1217-
prepareSleepState();
1198+
catch (CompletionException ex) {
1199+
Throwable cause = ex.getCause();
1200+
if (cause instanceof ExpiredIteratorException) {
1201+
// Iterator expired, but this does not mean that shard no longer contains
1202+
// records.
1203+
// Let's acquire iterator again (using checkpointer for iterator start
1204+
// sequence number).
1205+
logger.info(() ->
1206+
"Shard iterator for ["
1207+
+ ShardConsumer.this
1208+
+ "] expired.\n"
1209+
+ "A new one will be started from the check pointed sequence number.");
1210+
this.state = ConsumerState.EXPIRED;
1211+
}
1212+
else if (cause instanceof ProvisionedThroughputExceededException) {
1213+
logger.warn(() ->
1214+
"GetRecords request throttled for ["
1215+
+ ShardConsumer.this
1216+
+ "] with the reason: "
1217+
+ cause.getMessage());
1218+
// We are throttled, so let's sleep
1219+
prepareSleepState();
1220+
}
1221+
else {
1222+
throw ex;
1223+
}
12181224
}
12191225

12201226
return null;

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,16 @@ public KinesisAsyncClient amazonKinesis() {
335335
.shardIterator(shard1Iterator1)
336336
.limit(25)
337337
.build()))
338-
.willThrow(ProvisionedThroughputExceededException.builder().message("Iterator throttled").build())
339-
.willThrow(ExpiredIteratorException.builder().message("Iterator expired").build());
338+
.willReturn(
339+
CompletableFuture.failedFuture(
340+
ProvisionedThroughputExceededException.builder()
341+
.message("Iterator throttled")
342+
.build()))
343+
.willReturn(
344+
CompletableFuture.failedFuture(
345+
ExpiredIteratorException.builder()
346+
.message("Iterator expired")
347+
.build()));
340348

341349
SerializingConverter serializingConverter = new SerializingConverter();
342350

0 commit comments

Comments
 (0)