Skip to content

Commit df842ba

Browse files
Introduction of RateLimitSemaphoreBackPressureHandler
1 parent 425b6c8 commit df842ba

File tree

2 files changed

+293
-1
lines changed

2 files changed

+293
-1
lines changed

spring-cloud-aws-sqs/pom.xml

+10-1
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
<parent>
66
<artifactId>spring-cloud-aws</artifactId>
77
<groupId>io.awspring.cloud</groupId>
8-
<version>3.3.0-SNAPSHOT</version>
8+
<version>3.3.0-RC1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

1212
<artifactId>spring-cloud-aws-sqs</artifactId>
1313
<name>Spring Cloud AWS SQS</name>
1414
<description>Spring Cloud AWS Simple Queue Service</description>
1515

16+
<properties>
17+
<apache-commons-lang3.version>3.17.0</apache-commons-lang3.version>
18+
</properties>
19+
1620
<dependencies>
1721
<dependency>
1822
<groupId>software.amazon.awssdk</groupId>
@@ -34,6 +38,11 @@
3438
<groupId>org.springframework.retry</groupId>
3539
<artifactId>spring-retry</artifactId>
3640
</dependency>
41+
<dependency>
42+
<groupId>org.apache.commons</groupId>
43+
<artifactId>commons-lang3</artifactId>
44+
<version>${apache-commons-lang3.version}</version>
45+
</dependency>
3746
<dependency>
3847
<groupId>com.fasterxml.jackson.core</groupId>
3948
<artifactId>jackson-databind</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.apache.commons.lang3.concurrent.TimedSemaphore;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling backpressure.
30+
*
31+
* @author Jeroen Vandevelde
32+
* @see io.awspring.cloud.sqs.listener.source.PollingMessageSource
33+
* @since 3.0
34+
*/
35+
public class RateLimitSemaphoreBackPressureHandler
36+
implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
37+
38+
private static final Logger logger = LoggerFactory.getLogger(RateLimitSemaphoreBackPressureHandler.class);
39+
40+
private final Semaphore semaphore;
41+
private final TimedSemaphore timedSemaphore;
42+
43+
private final int batchSize;
44+
45+
private final int totalPermits;
46+
47+
private final Duration acquireTimeout;
48+
49+
private final BackPressureMode backPressureConfiguration;
50+
51+
private volatile CurrentThroughputMode currentThroughputMode;
52+
53+
private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false);
54+
55+
private String id;
56+
57+
private RateLimitSemaphoreBackPressureHandler(Builder builder) {
58+
this.batchSize = builder.batchSize;
59+
this.totalPermits = builder.totalPermits;
60+
this.acquireTimeout = builder.acquireTimeout;
61+
this.backPressureConfiguration = builder.backPressureMode;
62+
this.semaphore = new Semaphore(totalPermits);
63+
this.timedSemaphore = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
64+
this.currentThroughputMode = BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(backPressureConfiguration)
65+
? CurrentThroughputMode.HIGH
66+
: CurrentThroughputMode.LOW;
67+
logger.debug("SemaphoreBackPressureHandler created with configuration {} and {} total permits",
68+
backPressureConfiguration, totalPermits);
69+
}
70+
71+
public static Builder builder() {
72+
return new Builder();
73+
}
74+
75+
@Override
76+
public void setId(String id) {
77+
this.id = id;
78+
}
79+
80+
@Override
81+
public String getId() {
82+
return this.id;
83+
}
84+
85+
@Override
86+
public int request(int amount) throws InterruptedException {
87+
if (timedSemaphore.getAvailablePermits() >= amount) {
88+
for (int i = 0; i < amount; i++) {
89+
boolean acquired = timedSemaphore.tryAcquire();
90+
if (!acquired) {
91+
return i;
92+
}
93+
}
94+
return amount;
95+
}
96+
else if (timedSemaphore.getAvailablePermits() > 0) {
97+
for (int i = 0; i < timedSemaphore.getAvailablePermits(); i++) {
98+
boolean acquired = timedSemaphore.tryAcquire();
99+
if (!acquired) {
100+
return i;
101+
}
102+
}
103+
return amount;
104+
}
105+
else {
106+
return 0;
107+
}
108+
}
109+
110+
// @formatter:off
111+
@Override
112+
public int requestBatch() throws InterruptedException {
113+
return CurrentThroughputMode.LOW.equals(this.currentThroughputMode)
114+
? requestInLowThroughputMode()
115+
: requestInHighThroughputMode();
116+
}
117+
118+
private int requestInHighThroughputMode() throws InterruptedException {
119+
return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH)
120+
? this.batchSize
121+
: tryAcquirePartial();
122+
}
123+
// @formatter:on
124+
125+
private int tryAcquirePartial() throws InterruptedException {
126+
int availablePermits = this.timedSemaphore.getAvailablePermits();
127+
if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) {
128+
return 0;
129+
}
130+
int permitsToRequest = Math.min(availablePermits, this.batchSize);
131+
CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode;
132+
logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}",
133+
permitsToRequest, availablePermits, this.id, currentThroughputModeNow);
134+
boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow);
135+
return hasAcquiredPartial ? permitsToRequest : 0;
136+
}
137+
138+
private int requestInLowThroughputMode() throws InterruptedException {
139+
// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
140+
// so no actual concurrency
141+
logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
142+
this.timedSemaphore.getAvailablePermits());
143+
boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW);
144+
if (hasAcquired) {
145+
logger.debug("Acquired full permits for {}. Permits left: {}", this.id,
146+
this.timedSemaphore.getAvailablePermits());
147+
// We've acquired all permits - there's no other process currently processing messages
148+
if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
149+
logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
150+
this.timedSemaphore.getAvailablePermits());
151+
}
152+
return this.batchSize;
153+
}
154+
else {
155+
return 0;
156+
}
157+
}
158+
159+
private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputModeNow) throws InterruptedException {
160+
logger.trace("Acquiring {} permits for {} in TM {}", amount, this.id, this.currentThroughputMode);
161+
162+
for (int i = 0; i < amount; i++) {
163+
if (!this.timedSemaphore.tryAcquire()) {
164+
logger.trace("Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}",
165+
amount, this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow,
166+
this.timedSemaphore.getAvailablePermits());
167+
return false;
168+
}
169+
}
170+
171+
logger.trace("{} permits acquired for {} in TM {}. Permits left: {}", amount, this.id, currentThroughputModeNow,
172+
this.timedSemaphore.getAvailablePermits());
173+
return true;
174+
}
175+
176+
@Override
177+
public void releaseBatch() {
178+
maybeSwitchToLowThroughputMode();
179+
// TODO is this still needed with a timed one
180+
// int permitsToRelease = getPermitsToRelease(this.batchSize);
181+
// this.semaphore.release(permitsToRelease);
182+
// logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
183+
// this.timedSemaphore.getAvailablePermits());
184+
}
185+
186+
@Override
187+
public int getBatchSize() {
188+
return this.batchSize;
189+
}
190+
191+
private void maybeSwitchToLowThroughputMode() {
192+
if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration)
193+
&& CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) {
194+
logger.debug("Entire batch of permits released for {}, setting TM LOW. Permits left: {}", this.id,
195+
this.timedSemaphore.getAvailablePermits());
196+
this.currentThroughputMode = CurrentThroughputMode.LOW;
197+
}
198+
}
199+
200+
@Override
201+
public void release(int amount) {
202+
// logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
203+
// this.timedSemaphore.getAvailablePermits());
204+
// maybeSwitchToHighThroughputMode(amount);
205+
// int permitsToRelease = getPermitsToRelease(amount);
206+
// this.semaphore.release(permitsToRelease);
207+
// logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
208+
// this.timedSemaphore.getAvailablePermits());
209+
210+
// TODO figure out how to deal with release capabilities
211+
}
212+
213+
private int getPermitsToRelease(int amount) {
214+
return this.hasAcquiredFullPermits.compareAndSet(true, false)
215+
// The first process that gets here should release all permits except for inflight messages
216+
// We can have only one batch of messages at this point since we have all permits
217+
? this.totalPermits - (this.batchSize - amount)
218+
: amount;
219+
}
220+
221+
private void maybeSwitchToHighThroughputMode(int amount) {
222+
if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
223+
logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
224+
this.timedSemaphore.getAvailablePermits());
225+
this.currentThroughputMode = CurrentThroughputMode.HIGH;
226+
}
227+
}
228+
229+
@Override
230+
public boolean drain(Duration timeout) {
231+
// TODO figure out a different way to make certain all the messages have been processed
232+
timedSemaphore.shutdown();
233+
return true;
234+
}
235+
236+
private enum CurrentThroughputMode {
237+
238+
HIGH,
239+
240+
LOW;
241+
242+
}
243+
244+
public static class Builder {
245+
246+
private int batchSize;
247+
248+
private int totalPermits;
249+
250+
private Duration acquireTimeout;
251+
252+
private BackPressureMode backPressureMode;
253+
254+
public Builder batchSize(int batchSize) {
255+
this.batchSize = batchSize;
256+
return this;
257+
}
258+
259+
public Builder totalPermits(int totalPermits) {
260+
this.totalPermits = totalPermits;
261+
return this;
262+
}
263+
264+
public Builder acquireTimeout(Duration acquireTimeout) {
265+
this.acquireTimeout = acquireTimeout;
266+
return this;
267+
}
268+
269+
public Builder throughputConfiguration(BackPressureMode backPressureConfiguration) {
270+
this.backPressureMode = backPressureConfiguration;
271+
return this;
272+
}
273+
274+
public RateLimitSemaphoreBackPressureHandler build() {
275+
Assert.noNullElements(
276+
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode),
277+
"Missing configuration");
278+
return new RateLimitSemaphoreBackPressureHandler(this);
279+
}
280+
281+
}
282+
283+
}

0 commit comments

Comments
 (0)