'Camel - Handle Exception inside Split and continue
In my camel route I have a simple split like this
.split(body(), eventListAggregationStrategy).parallelProcessing()
.process(rawEventTransformationProcessor)
.end()
If exceptions occure inside .process(rawEventTransformationProcessor) I want them to be handled and send to an error queue. However all my attempts failed and the whole route stopped.
I tried onException in multiple ways (with handled, continue, shareUnitOfWork) and so on.
onException(RawEventTransformationException.class)
.to("log:RawEventTransformationException?showAll=true&multiline=true&level=ERROR")
.handled(true)
.process(exchange -> {
RawEventTransformationException cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, RawEventTransformationException.class);
exchange.getIn().setBody(cause.getFailedEvent());
})
.marshal().json(JsonLibrary.Jackson, RawEvent.class)
.to("rabbitmq://errors?queue=transformed-data-fails&routingKey=transformed-data-fails&autoDelete=false");
What works if I have an additional direct route that I trigger from inside the Processor when an Exception works.
catch (RawEventTransformationException e1) {
producerTemplate.sendBody(e1);
exchange.getIn().setBody(List.of());
throw e1;
}
What is the best practice way in catching this exception and letting the rest continue?
Solution 1:[1]
Using route-level exception handling with continued(true) worked fine for me at least. All messages split messages got processed and exception handled. Checkout Dead Letter channel for information about sending failed exchanges to error queue.
You could also call direct route inside the split block and use route-level exception handling there to limit the exception handling to just split messages.
public class ExampleTest extends CamelTestSupport {
@Test
public void exampleTest() {
List<String> body = Arrays.asList(new String[]{"a", "b", "exception", "c", "d"});
template.sendBody("direct:start", body);
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.onException(Exception.class)
.log(LoggingLevel.ERROR, "Exception handled: ${exception.message}")
.continued(true)
.end()
.log(LoggingLevel.INFO, "Start")
.split(body()).parallelProcessing()
.choice()
.when(body().isNotEqualTo("exception"))
.log(LoggingLevel.INFO, "${body}")
.otherwise()
.process(exchange -> {
throw new Exception("Example exception");
})
.end()
.end()
.log(LoggingLevel.INFO, "Done");
}
};
}
}
logs:
INFO [log] org.apache.camel.spi.CamelLogger : Start
INFO [log] org.apache.camel.spi.CamelLogger : b
INFO [log] org.apache.camel.spi.CamelLogger : a
INFO [log] org.apache.camel.spi.CamelLogger : c
INFO [log] org.apache.camel.spi.CamelLogger : d
ERROR [log] org.apache.camel.spi.CamelLogger : Exception handled: Example exception
INFO [log] org.apache.camel.spi.CamelLogger : Done
Solution 2:[2]
You can have your EventListAggregationStrategy clear the exception on the exchange it returns:
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
...
newExchange.setException(null);
return newExchange;
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Pasi Österman |
| Solution 2 | Jeremy Ross |
