'Getting connection closed exception while inserting messages to Evenhub using its client
I m basically using java to insert streaming data into the event hub. I am using the EventHubProducerClient to insert the same. But I am seeing some loss of messages while insertion and I see the below amqp exception while insertion
```
EventHubProducerClient client = new EventHubClientBuilder().connectionString(connectionString, eventHubName)
.buildProducerClient();
try {
EventDataBatch dataBatch = client.createBatch();
EventData data = new EventData(content);
if (null != data && dataBatch.tryAdd(data)) {
client.send(dataBatch);
LOG.info("Message Sent to Eventhub");
client.close();
} else
System.out.println("The data getting passed is null");
} catch (IllegalArgumentException e) {
e.printStackTrace();
telemetryClient.trackException(e);
} catch (AmqpException e) {
e.printStackTrace();
telemetryClient.trackException(e);
} catch (IOException e) {
e.printStackTrace();
telemetryClient.trackException(e);
} catch (Exception e) {
e.printStackTrace();
telemetryClient.trackException(e);
} finally {
client.close();
}
```
But I am seeing loss of some messages from source to Eventhub. In my application logs I see the following exception displaying once in every two days
**connectionId[MF_9ac2be_1645071417683] Connection closed. Could not get active connection., errorContext[NAMESPACE: Eventhub_Name.servicebus.windows.net]**
Full Exception stacktrace :
```
com.azure.core.amqp.exception.AmqpException:
at com.azure.core.amqp.implementation.ReactorConnection.onClosedError (ReactorConnection.java:400)
at com.azure.core.amqp.implementation.ReactorConnection.<init> (ReactorConnection.java:116)
at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.<init> (EventHubReactorAmqpConnection.java:64)
at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$0 (EventHubClientBuilder.java:643)
at reactor.core.publisher.FluxCreate$BaseSink.onRequest (FluxCreate.java:536)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.onRequest (FluxCreate.java:264)
at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$1 (EventHubClientBuilder.java:624)
at reactor.core.publisher.FluxCreate.subscribe (FluxCreate.java:94)
at reactor.core.publisher.Flux.subscribe (Flux.java:8156)
at reactor.core.publisher.Flux.subscribeWith (Flux.java:8329)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildConnectionProcessor (EventHubClientBuilder.java:651)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildAsyncClient (EventHubClientBuilder.java:555)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildClient (EventHubClientBuilder.java:593)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildProducerClient (EventHubClientBuilder.java:501)
at com.xxx.EventHubConnector.publishEvents (EventHubConnector.java:30)
```
Why am I seeing the above error ? I know I am closing the eventhub client twice but I guess it should not cause the issue. Please correct me if I am wrong
Adding Full Exception stacktrace :
com.azure.core.amqp.exception.AmqpException:
at com.azure.core.amqp.implementation.ReactorConnection.onClosedError (ReactorConnection.java:400)
at com.azure.core.amqp.implementation.ReactorConnection.<init> (ReactorConnection.java:116)
at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.<init> (EventHubReactorAmqpConnection.java:64)
at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$0 (EventHubClientBuilder.java:643)
at reactor.core.publisher.FluxCreate$BaseSink.onRequest (FluxCreate.java:536)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.onRequest (FluxCreate.java:264)
at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$1 (EventHubClientBuilder.java:624)
at reactor.core.publisher.FluxCreate.subscribe (FluxCreate.java:94)
at reactor.core.publisher.Flux.subscribe (Flux.java:8156)
at reactor.core.publisher.Flux.subscribeWith (Flux.java:8329)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildConnectionProcessor (EventHubClientBuilder.java:651)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildAsyncClient (EventHubClientBuilder.java:555)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildClient (EventHubClientBuilder.java:593)
at com.azure.messaging.eventhubs.EventHubClientBuilder.buildProducerClient (EventHubClientBuilder.java:501)
at com.swinds.connectors.EventHubConnector.publishEvents (EventHubConnector.java:30)
at com.swinds.connectors.EventHubConnector.setEventHub (EventHubConnector.java:77)
at com.swinds.connectors.EventHubConnector.routeEventHub (EventHubConnector.java:66)
at com.swinds.function.BlobFunction.run (BlobFunction.java:17)
at sun.reflect.GeneratedMethodAccessor3.invoke
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke (JavaMethodInvokeInfo.java:22)
at com.microsoft.azure.functions.worker.broker.JavaMethodExecutorImpl.execute (JavaMethodExecutorImpl.java:54)
at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod (JavaFunctionBroker.java:57)
at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute (InvocationRequestHandler.java:33)
at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute (InvocationRequestHandler.java:10)
at com.microsoft.azure.functions.worker.handler.MessageHandler.handle (MessageHandler.java:45)
at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0 (JavaWorkerClient.java:92)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
Solution 1:[1]
Below is the sample code which will check whether the data is sent completely or not and it will continue to do so until everything is sent.
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
public class Sender {
private static final String connectionString = "<Event Hubs namespace connection string>";
private static final String eventHubName = "<Event hub name>";
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
// continue to add the data if the data does not fit
while (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
}
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
public static void main(String[] args) {
publishEvents();
}
}
Here is the Microsoft Document for complete information.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | SaiSakethGuduru-MT |
