'How to create a Reactive Inbound Channel Adapter in Spring Integration Reactor

I'd like to understand how to create a reactive channel adapter for Spring Integration with Reactor core. I've understood from other forums I've read that this Mongo DB reactive adapter can be a good example, but it contains lots of Mongo domain specific classes.

I've read the Reactive part of the docs, and I see that there is a need to implement MessageProducerSupport, but from the code example it looks that there is a need of implementing a class the extends MessageProducerSpec and calls the first one. Can someone give an example for the most basic usage and explain what is really a demand for creating such a channel adapter? What I understand that I should do is something like:

public IntegrationFlow buildPipe() {
   return IntegrationFlows.from(myMessageProducerSpec)
      .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
      .handle(writeToKafka)
      .get();
}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source