Spring Integration: recommended way of enriching events with key-based queries
I want to read data from Kafka and, for each event, query MongoDB using the Id and another field from the Kafka event. What is the recommended way to do this in general, and is it possible with the ReactiveMongoDbMessageSource? I thought the right operator might be .gateway() or .enrich(), but I'm really not sure. I don't know how to use a message source in that position, so I'm not sure it's even possible. I'd like to be able to write something like this:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
    return from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                    .map(GenericMessage::new))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .gateway((message) -> enrichMongoDbPayloadByMessageKey(message.getHeaders().getId()))
            .handle(new ReactiveElasticsearchMessageHandler());
}
I'd really like to see an example for a mock implementation of my needed enrichMongoDbPayloadByMessageKey().
Solution 1:[1]
Either gateway() or an enricher (enrich()) is the right direction. Which one depends on your requirements: use a gateway if you'd like to continue the flow with only the result of the MongoDB request, or an enricher if you want to add more data to the result of that transform().
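To make the difference concrete, here is a minimal framework-free sketch in plain Java. It is not Spring Integration API: the class, the in-memory STORE map standing in for MongoDB, and the method names are all hypothetical. It only models what each style hands to the next step of the flow.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class GatewayVsEnricher {

    // Hypothetical stand-in for a MongoDB collection, keyed by the Kafka record key.
    static final Map<String, String> STORE =
            Map.of("order-1", "{\"status\":\"SHIPPED\"}");

    // Hypothetical lookup; a real flow would query MongoDB here.
    static Optional<String> lookup(String kafkaKey) {
        return Optional.ofNullable(STORE.get(kafkaKey));
    }

    // Gateway style: the lookup result replaces the payload downstream.
    static Optional<String> gatewayStyle(String kafkaKey, String payload) {
        return lookup(kafkaKey);
    }

    // Enricher style: the original payload is kept and the lookup result is added to it.
    static Map<String, String> enricherStyle(String kafkaKey, String payload) {
        Map<String, String> enriched = new LinkedHashMap<>();
        enriched.put("payload", payload);
        lookup(kafkaKey).ifPresent(doc -> enriched.put("mongoDocument", doc));
        return enriched;
    }

    public static void main(String[] args) {
        System.out.println(gatewayStyle("order-1", "event-body"));
        System.out.println(enricherStyle("order-1", "event-body"));
    }
}
```

With the gateway style the Kafka payload is gone after the call unless the service carries it along; with the enricher style it survives next to the MongoDB result.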
The ReactiveMongoDbMessageSource is the wrong direction here, simply because it is a source of messages, meant for the beginning of a flow. What you need in your case is really a service activator that acts on the data received from Kafka.
There is not yet a reactive MongoDB gateway (request-reply channel adapter); the closest out-of-the-box solution is the MongoDbOutboundGateway: https://docs.spring.io/spring-integration/docs/current/reference/html/mongodb.html#mongodb-outbound-gateway.
If you really want a reactive solution here, consider implementing a service method that receives your arguments, performs a reactive operation on MongoDB, and returns the result. For that purpose, see ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass).
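A minimal sketch of such a service method follows. It needs Spring Data MongoDB (reactive), Reactor and the Kafka client on the classpath. The collection name "events", the field name "type", and the choice to match that field against the record value are assumptions made for illustration. The method name is taken from the question.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.bson.Document;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import reactor.core.publisher.Mono;

public class MongoEnrichmentService {

    private final ReactiveMongoTemplate mongoTemplate;

    public MongoEnrichmentService(ReactiveMongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    // Looks up one document by the Kafka record key plus a second field.
    // Assumptions: documents live in an "events" collection and have a
    // "type" field that is compared with the record value.
    public Mono<Document> enrichMongoDbPayloadByMessageKey(ConsumerRecord<String, String> record) {
        Query query = Query.query(
                Criteria.where("_id").is(record.key())
                        .and("type").is(record.value()));
        // Completes empty when no document matches.
        return mongoTemplate.findOne(query, Document.class, "events");
    }
}
```

One way to wire it, assuming the payload is still the ConsumerRecord at that point (that is, without the preceding transform()), would be .handle(service, "enrichMongoDbPayloadByMessageKey", e -> e.async(true)). As far as I understand, async(true) asks the service activator to resolve the returned Mono and send its value as the reply, rather than passing the Mono itself downstream as the payload.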
There is no gateway() operator with the signature you show.
It is also wrong to use message.getHeaders().getId(), since it does not reflect anything you receive from Kafka: the message id is a UUID that the framework generates for each message.
See the docs for more about gateway and enricher:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Artem Bilan |
