Skip to content

Commit 5480b2b

Browse files
committed
GH-3055: Fix DirectMLContainer for taskScheduler.shutdown()
Fixes: #3055 Issue link: #3057 The internal `DirectMessageListenerContainer.taskScheduler` is not destroyed when application context is closed. The current `doStop()` implementation is only called from `Lifecycle.stop()`. However, the application context calls the `SmartLifecycle.stop(Runnable)`. That one, in turn, in the `AbstractMessageListenerContainer` calls a `shutdown()`, which does not clean up `taskScheduler` in the `DirectMessageListenerContainer` implementation. In fact, this `shutdown()` is called from other volatile places in the `DirectMessageListenerContainer`, where assumption is that `taskScheduler` is active. Therefore, we cannot move `doStop()` extension into the `actualShutDown()` implementation. * Extract `cleanUpTaskScheduler()` method in the `DirectMessageListenerContainer` and call it from existing `doStop()`, from overridden `stop(Runnable)` and `destroy()` **Auto-cherry-pick to `3.1.x`**
1 parent bf53ac2 commit 5480b2b

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -425,14 +425,24 @@ protected void doStart() {
425425
}
426426

427427
@Override
428-
protected void doStop() {
429-
super.doStop();
428+
public void stop(Runnable callback) {
429+
super.stop(callback);
430+
cleanUpTaskScheduler();
431+
}
432+
433+
private void cleanUpTaskScheduler() {
430434
if (!this.taskSchedulerSet && this.taskScheduler != null) {
431435
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
432436
this.taskScheduler = null;
433437
}
434438
}
435439

440+
@Override
441+
protected void doStop() {
442+
super.doStop();
443+
cleanUpTaskScheduler();
444+
}
445+
436446
protected void actualStart() {
437447
this.aborted = false;
438448
this.hasStopped = false;
@@ -975,6 +985,12 @@ protected void consumerRemoved(SimpleConsumer consumer) {
975985
// default empty
976986
}
977987

988+
@Override
989+
public void destroy() {
990+
super.destroy();
991+
cleanUpTaskScheduler();
992+
}
993+
978994
/**
979995
* The consumer object.
980996
*/

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java

+23
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Map;
2020
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.ScheduledExecutorService;
2122
import java.util.concurrent.TimeUnit;
2223

2324
import com.rabbitmq.client.AMQP.BasicProperties;
@@ -139,4 +140,26 @@ private void consumersCorrectlyCancelledOnShutdown(AbstractMessageListenerContai
139140
}
140141
}
141142

143+
@Test
144+
void directMessageListenerContainerShutdownsItsSchedulerOnStopWithCallback() {
145+
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
146+
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
147+
container.setConnectionFactory(cf);
148+
container.setQueueNames("test.shutdown");
149+
container.setMessageListener(m -> {
150+
});
151+
152+
container.start();
153+
154+
ScheduledExecutorService scheduledExecutorService =
155+
TestUtils.getPropertyValue(container, "taskScheduler.scheduledExecutor", ScheduledExecutorService.class);
156+
157+
container.stop(() -> {
158+
});
159+
160+
cf.destroy();
161+
162+
assertThat(scheduledExecutorService.isShutdown()).isTrue();
163+
}
164+
142165
}

0 commit comments

Comments
 (0)