Skip to content

Commit ff076b6

Browse files
spring-projectsGH-3902: Add Kotlin Coroutines Support (spring-projects#3905)
* spring-projectsGH-3902: Add Kotlin Coroutines Support Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply * Fix `GroovySplitterTests` for the current code base * Add `kotlinx.coroutines.flow.Flow` support The `Flow` is essentially a multi-value reactive `Publisher`, so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono` which we already support as reply types * Add docs for `Kotlin Coroutines` Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file * Fix missed link to `reactive-streams.adoc` from the `index-single.adoc` * Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler` * Add suspend functions support for Messaging Gateway * Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono` * Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return * Convert `Mono` to the `Continuation` resuming in the end of gateway call * Document `suspend fun` for `@MessagingGateway` * * Make `async` implicitly only for `suspend fun` * * Remove unused imports * * Verify sync and async `Flow` processing * Mention default sync behavior in the docs * * Improve reflection in the `CoroutinesUtils` * Fix language in docs Co-authored-by: Gary Russell <[email protected]> * * Rebase and revert blank lines around `include` in docs Co-authored-by: Gary Russell <[email protected]>
1 parent cfeaeca commit ff076b6

22 files changed

+612
-231
lines changed

build.gradle

+3-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ ext {
8484
junit4Version = '4.13.2'
8585
junitJupiterVersion = '5.9.0'
8686
jythonVersion = '2.7.3'
87+
kotlinCoroutinesVersion = '1.6.4'
8788
kryoVersion = '5.3.0'
8889
lettuceVersion = '6.2.0.RELEASE'
8990
log4jVersion = '2.19.0'
@@ -168,6 +169,7 @@ allprojects {
168169
mavenBom "org.apache.camel:camel-bom:$camelVersion"
169170
mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"
170171
mavenBom "org.apache.groovy:groovy-bom:$groovyVersion"
172+
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:$kotlinCoroutinesVersion"
171173
}
172174

173175
}
@@ -541,7 +543,7 @@ project('spring-integration-core') {
541543
}
542544
optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion"
543545
optionalApi "org.apache.avro:avro:$avroVersion"
544-
optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
546+
optionalApi 'org.jetbrains.kotlinx:kotlinx-coroutines-reactor'
545547

546548
testImplementation "org.aspectj:aspectjweaver:$aspectjVersion"
547549
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"

spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.integration.context.IntegrationContextUtils;
4646
import org.springframework.integration.context.IntegrationProperties;
4747
import org.springframework.integration.handler.LoggingHandler;
48+
import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory;
4849
import org.springframework.integration.json.JsonPathUtils;
4950
import org.springframework.integration.support.DefaultMessageBuilderFactory;
5051
import org.springframework.integration.support.SmartLifecycleRoleController;
@@ -462,10 +463,10 @@ private void registerListMessageHandlerMethodFactory() {
462463
}
463464

464465
private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefinition(boolean listCapable) {
465-
return BeanDefinitionBuilder.genericBeanDefinition(MessageHandlerMethodFactoryCreatingFactoryBean.class,
466-
() -> new MessageHandlerMethodFactoryCreatingFactoryBean(listCapable))
466+
return BeanDefinitionBuilder.genericBeanDefinition(IntegrationMessageHandlerMethodFactory.class,
467+
() -> new IntegrationMessageHandlerMethodFactory(listCapable))
467468
.addConstructorArgValue(listCapable)
468-
.addPropertyReference("argumentResolverMessageConverter",
469+
.addPropertyReference("messageConverter",
469470
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME);
470471
}
471472

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMethodInboundMessageMapper.java

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4545
import org.springframework.integration.support.DefaultMessageBuilderFactory;
4646
import org.springframework.integration.support.MessageBuilderFactory;
47+
import org.springframework.integration.util.CoroutinesUtils;
4748
import org.springframework.integration.util.MessagingAnnotationUtils;
4849
import org.springframework.lang.Nullable;
4950
import org.springframework.messaging.Message;
@@ -289,6 +290,9 @@ public Message<?> toMessage(MethodArgsHolder holder, @Nullable Map<String, Objec
289290
for (int i = 0; i < GatewayMethodInboundMessageMapper.this.parameterList.size(); i++) {
290291
Object argumentValue = arguments[i];
291292
MethodParameter methodParameter = GatewayMethodInboundMessageMapper.this.parameterList.get(i);
293+
if (CoroutinesUtils.isContinuationType(methodParameter.getParameterType())) {
294+
continue;
295+
}
292296
Annotation annotation =
293297
MessagingAnnotationUtils.findMessagePartAnnotation(methodParameter.getParameterAnnotations(),
294298
false);

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

+45-15
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@
4646
import org.springframework.beans.factory.BeanFactory;
4747
import org.springframework.beans.factory.BeanInitializationException;
4848
import org.springframework.beans.factory.FactoryBean;
49+
import org.springframework.core.KotlinDetector;
4950
import org.springframework.core.MethodParameter;
5051
import org.springframework.core.ResolvableType;
52+
import org.springframework.core.convert.ConversionService;
5153
import org.springframework.core.task.AsyncTaskExecutor;
5254
import org.springframework.core.task.SimpleAsyncTaskExecutor;
5355
import org.springframework.core.task.support.TaskExecutorAdapter;
@@ -67,6 +69,7 @@
6769
import org.springframework.integration.support.management.IntegrationManagement;
6870
import org.springframework.integration.support.management.TrackableComponent;
6971
import org.springframework.integration.support.management.metrics.MetricsCaptor;
72+
import org.springframework.integration.util.CoroutinesUtils;
7073
import org.springframework.lang.Nullable;
7174
import org.springframework.messaging.Message;
7275
import org.springframework.messaging.MessageChannel;
@@ -498,13 +501,14 @@ public Object getObject() {
498501
@Nullable
499502
@SuppressWarnings("deprecation")
500503
public Object invoke(final MethodInvocation invocation) throws Throwable { // NOSONAR
501-
final Class<?> returnType;
502-
MethodInvocationGateway gateway = this.gatewayMap.get(invocation.getMethod());
504+
Method method = invocation.getMethod();
505+
Class<?> returnType;
506+
MethodInvocationGateway gateway = this.gatewayMap.get(method);
503507
if (gateway != null) {
504508
returnType = gateway.returnType;
505509
}
506510
else {
507-
returnType = invocation.getMethod().getReturnType();
511+
returnType = method.getReturnType();
508512
}
509513
if (this.asyncExecutor != null && !Object.class.equals(returnType)) {
510514
Invoker invoker = new Invoker(invocation);
@@ -524,7 +528,7 @@ else if (Future.class.isAssignableFrom(returnType)) {
524528
+ returnType.getSimpleName());
525529
}
526530
}
527-
if (Mono.class.isAssignableFrom(returnType)) {
531+
if (Mono.class.isAssignableFrom(returnType) || KotlinDetector.isSuspendingFunction(method)) {
528532
return doInvoke(invocation, false);
529533
}
530534
else {
@@ -534,8 +538,7 @@ else if (Future.class.isAssignableFrom(returnType)) {
534538

535539
@Nullable
536540
protected Object doInvoke(MethodInvocation invocation, boolean runningOnCallerThread) throws Throwable { // NOSONAR
537-
Method method = invocation.getMethod();
538-
if (AopUtils.isToStringMethod(method)) {
541+
if (AopUtils.isToStringMethod(invocation.getMethod())) {
539542
return "gateway proxy for service interface [" + this.serviceInterface + "]";
540543
}
541544
try {
@@ -575,16 +578,29 @@ private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningO
575578
else {
576579
response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, !oneWay);
577580
}
578-
return response(gateway.returnType, shouldReturnMessage, response);
581+
582+
Object continuation = null;
583+
if (gateway.isSuspendingFunction) {
584+
for (Object argument : invocation.getArguments()) {
585+
if (argument != null && CoroutinesUtils.isContinuation(argument)) {
586+
continuation = argument;
587+
break;
588+
}
589+
}
590+
}
591+
592+
return response(gateway.returnType, shouldReturnMessage, response, continuation);
579593
}
580594

581595
@Nullable
582-
private Object response(Class<?> returnType, boolean shouldReturnMessage, @Nullable Object response) {
596+
private Object response(Class<?> returnType, boolean shouldReturnMessage,
597+
@Nullable Object response, @Nullable Object continuation) {
598+
583599
if (shouldReturnMessage) {
584600
return response;
585601
}
586602
else {
587-
return response != null ? convert(response, returnType) : null;
603+
return response != null ? convert(response, returnType, continuation) : null;
588604
}
589605
}
590606

@@ -627,7 +643,7 @@ private Object sendOrSendAndReceive(MethodInvocation invocation, MethodInvocatio
627643

628644
Object[] args = invocation.getArguments();
629645
if (shouldReply) {
630-
if (gateway.isMonoReturn) {
646+
if (gateway.isMonoReturn || gateway.isSuspendingFunction) {
631647
Mono<Message<?>> messageMono = gateway.sendAndReceiveMessageReactive(args);
632648
if (!shouldReturnMessage) {
633649
return messageMono.map(Message::getPayload);
@@ -641,7 +657,7 @@ private Object sendOrSendAndReceive(MethodInvocation invocation, MethodInvocatio
641657
}
642658
}
643659
else {
644-
if (gateway.isMonoReturn) {
660+
if (gateway.isMonoReturn || gateway.isSuspendingFunction) {
645661
return Mono.fromRunnable(() -> gateway.send(args));
646662
}
647663
else {
@@ -1013,17 +1029,28 @@ protected void doStop() {
10131029
this.gatewayMap.values().forEach(MethodInvocationGateway::stop);
10141030
}
10151031

1016-
@SuppressWarnings("unchecked")
10171032
@Nullable
1018-
private <T> T convert(Object source, Class<T> expectedReturnType) {
1033+
@SuppressWarnings("unchecked")
1034+
private <T> T convert(Object source, Class<T> expectedReturnType, @Nullable Object continuation) {
1035+
if (continuation != null) {
1036+
return CoroutinesUtils.monoAwaitSingleOrNull((Mono<T>) source, continuation);
1037+
}
10191038
if (Future.class.isAssignableFrom(expectedReturnType)) {
10201039
return (T) source;
10211040
}
10221041
if (Mono.class.isAssignableFrom(expectedReturnType)) {
10231042
return (T) source;
10241043
}
1025-
if (getConversionService() != null) {
1026-
return getConversionService().convert(source, expectedReturnType);
1044+
1045+
1046+
return doConvert(source, expectedReturnType);
1047+
}
1048+
1049+
@Nullable
1050+
private <T> T doConvert(Object source, Class<T> expectedReturnType) {
1051+
ConversionService conversionService = getConversionService();
1052+
if (conversionService != null) {
1053+
return conversionService.convert(source, expectedReturnType);
10271054
}
10281055
else {
10291056
return this.typeConverter.convertIfNecessary(source, expectedReturnType);
@@ -1050,6 +1077,8 @@ private static final class MethodInvocationGateway extends MessagingGatewaySuppo
10501077

10511078
private boolean pollable;
10521079

1080+
private boolean isSuspendingFunction;
1081+
10531082
MethodInvocationGateway(GatewayMethodInboundMessageMapper messageMapper) {
10541083
setRequestMapper(messageMapper);
10551084
}
@@ -1088,6 +1117,7 @@ void setupReturnType(Class<?> serviceInterface, Method method) {
10881117
this.expectMessage = hasReturnParameterizedWithMessage(resolvableType);
10891118
}
10901119
this.isVoidReturn = isVoidReturnType(resolvableType);
1120+
this.isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
10911121
}
10921122

10931123
private boolean hasReturnParameterizedWithMessage(ResolvableType resolvableType) {

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

+68-35
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import org.reactivestreams.Publisher;
3232

3333
import org.springframework.beans.factory.BeanFactory;
34+
import org.springframework.beans.factory.BeanFactoryAware;
3435
import org.springframework.core.ReactiveAdapter;
3536
import org.springframework.core.ReactiveAdapterRegistry;
37+
import org.springframework.core.convert.ConversionService;
3638
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3739
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
3840
import org.springframework.integration.context.IntegrationContextUtils;
@@ -188,8 +190,7 @@ public Collection<String> getNotPropagatedHeaders() {
188190
/**
189191
* Add header patterns ("xxx*", "*xxx", "*xxx*" or "xxx*yyy")
190192
* that will NOT be copied from the inbound message if
191-
* {@link #shouldCopyRequestHeaders()} is true, instead of overwriting the existing
192-
* set.
193+
* {@link #shouldCopyRequestHeaders()} is true, instead of overwriting the existing set.
193194
* @param headers the headers to not propagate from the inbound message.
194195
* @since 4.3.10
195196
* @see #setNotPropagatedHeaders(String...)
@@ -308,28 +309,68 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
308309
replyChannel = getOutputChannel();
309310
}
310311

311-
if (this.async && (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>
312-
|| reply instanceof CompletableFuture<?>
313-
|| reply instanceof Publisher<?>)) {
312+
ReactiveAdapter reactiveAdapter = null;
314313

315-
if (reply instanceof Publisher<?> &&
316-
replyChannel instanceof ReactiveStreamsSubscribableChannel) {
314+
if (this.async &&
315+
(reply instanceof org.springframework.util.concurrent.ListenableFuture<?>
316+
|| reply instanceof CompletableFuture<?>
317+
|| (reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply)) != null)) {
317318

318-
((ReactiveStreamsSubscribableChannel) replyChannel)
319+
if (replyChannel instanceof ReactiveStreamsSubscribableChannel reactiveStreamsSubscribableChannel) {
320+
Publisher<?> reactiveReply = toPublisherReply(reply, reactiveAdapter);
321+
reactiveStreamsSubscribableChannel
319322
.subscribeTo(
320-
Flux.from((Publisher<?>) reply)
323+
Flux.from(reactiveReply)
321324
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
322325
.map(result -> createOutputMessage(result, requestHeaders)));
323326
}
324327
else {
325-
asyncNonReactiveReply(requestMessage, reply, replyChannel);
328+
CompletableFuture<?> futureReply = toFutureReply(reply, reactiveAdapter);
329+
futureReply.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
326330
}
327331
}
328332
else {
329333
sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
330334
}
331335
}
332336

337+
private static Publisher<?> toPublisherReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
338+
if (reactiveAdapter != null) {
339+
return reactiveAdapter.toPublisher(reply);
340+
}
341+
else {
342+
return Mono.fromFuture(toCompletableFuture(reply));
343+
}
344+
}
345+
346+
private static CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
347+
if (reactiveAdapter != null) {
348+
Mono<?> reactiveReply;
349+
Publisher<?> publisher = reactiveAdapter.toPublisher(reply);
350+
if (reactiveAdapter.isMultiValue()) {
351+
reactiveReply = Mono.just(publisher);
352+
}
353+
else {
354+
reactiveReply = Mono.from(publisher);
355+
}
356+
357+
return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();
358+
}
359+
else {
360+
return toCompletableFuture(reply);
361+
}
362+
}
363+
364+
@SuppressWarnings("deprecation")
365+
private static CompletableFuture<?> toCompletableFuture(Object reply) {
366+
if (reply instanceof CompletableFuture<?>) {
367+
return (CompletableFuture<?>) reply;
368+
}
369+
else {
370+
return ((org.springframework.util.concurrent.ListenableFuture<?>) reply).completable();
371+
}
372+
}
373+
333374
private AbstractIntegrationMessageBuilder<?> addRoutingSlipHeader(Object reply, List<?> routingSlip,
334375
AtomicInteger routingSlipIndex) {
335376

@@ -352,30 +393,6 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
352393
return builder;
353394
}
354395

355-
@SuppressWarnings("deprecation")
356-
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nullable Object replyChannel) {
357-
CompletableFuture<?> future;
358-
if (reply instanceof CompletableFuture<?>) {
359-
future = (CompletableFuture<?>) reply;
360-
}
361-
else if (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>) {
362-
future = ((org.springframework.util.concurrent.ListenableFuture<?>) reply).completable();
363-
}
364-
else {
365-
Mono<?> reactiveReply;
366-
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply);
367-
if (adapter != null && adapter.isMultiValue()) {
368-
reactiveReply = Mono.just(reply);
369-
}
370-
else {
371-
reactiveReply = Mono.from((Publisher<?>) reply);
372-
}
373-
374-
future = reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();
375-
}
376-
future.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
377-
}
378-
379396
private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
380397
AtomicInteger routingSlipIndex) {
381398

@@ -444,7 +461,7 @@ else if (output instanceof AbstractIntegrationMessageBuilder) {
444461
* <code>null</code>, and it must be an instance of either String or {@link MessageChannel}.
445462
* @param output the output object to send
446463
* @param replyChannelArg the 'replyChannel' value from the original request
447-
* @param useArgChannel - use the replyChannel argument (must not be null), not
464+
* @param useArgChannel use the replyChannel argument (must not be null), not
448465
* the configured output channel.
449466
*/
450467
protected void sendOutput(Object output, @Nullable Object replyChannelArg, boolean useArgChannel) {
@@ -522,6 +539,22 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
522539
return errorChannel;
523540
}
524541

542+
protected void setupMessageProcessor(MessageProcessor<?> processor) {
543+
if (processor instanceof AbstractMessageProcessor<?> abstractMessageProcessor) {
544+
ConversionService conversionService = getConversionService();
545+
if (conversionService != null) {
546+
abstractMessageProcessor.setConversionService(conversionService);
547+
}
548+
}
549+
BeanFactory beanFactory = getBeanFactory();
550+
if (processor instanceof BeanFactoryAware beanFactoryAware && beanFactory != null) {
551+
beanFactoryAware.setBeanFactory(beanFactory);
552+
}
553+
if (!this.async && processor instanceof MethodInvokingMessageProcessor<?> methodInvokingMessageProcessor) {
554+
this.async = methodInvokingMessageProcessor.isAsync();
555+
}
556+
}
557+
525558
private final class ReplyFutureCallback implements BiConsumer<Object, Throwable> {
526559

527560
private final Message<?> requestMessage;

spring-integration-core/src/main/java/org/springframework/integration/handler/MethodInvokingMessageProcessor.java

+4
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public boolean isRunning() {
102102
return this.delegate.isRunning();
103103
}
104104

105+
public boolean isAsync() {
106+
return this.delegate.isAsync();
107+
}
108+
105109
@Override
106110
@Nullable
107111
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)