Kafka Streams - Rebalancing exception in Kafka 1.0.0
In Kafka Streams 1.0.0, I ran into a strange error. My Streams app ingests a Kafka topic and emits multiple aggregations to different state stores. The app works on a cluster of 2 nodes, but the moment a third node is added, the app crashes on all the nodes.
This is the exception I get:
2017-12-12 15:32:55 ERROR Kafka010Base:47 - Exception caught in thread c-7-aq32-5648256f-9142-49e2-98c0-da792e6da48e-StreamThread-5
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
Looking further at the brokers, I saw another exception:
[2017-12-12 17:28:36,822] INFO [GroupCoordinator 90]: Preparing to rebalance group c-7-aq32 with old generation 25 (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:40,500] INFO [GroupCoordinator 90]: Stabilized group c-7-aq32 generation 26 (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:42,290] INFO [GroupCoordinator 90]: Assignment received from leader for group c-7-aq32 for generation 26 (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:42,300] ERROR [GroupMetadataManager brokerId=90] Appending metadata message for group c-7-aq32 generation 26 failed due to org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN error code to the client (kafka.coordinator.group.GroupMetadataManager)
[2017-12-12 17:28:42,301] INFO [GroupCoordinator 90]: Preparing to rebalance group c-7-aq32 with old generation 26 (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:52,301] INFO [GroupCoordinator 90]: Member c-7-aq32-6138ec53-4aff-4596-8f4b-44ae6f5d72da-StreamThread-13-consumer-e0cc0931-0619-4908-82c2-28f7bf9bace9 in group c-7-aq32 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
I need help understanding why I am getting a RecordTooLargeException.
Solution 1:[1]
Changing the following property on the broker fixed the issue:
message.max.bytes=5242880 (5 MB)
The default value of this property is about 1 MB. When the consumers in a consumer group rebalance, the metadata describing the group's partition assignment is written back to Kafka; the broker log in the question shows that append failing. If this metadata is larger than the limit, the broker rejects it with a RecordTooLargeException, which the client sees as an unexpected SyncGroup error.
The question is old, but I ran into a similar issue recently.
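To make the size arithmetic concrete, here is a minimal sketch of the check described above. It is an illustrative model only, not Kafka's broker code. The per-node metadata size (400 KiB) is a hypothetical figure chosen to mirror the "works with 2 nodes, fails with 3" symptom, the 1 MiB default is an approximation, and the helper names `group_metadata_size` and `fits` are made up for this example.

```python
# Illustrative model only: this is not how the Kafka broker is implemented.

MIB = 1024 * 1024

# Value from the answer: 5 MiB expressed in bytes (5,242,880).
RAISED_LIMIT_BYTES = 5 * MIB

# Assumption: the stock limit is treated as roughly 1 MiB for this sketch.
ASSUMED_DEFAULT_LIMIT_BYTES = 1 * MIB


def group_metadata_size(per_member_bytes):
    """Sum hypothetical per-member metadata sizes into one record size."""
    return sum(per_member_bytes)


def fits(record_bytes, limit_bytes):
    """Return True if a single record is within the size limit."""
    return record_bytes <= limit_bytes


if __name__ == "__main__":
    # Hypothetical: each node contributes 400 KiB of assignment metadata.
    per_node = 400 * 1024
    for nodes in (2, 3):
        size = group_metadata_size([per_node] * nodes)
        print(
            f"nodes={nodes} size={size} "
            f"fits_default={fits(size, ASSUMED_DEFAULT_LIMIT_BYTES)} "
            f"fits_raised={fits(size, RAISED_LIMIT_BYTES)}"
        )
```

Under these assumed sizes, the two-node group stays below the default limit while the three-node group exceeds it and only fits once the limit is raised. Actual metadata sizes depend on the topology, the number of partitions, and the number of stream threads, so they should be measured rather than taken from this sketch.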
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | TimeTraveler |
