Starting with version 4.1, Spring Integration provides an implementation of the scatter-gather enterprise integration pattern. It is a compound endpoint for which the goal is to send a message to the recipients and aggregate the results. As noted in Enterprise Integration Patterns, it is a component for scenarios such as “best quote”, where we need to request information from several suppliers and decide which one provides us with the best term for the requested item.
Previously, the pattern could be configured by using discrete components. This enhancement brings more convenient configuration.
The ScatterGatherHandler
is a request-reply endpoint that combines a PublishSubscribeChannel
(or a RecipientListRouter
) and an AggregatingMessageHandler
.
The request message is sent to the scatter
channel, and the ScatterGatherHandler
waits for the reply that the aggregator sends to the outputChannel
.
The Scatter-Gather
pattern suggests two scenarios: “auction” and “distribution”.
In both cases, the aggregation
function is the same and provides all the options available for the AggregatingMessageHandler
.
(Actually, the ScatterGatherHandler
requires only an AggregatingMessageHandler
as a constructor argument.)
See Aggregator for more information.
The auction Scatter-Gather
variant uses “publish-subscribe” logic for the request message, where the “scatter” channel is a PublishSubscribeChannel
with apply-sequence="true"
.
However, this channel can be any MessageChannel
implementation (as is the case with the request-channel
in the ContentEnricher
— see Content Enricher).
However, in this case, you should create your own custom correlationStrategy
for the aggregation
function.
The distribution Scatter-Gather
variant is based on the RecipientListRouter
(see RecipientListRouter
) with all available options for the RecipientListRouter
.
This is the second ScatterGatherHandler
constructor argument.
If you want to rely on only the default correlationStrategy
for the recipient-list-router
and the aggregator
, you should specify apply-sequence="true"
.
Otherwise, you should supply a custom correlationStrategy
for the aggregator
.
Unlike the PublishSubscribeChannel
variant (the auction variant), having a recipient-list-router
selector
option lets filter target suppliers based on the message.
With apply-sequence="true"
, the default sequenceSize
is supplied, and the aggregator
can release the group correctly.
The distribution option is mutually exclusive with the auction option.
Note
|
The applySequence=true is required only for plain Java configuration based on the ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) constructor configuration since the framework cannot mutate externally provided components.
For convenience, the XML and Java DSL for Scatter-Gather sets applySequence to true starting with version 6.0.
|
For both the auction and the distribution variants, the request (scatter) message is enriched with the gatherResultChannel
header to wait for a reply message from the aggregator
.
By default, all suppliers should send their result to the replyChannel
header (usually by omitting the output-channel
from the ultimate endpoint).
However, the gatherChannel
option is also provided, letting suppliers send their reply to that channel for the aggregation.
The following example shows Java configuration for the bean definition for Scatter-Gather
:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
In the preceding example, we configure the RecipientListRouter
distributor
bean with applySequence="true"
and the list of recipient channels.
The next bean is for an AggregatingMessageHandler
.
Finally, we inject both those beans into the ScatterGatherHandler
bean definition and mark it as a @ServiceActivator
to wire the scatter-gather component into the integration flow.
The following example shows how to configure the <scatter-gather>
endpoint by using the XML namespace:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
-
The id of the endpoint. The
ScatterGatherHandler
bean is registered with an alias ofid + '.handler'
. TheRecipientListRouter
bean is registered with an alias ofid + '.scatterer'
. TheAggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer'
. Optional. (TheBeanFactory
generates a defaultid
value.) -
Lifecycle attribute signaling whether the endpoint should be started during application context initialization. In addition, the
ScatterGatherHandler
also implementsLifecycle
and starts and stopsgatherEndpoint
, which is created internally if agather-channel
is provided. Optional. (The default istrue
.) -
The channel on which to receive request messages to handle them in the
ScatterGatherHandler
. Required. -
The channel to which the
ScatterGatherHandler
sends the aggregation results. Optional. (Incoming messages can specify a reply channel themselves in thereplyChannel
message header). -
The channel to which to send the scatter message for the auction scenario. Optional. Mutually exclusive with the
<scatterer>
sub-element. -
The channel on which to receive replies from each supplier for the aggregation. It is used as the
replyChannel
header in the scatter message. Optional. By default, theFixedSubscriberChannel
is created. -
The order of this component when more than one handler is subscribed to the same
DirectChannel
(use for load balancing purposes). Optional. -
Specifies the phase in which the endpoint should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is from highest to lowest. By default, this value is
Integer.MAX_VALUE
, meaning that this container starts as late as possible and stops as soon as possible. Optional. -
The timeout interval to wait when sending a reply
Message
to theoutput-channel
. By default, thesend()
blocks for one second. It applies only if the output channel has some 'sending' limitations — for example, aQueueChannel
with a fixed 'capacity' that is full. In this case, aMessageDeliveryException
is thrown. Thesend-timeout
is ignored forAbstractSubscribableChannel
implementations. Forgroup-timeout(-expression)
, theMessageDeliveryException
from the scheduled expire task leads this task to be rescheduled. Optional. -
Lets you specify how long the scatter-gather waits for the reply message before returning. By default, it waits indefinitely. 'null' is returned if the reply times out. Optional. It defaults to
-1
, meaning to wait indefinitely. -
Specifies whether the scatter-gather must return a non-null value. This value is
true
by default. Consequently, aReplyRequiredException
is thrown when the underlying aggregator returns a null value aftergather-timeout
. Note, ifnull
is a possibility, thegather-timeout
should be specified to avoid an indefinite wait. -
The
<recipient-list-router>
options. Optional. Mutually exclusive withscatter-channel
attribute. -
The
<aggregator>
options. Required.
Since Scatter-Gather is a multi request-reply component, error handling has some extra complexity.
In some cases, it is better to just catch and ignore downstream exceptions if the ReleaseStrategy
allows the process to finish with fewer replies than requests.
In other cases something like a “compensation message” should be considered for returning from sub-flow, when an error happens.
Every async sub-flow should be configured with a errorChannel
header for the proper error message sending from the MessagePublishingErrorHandler
.
Otherwise, an error will be sent to the global errorChannel
with the common error handling logic.
See Error Handling for more information about async error processing.
Synchronous flows may use an ExpressionEvaluatingRequestHandlerAdvice
for ignoring the exception or returning a compensation message.
When an exception is thrown from one of the sub-flows to the ScatterGatherHandler
, it is just re-thrown to upstream.
This way all other sub-flows will work for nothing and their replies are going to be ignored in the ScatterGatherHandler
.
This might be an expected behavior sometimes, but in most cases it would be better to handle the error in the particular sub-flow without impacting all others and the expectations in the gatherer.
Starting with version 5.1.3, the ScatterGatherHandler
is supplied with the errorChannelName
option.
It is populated to the errorChannel
header of the scatter message and is used in the when async error happens or can be used in the regular synchronous sub-flow for directly sending an error message.
The sample configuration below demonstrates async error handling by returning a compensation message:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
To produce a proper reply, we have to copy headers (including replyChannel
and errorChannel
) from the failedMessage
of the MessagingException
that has been sent to the scatterGatherErrorChannel
by the MessagePublishingErrorHandler
.
This way the target exception is returned to the gatherer of the ScatterGatherHandler
for reply messages group completion.
Such an exception payload
can be filtered out in the MessageGroupProcessor
of the gatherer or processed other way downstream, after the scatter-gather endpoint.
Note
|
Before sending scattering results to the gatherer, ScatterGatherHandler reinstates the request message headers, including reply and error channels if any.
This way errors from the AggregatingMessageHandler are going to be propagated to the caller, even if an async hand off is applied in scatter recipient subflows.
For successful operation, a gatherResultChannel , originalReplyChannel and originalErrorChannel headers must be transferred back to replies from scatter recipient subflows.
In this case a reasonable, finite gatherTimeout must be configured for the ScatterGatherHandler .
Otherwise, it is going to be blocked waiting for a reply from the gatherer forever, by default.
|