Skip to content

Commit e4a72ce

Browse files
committed
Some tweaks for DynamoDB locks heartbeat
Related to spring-cloud/spring-cloud-stream-binder-aws-kinesis#180 * Treat non-positive `DynamoDbLockRegistry.heartbeatPeriod` as no heartbeat. This way locks renewal is a responsibility of the target `DynamoDbLockRegistry` consumer. For example, the `KinesisMessageDrivenChannelAdapter` does call `tryLock()` in a loop for locks on shards it is consuming at the moment * Increase locks loop sleep timeout in the `KinesisMessageDrivenChannelAdapter` to one second to avoid many requests to DynamoDB **Cherry-pick to `2.5.x`** # Conflicts: # src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java
1 parent 4f516a3 commit e4a72ce

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1561,7 +1561,7 @@ public void run() {
15611561
}
15621562
}
15631563

1564-
sleep(250,
1564+
sleep(1000,
15651565
new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"),
15661566
true);
15671567
}

src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.Executor;
29-
import java.util.concurrent.ExecutorService;
3029
import java.util.concurrent.ThreadFactory;
3130
import java.util.concurrent.TimeUnit;
3231
import java.util.concurrent.locks.Condition;
@@ -138,12 +137,6 @@ public class DynamoDbLockRegistry implements ExpirableLockRegistry, Initializing
138137

139138
private long heartbeatPeriod = 5L;
140139

141-
/**
142-
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
143-
* thus should not be shutdown when {@link #destroy()} is called.
144-
*/
145-
private boolean executorExplicitlySet;
146-
147140
private volatile boolean initialized;
148141

149142
public DynamoDbLockRegistry(AmazonDynamoDB dynamoDB) {
@@ -205,6 +198,11 @@ public void setLeaseDuration(long leaseDuration) {
205198
this.leaseDuration = leaseDuration;
206199
}
207200

201+
/**
202+
* Specify a period in milliseconds how often send locks renewal requests called heartbeat.
203+
* When the value is less than or equal to {@code 0}, the heartbeat is disabled.
204+
* @param heartbeatPeriod the heartbeat period for background thread to renew locks in DB
205+
*/
208206
public void setHeartbeatPeriod(long heartbeatPeriod) {
209207
this.heartbeatPeriod = heartbeatPeriod;
210208
}
@@ -228,7 +226,9 @@ public void afterPropertiesSet() {
228226
if (!this.dynamoDBLockClientExplicitlySet) {
229227
AmazonDynamoDBLockClientOptions dynamoDBLockClientOptions = AmazonDynamoDBLockClientOptions
230228
.builder(this.dynamoDB, this.tableName).withPartitionKeyName(this.partitionKey)
231-
.withSortKeyName(this.sortKeyName).withHeartbeatPeriod(this.heartbeatPeriod)
229+
.withSortKeyName(this.sortKeyName)
230+
.withCreateHeartbeatBackgroundThread(this.heartbeatPeriod > 0)
231+
.withHeartbeatPeriod(this.heartbeatPeriod)
232232
.withLeaseDuration(this.leaseDuration).build();
233233

234234
this.dynamoDBLockClient = new AmazonDynamoDBLockClient(dynamoDBLockClientOptions);

0 commit comments

Comments
 (0)