Full duplex TCP connections using Netty4 and Apache Camel
For an IoT project I am working on, I am researching the next, enhanced version of our “Socket Handler”. It is over 5 years old and has evolved into a big beast that, apart from handling socket connections with IoT devices, also does in-thread processing, which has become a real pain to manage.
For my total rewrite I am looking into Apache Camel as a routing and transformation toolkit, and I understand how it can help us split processing steps into microservices, loosely coupled through message queues.
One thing that I have trouble understanding, however, is how to implement the following logic “the Apache Camel way”: an IoT device sends an initial message which contains its id, some extra headers and a message payload. Apart from extracting the message payload and routing it to a channel, I also need to use the device id to check a message queue, named after the device id, for any commands that have to go to the device over the same socket connection that received the initial message.
Although it seems that Netty4, which is included in Camel, can deal with synchronous duplex comms, I cannot see how the above logic can be implemented in the Camel Netty4 component. Camel Routing seems to be one way only.
Is there a correct way to do this or should I forget about using camel for this and just use Netty4 bare?
Solution 1:[1]
It is possible to achieve full-duplex communication by using two Camel routes:
- The connection is reused by means of the reuseChannel property.
- As this full-duplex communication is realised by two different Camel routes, the sync property must be set to false.
Here is the first route:
    from("netty4:tcp://{{tcpAddress}}:{{tcpPort}}"
            + "?decoders=#length-decoder,#string-decoder"
            + "&encoders=#length-encoder,#bytearray-encoder"
            + "&sync=false&reuseChannel=true")
        .bean("myMessageService", "receiveFromTCP")
        .to("jms:queue:<name>");
This first route creates a TCP/IP consumer backed by a server socket (a client socket can be used instead via the clientMode property).
As we want to reuse the connection just created, it is important to initialize not only the decoders but also the encoders, which will be used later by a bean (see further). This bean will be responsible for sending data through the Channel created in this first route (a Netty Channel contains a pipeline for decoding messages received from, and encoding messages sent to, TCP/IP).
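What a length encoder/decoder pair does can be illustrated without Netty. The sketch below assumes a 4-byte big-endian length prefix, which this answer does not specify; `LengthPrefixFraming` and its methods are hypothetical names. In the real route this work would be done by whatever handlers are registered as #length-encoder and #length-decoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of length-prefixed framing (4-byte big-endian prefix assumed). */
class LengthPrefixFraming {

    /** Prepends the payload length so the receiver knows where the message ends. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Extracts every complete frame from the buffer and leaves any trailing
     * partial frame unread, as a stream decoder has to do because TCP does
     * not preserve message boundaries.
     */
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // incomplete frame: wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

Because both the decoders and the encoders sit in the same channel pipeline, anything written to the channel later goes through the same framing on the way out.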
Now we want to send some data back to the partner connected to the consumer (from) endpoint of the first route. Since we cannot do that with a classical producer endpoint (to), we use a bean object:
    from("jms:queue:<name>").bean("myMessageService", "sendToTCP");
Here is the bean code:
    import io.netty.channel.Channel;
    import org.apache.camel.Exchange;
    import org.apache.camel.component.netty4.NettyConstants;

    public class MessageService {

        private Channel openedChannel;

        public void sendToTCP(final Exchange exchange) {
            // The opened channel applies the encoders before writing to the
            // socket already created in the first route.
            openedChannel.writeAndFlush(exchange.getIn().getBody());
        }

        public void receiveFromTCP(final Exchange exchange) {
            // Record the channel created in the first route.
            this.openedChannel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
        }
    }
Of course, the same bean instance must be used in the two routes, so you need to put it in a registry:
    SimpleRegistry simpleRegistry = new SimpleRegistry();
    simpleRegistry.put("myMessageService", new MessageService());
Since the bean is used in two different asynchronous routes, you will have to deal with some unexpected situations, for example by protecting access to the openedChannel member or by handling an unexpected disconnection.
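One way to approach this can be sketched in plain Java. It is only an illustration under assumptions, not code from this answer: `DeviceChannelRegistry` and `Sink` are hypothetical names, and `Sink` stands in for Netty's `Channel` so that the sketch runs without Camel or Netty on the classpath. Keying the map by device id is also an assumption, taken from the question's requirement of one command queue per device.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a Netty Channel: something that accepts outbound messages. */
interface Sink {
    boolean isActive();
    void write(Object message);
}

/** Hypothetical registry mapping a device id to the connection it is currently using. */
class DeviceChannelRegistry {

    // ConcurrentHashMap makes registration and lookup safe across the two routes' threads.
    private final Map<String, Sink> sinks = new ConcurrentHashMap<>();

    /** Called from the TCP route each time a device sends a message. */
    void register(String deviceId, Sink sink) {
        sinks.put(deviceId, sink);
    }

    /** Called when a connection closes; removes the entry only if it still maps to that sink. */
    void unregister(String deviceId, Sink sink) {
        sinks.remove(deviceId, sink);
    }

    /**
     * Called from the queue route. Returns false instead of throwing when the
     * device is unknown or its connection is no longer active, so the caller
     * can decide whether to retry, requeue, or drop the command.
     */
    boolean send(String deviceId, Object command) {
        Sink sink = sinks.get(deviceId);
        if (sink == null || !sink.isActive()) {
            return false;
        }
        sink.write(command);
        return true;
    }
}
```

In the bean above, receiveFromTCP would call register with the channel taken from the exchange, and sendToTCP would call send. How the device id is extracted from the message, and what to do with a command that could not be delivered, is left open here.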
This post helped me a lot in finding this solution: how-to-send-a-response-back-over-a-established-tcp-connection-in-async-mode-usin
Solution 2:[2]
After the end of a Camel route, the exchange's body and headers are sent back to the requester as the response.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Alexey Shmakov |
