Configured errorChannel not called after aggregation

We are seeing strange behavior in our integration flows: the errorChannel does not receive a message when an exception is thrown in a step after an aggregation.

This is the (reduced) flow:

    @Bean
    public StandardIntegrationFlow startKafkaInbound() {
        return IntegrationFlows.from(Kafka
                .messageDrivenChannelAdapter(
                        kafkaConsumerFactory,
                        ListenerMode.record,
                        serviceProperties.getInputTopic().getName())
                .errorChannel(errorHandler.getInputChannel())
        )
                .channel(nextChannel().getInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextChannel() {
        return IntegrationFlows.from("next")
                .transform(Transformers.fromJson(MyObject.class))  // An exception here is sent to errorChannel
                .aggregate(aggregatorSpec ->
                        aggregatorSpec
                                .releaseStrategy(new MessageCountReleaseStrategy(100))
                                .sendPartialResultOnExpiry(true)
                                .groupTimeout(2000L)
                                .expireGroupsUponCompletion(true)
                                .correlationStrategy(message -> KafkaHeaderUtils.getOrDefault(message.getHeaders(), MY_CORRELATION_HEADER, ""))
                )
                .transform(myObjectTransformer)  // An exception here is not sent to errorChannel
                .channel(acknowledgeMyObjectFlow().getInputChannel())
                .get();
    }

If we add an explicit channel that is not a DirectChannel, error handling works as expected. The working code looks like this:

    // ...
    .aggregate(aggregatorSpec -> ...)
    .channel(MessageChannels.queue())
    .transform(myObjectTransformer)  // Now the exception is sent to errorChannel
    .channel(acknowledgeMyObjectFlow().getInputChannel())
    // ...

We'd also like to mention that we have a very similar flow with an aggregation where error handling works as expected (the exception is sent to the errorChannel).

So we were able to get the code running. But since error handling is a critical part of the application, we'd really like to understand how to ensure that every error is sent to the configured channel, and why explicitly setting a QueueChannel leads to the desired behavior.
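
One factor that may be relevant, offered as an assumption and not a confirmed diagnosis of this flow: with `groupTimeout` set, a partial group can be released by a timer on a scheduler thread instead of on the Kafka listener thread. An exception thrown downstream of such a release cannot travel back up the call stack to the inbound adapter, so the adapter never gets the chance to forward it to its error channel. The plain-JDK sketch below shows only that threading effect. `ThreadBoundaryDemo`, `runAdapter`, and `failingStep` are hypothetical names, and no Spring Integration API is used.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ThreadBoundaryDemo {

    /**
     * Hypothetical stand-in for an inbound adapter: it invokes the downstream
     * step and records every exception that propagates back to it.
     */
    static List<String> runAdapter(boolean releaseOnSchedulerThread) throws InterruptedException {
        List<String> adapterErrors = new CopyOnWriteArrayList<>();
        Runnable failingStep = () -> {
            throw new IllegalStateException("transform failed");
        };
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            if (releaseOnSchedulerThread) {
                // Comparable to a timeout-driven release: scheduling returns at once,
                // and the failure happens later on the scheduler's own thread.
                scheduler.schedule(failingStep, 50, TimeUnit.MILLISECONDS);
            } else {
                // Comparable to a direct hand-off: the step runs on the caller's thread.
                failingStep.run();
            }
        } catch (RuntimeException e) {
            // Only exceptions thrown on the caller's thread can arrive here.
            adapterErrors.add(e.getMessage());
        } finally {
            // Already-scheduled delayed tasks still run after shutdown() by default.
            scheduler.shutdown();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        }
        return adapterErrors;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("same thread: " + runAdapter(false));
        System.out.println("scheduler thread: " + runAdapter(true));
    }
}
```

In the second call the exception is captured by the scheduler's future and never reaches the caller's `catch` block, so the returned list is empty. If the same effect applies here, it might also explain why a similar flow whose groups are released by message count on the listener thread behaves differently, but that would need to be verified against the actual flows.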

Thanks in advance



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
