'Getting connection closed exception while inserting messages to Evenhub using its client

I m basically using java to insert streaming data into the event hub. I am using the EventHubProducerClient to insert the same. But I am seeing some loss of messages while insertion and I see the below amqp exception while insertion

```
EventHubProducerClient client = new EventHubClientBuilder().connectionString(connectionString, eventHubName)
                .buildProducerClient();
        try {
            EventDataBatch dataBatch = client.createBatch();
            EventData data = new EventData(content);
            if (null != data && dataBatch.tryAdd(data)) {
                client.send(dataBatch);
                LOG.info("Message Sent to Eventhub");
                client.close();
            } else
                System.out.println("The data getting passed is null");
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
            telemetryClient.trackException(e);
        } catch (AmqpException e) {
            e.printStackTrace();
            telemetryClient.trackException(e);
        } catch (IOException e) {
            e.printStackTrace();
            telemetryClient.trackException(e);
        } catch (Exception e) {
            e.printStackTrace();
            telemetryClient.trackException(e);
        } finally {
            client.close();
        }
```

But I am seeing loss of some messages from source to Eventhub. In my application logs I see the following exception displaying once in every two days

**connectionId[MF_9ac2be_1645071417683] Connection closed. Could not get active connection., errorContext[NAMESPACE: Eventhub_Name.servicebus.windows.net]**

Full Exception stacktrace :
```
com.azure.core.amqp.exception.AmqpException:
   at com.azure.core.amqp.implementation.ReactorConnection.onClosedError (ReactorConnection.java:400)
   at com.azure.core.amqp.implementation.ReactorConnection.<init> (ReactorConnection.java:116)
   at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.<init> (EventHubReactorAmqpConnection.java:64)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$0 (EventHubClientBuilder.java:643)
   at reactor.core.publisher.FluxCreate$BaseSink.onRequest (FluxCreate.java:536)
   at reactor.core.publisher.FluxCreate$SerializedFluxSink.onRequest (FluxCreate.java:264)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$1 (EventHubClientBuilder.java:624)
   at reactor.core.publisher.FluxCreate.subscribe (FluxCreate.java:94)
   at reactor.core.publisher.Flux.subscribe (Flux.java:8156)
   at reactor.core.publisher.Flux.subscribeWith (Flux.java:8329)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildConnectionProcessor (EventHubClientBuilder.java:651)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildAsyncClient (EventHubClientBuilder.java:555)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildClient (EventHubClientBuilder.java:593)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildProducerClient (EventHubClientBuilder.java:501)
   at com.xxx.EventHubConnector.publishEvents (EventHubConnector.java:30)   
```

Why am I seeing the above error ? I know I am closing the eventhub client twice but I guess it should not cause the issue. Please correct me if I am wrong

Adding Full Exception stacktrace :

 com.azure.core.amqp.exception.AmqpException:
   at com.azure.core.amqp.implementation.ReactorConnection.onClosedError (ReactorConnection.java:400)
   at com.azure.core.amqp.implementation.ReactorConnection.<init> (ReactorConnection.java:116)
   at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.<init> (EventHubReactorAmqpConnection.java:64)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$0 (EventHubClientBuilder.java:643)
   at reactor.core.publisher.FluxCreate$BaseSink.onRequest (FluxCreate.java:536)
   at reactor.core.publisher.FluxCreate$SerializedFluxSink.onRequest (FluxCreate.java:264)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$1 (EventHubClientBuilder.java:624)
   at reactor.core.publisher.FluxCreate.subscribe (FluxCreate.java:94)
   at reactor.core.publisher.Flux.subscribe (Flux.java:8156)
   at reactor.core.publisher.Flux.subscribeWith (Flux.java:8329)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildConnectionProcessor (EventHubClientBuilder.java:651)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildAsyncClient (EventHubClientBuilder.java:555)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildClient (EventHubClientBuilder.java:593)
   at com.azure.messaging.eventhubs.EventHubClientBuilder.buildProducerClient (EventHubClientBuilder.java:501)
   at com.swinds.connectors.EventHubConnector.publishEvents (EventHubConnector.java:30)
   at com.swinds.connectors.EventHubConnector.setEventHub (EventHubConnector.java:77)
   at com.swinds.connectors.EventHubConnector.routeEventHub (EventHubConnector.java:66)
   at com.swinds.function.BlobFunction.run (BlobFunction.java:17)
   at sun.reflect.GeneratedMethodAccessor3.invoke
   at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke (Method.java:498)
   at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke (JavaMethodInvokeInfo.java:22)
   at com.microsoft.azure.functions.worker.broker.JavaMethodExecutorImpl.execute (JavaMethodExecutorImpl.java:54)
   at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod (JavaFunctionBroker.java:57)
   at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute (InvocationRequestHandler.java:33)
   at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute (InvocationRequestHandler.java:10)
   at com.microsoft.azure.functions.worker.handler.MessageHandler.handle (MessageHandler.java:45)
   at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0 (JavaWorkerClient.java:92)
   at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
   at java.util.concurrent.FutureTask.run (FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
   at java.lang.Thread.run (Thread.java:748)


Solution 1:[1]

Below is the sample code which will check whether the data is sent completely or not and it will continue to do so until everything is sent.

import  com.azure.messaging.eventhubs.*;

import  java.util.Arrays;

import  java.util.List;

public  class  Sender {

private  static  final  String  connectionString = "<Event Hubs namespace connection string>";

private  static  final  String  eventHubName = "<Event hub name>";

/**

* Code sample for publishing events.

* @throws  IllegalArgumentException if the EventData is bigger than the max batch size.

*/

public  static  void  publishEvents() {

// create a producer client

EventHubProducerClient  producer = new  EventHubClientBuilder()

.connectionString(connectionString, eventHubName)

.buildProducerClient();

// sample events in an array

List<EventData> allEvents = Arrays.asList(new  EventData("Foo"), new  EventData("Bar"));

// create a batch

EventDataBatch  eventDataBatch = producer.createBatch();

for (EventData  eventData  :  allEvents) {

// try to add the event from the array to the batch

// continue to add the data if the data does not fit

while (!eventDataBatch.tryAdd(eventData)) {

// if the batch is full, send it and then create a new batch

producer.send(eventDataBatch);

eventDataBatch = producer.createBatch();

}

// Try to add that event that couldn't fit before.

if (!eventDataBatch.tryAdd(eventData)) {

throw  new  IllegalArgumentException("Event is too large for an empty batch. Max size: "

+ eventDataBatch.getMaxSizeInBytes());

}

}

// send the last batch of remaining events

if (eventDataBatch.getCount() > 0) {

producer.send(eventDataBatch);

}

producer.close();

}

public  static  void  main(String[] args) {

publishEvents();

}

}

Here is the Microsoft Document for complete information.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 SaiSakethGuduru-MT