Kafka consumer infinite loop in polling (Java)

I am using kafka_2.13-3.1.0 on Windows, with <kafka.version>2.6.0</kafka.version> in pom.xml. I am doing a simple POC that sends and consumes a message. Java code for the consumer:

package com.cox.amp.api.kafka;
import java.util.Properties;
import java.time.Duration;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      //Kafka consumer configuration settings
      String topicName = "Topic1";
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "127.0.0.1:9092");
      props.put("group.id", "test-consumer-group");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("auto.offset.reset","earliest");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Manually assign partition 0 of the topic (the subscribe() call is commented out).
      TopicPartition topicPartition = new TopicPartition(topicName, 0);
      consumer.assign(Arrays.asList(topicPartition));
//      consumer.subscribe(Arrays.asList(topicName));
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
//      while (true) {
         try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : records) {
               // print the offset, key and value for the consumer records.
               System.out.printf("offset1 = %d, key = %s, value = %s\n",
                  record.offset(), record.key(), record.value());
            }
         } catch (Exception ex) {
            ex.printStackTrace();
         }
//      }
   }
}

The log printed:

01:07:34.732 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initialize connection to node 127.0.0.1:9092 (id: -1 rack: null) for sending metadata request
01:07:34.734 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initiating connection to node 127.0.0.1:9092 (id: -1 rack: null) using address /127.0.0.1
01:07:34.745 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
01:07:35.150 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Completed connection to node -1. Fetching API versions.
01:07:35.150 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initiating API versions fetch from node -1.
01:07:35.211 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=0) and timeout 30000 to node -1: {client_software_name=apache-kafka-java,client_software_version=2.6.0,_tagged_fields={}}
01:07:35.297 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=0): org.apache.kafka.common.requests.ApiVersionsResponse@1e94a96
01:07:35.300 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Recorded API versions for node -1: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 3], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 0], AlterClientQuotas(49): 0 to 1 [usable: 0], UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
01:07:35.302 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='Topic1')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 127.0.0.1:9092 (id: -1 rack: null)
01:07:35.303 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=1) and timeout 30000 to node -1: {topics=[{name=Topic1,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
01:07:35.307 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=1): org.apache.kafka.common.requests.MetadataResponse@15c2241
01:07:35.320 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition Topic1-0 from null to epoch 0 from new metadata
01:07:35.321 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition Topic1-5 from null to epoch 0 from new metadata
01:07:35.321 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition Topic1-4 from null to epoch 0 from new metadata
01:07:35.321 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition Topic1-1 from null to epoch 0 from new metadata
01:07:35.321 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition Topic1-2 from null to epoch 0 from new metadata
01:07:35.321 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition Topic1-3 from null to epoch 0 from new metadata
01:07:35.330 [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Cluster ID: eGXb3QmDTx6bRvJjn3HzXQ
01:07:35.330 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='eGXb3QmDTx6bRvJjn3HzXQ', nodes={0=CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=Topic1-4, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=), PartitionMetadata(error=NONE, partition=Topic1-5, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=), PartitionMetadata(error=NONE, partition=Topic1-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=), PartitionMetadata(error=NONE, partition=Topic1-1, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=), PartitionMetadata(error=NONE, partition=Topic1-2, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=), PartitionMetadata(error=NONE, partition=Topic1-3, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)}
01:07:35.340 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)
01:07:35.355 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initiating connection to node CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null) using address CVDI0W82P1241.corp.cox.com/10.62.19.219
01:07:35.358 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0
01:07:35.358 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Completed connection to node 0. Fetching API versions.
01:07:35.358 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initiating API versions fetch from node 0.
01:07:35.358 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=3) and timeout 30000 to node 0: {client_software_name=apache-kafka-java,client_software_version=2.6.0,_tagged_fields={}}
01:07:35.361 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received API_VERSIONS response from node 0 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=3): org.apache.kafka.common.requests.ApiVersionsResponse@12c9a9c
01:07:35.362 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Recorded API versions for node 0: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 3], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 0], AlterClientQuotas(49): 0 to 1 [usable: 0], UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
01:07:35.362 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=2) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
01:07:35.364 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=2): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=0, host='CVDI0W82P1241.corp.cox.com', port=9092)
01:07:35.366 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1648669055364, latencyMs=23, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=0, host='CVDI0W82P1241.corp.cox.com', port=9092))
01:07:35.366 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Discovered group coordinator CVDI0W82P1241.corp.cox.com:9092 (id: 2147483647 rack: null)
01:07:35.366 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initiating connection to node CVDI0W82P1241.corp.cox.com:9092 (id: 2147483647 rack: null) using address CVDI0W82P1241.corp.cox.com/10.62.19.219
01:07:35.368 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Fetching committed offsets for partitions: [Topic1-0]
01:07:35.371 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
01:07:35.372 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Completed connection to node 2147483647. Fetching API versions.
01:07:35.372 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Initiating API versions fetch from node 2147483647.
01:07:35.372 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5) and timeout 30000 to node 2147483647: {client_software_name=apache-kafka-java,client_software_version=2.6.0,_tagged_fields={}}
01:07:35.374 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received API_VERSIONS response from node 2147483647 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5): org.apache.kafka.common.requests.ApiVersionsResponse@1f1baf5
01:07:35.378 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Recorded API versions for node 2147483647: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 3], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 
[usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 0], AlterClientQuotas(49): 0 to 1 [usable: 0], UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
01:07:35.379 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending OFFSET_FETCH request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=7, clientId=consumer-test-consumer-group-1, correlationId=4) and timeout 30000 to node 2147483647: {group_id=test-consumer-group,topics=[{name=Topic1,partition_indexes=[0],_tagged_fields={}}],require_stable=true,_tagged_fields={}}
01:07:35.381 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received OFFSET_FETCH response from node 2147483647 for request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=7, clientId=consumer-test-consumer-group-1, correlationId=4): org.apache.kafka.common.requests.OffsetFetchResponse@1edc48a
01:07:35.384 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Setting offset for partition Topic1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)], epoch=0}}
01:07:35.390 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Added READ_UNCOMMITTED fetch request for partition Topic1-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)], epoch=0}} to node CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)
01:07:35.390 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 0 with 1 partition(s).
01:07:35.391 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending READ_UNCOMMITTED FullFetchRequest(Topic1-0) to broker CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)
01:07:35.393 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-test-consumer-group-1, correlationId=6) and timeout 30000 to node 0: {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=0,session_epoch=0,topics=[{topic=Topic1,partitions=[{partition=0,current_leader_epoch=0,fetch_offset=1,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[],rack_id=}
01:07:35.708 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending asynchronous auto-commit of offsets {Topic1-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
01:07:35.711 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-consumer-group-1, correlationId=7) and timeout 30000 to node 2147483647: {group_id=test-consumer-group,generation_id=-1,member_id=,group_instance_id=null,topics=[{name=Topic1,partitions=[{partition_index=0,committed_offset=1,committed_leader_epoch=-1,committed_metadata=,_tagged_fields={}}],_tagged_fields={}}],_tagged_fields={}}
01:07:35.714 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received OFFSET_COMMIT response from node 2147483647 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-consumer-group-1, correlationId=7): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='Topic1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
01:07:35.714 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Committed offset 1 for partition Topic1-0
01:07:35.715 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Completed asynchronous auto-commit of offsets {Topic1-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
01:07:35.918 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-test-consumer-group-1, correlationId=6): org.apache.kafka.common.requests.FetchResponse@d7f8b4
01:07:35.919 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Node 0 sent a full fetch response that created a new incremental fetch session 2000578609 with 1 response partition(s)
01:07:35.925 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Added READ_UNCOMMITTED fetch request for partition Topic1-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)], epoch=0}} to node CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)
01:07:35.926 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Built incremental fetch (sessionId=2000578609, epoch=1) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
01:07:35.926 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(Topic1-0)) to broker CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)
01:07:35.926 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-test-consumer-group-1, correlationId=8) and timeout 30000 to node 0: {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=2000578609,session_epoch=1,topics=[],forgotten_topics_data=[],rack_id=}
01:07:36.439 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 2000578609 with 0 response partition(s), 1 implied partition(s)
01:07:36.440 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Added READ_UNCOMMITTED fetch request for partition Topic1-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)], epoch=0}} to node CVDI0W82P1241.corp.cox.com:9092 (id: 0 rack: null)
01:07:36.440 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Built incremental fetch (sessionId=2000578609, epoch=2) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
01:07:36.714 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending asynchronous auto-commit of offsets {Topic1-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
01:07:36.717 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received OFFSET_COMMIT response from node 2147483647 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-consumer-group-1, correlationId=10): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='Topic1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
01:07:36.717 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Committed offset 1 for partition Topic1-0

I can send messages from my producer and verify them from the command line. Topic1 has 5 partitions, and partitions 0 and 1 each contain a single entry. I am trying to read that value with the Java code above. Please let me know where I am going wrong.
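One thing worth checking against the log: the consumer reports "Setting offset for partition Topic1-0 to the committed offset FetchPosition{offset=1, ...}" and then keeps fetching from offset 1. If partition 0 really holds a single record (offset 0), there may simply be nothing left to read, because auto.offset.reset is only consulted when the group has no usable committed offset. The sketch below is a simplified, stdlib-only model of that rule, not Kafka's actual implementation; the class name, method names, and the log start/end values are assumptions made for illustration.

```java
import java.util.OptionalLong;

/** Simplified model (NOT Kafka's code) of how a consumer picks its starting position. */
final class OffsetResetSketch {

    /**
     * @param committed   committed offset for this group and partition, empty if none exists
     * @param logStart    first offset still present in the partition
     * @param logEnd      offset the next produced record would receive
     * @param resetPolicy value of auto.offset.reset: "earliest" or "latest"
     */
    static long startPosition(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // a valid committed offset wins; the reset policy is not consulted
            }
        }
        // No committed offset (or it is out of range): fall back to the reset policy.
        return "earliest".equals(resetPolicy) ? logStart : logEnd;
    }

    /** Number of records a fetch starting at the given position could return. */
    static long recordsAvailable(long position, long logEnd) {
        return Math.max(0, logEnd - position);
    }

    public static void main(String[] args) {
        // Assumed from the question: one record in Topic1-0, so logStart = 0 and logEnd = 1.
        long logStart = 0;
        long logEnd = 1;

        // Case seen in the log: the group already has committed offset 1.
        long withCommit = startPosition(OptionalLong.of(1), logStart, logEnd, "earliest");
        System.out.println("committed=1    -> start=" + withCommit
                + ", records=" + recordsAvailable(withCommit, logEnd));

        // Hypothetical case: a group with no committed offset.
        long withoutCommit = startPosition(OptionalLong.empty(), logStart, logEnd, "earliest");
        System.out.println("committed=none -> start=" + withoutCommit
                + ", records=" + recordsAvailable(withoutCommit, logEnd));
    }
}
```

Under these assumptions the first case starts at offset 1 with 0 records to read, and the second starts at offset 0 with 1 record. With the real client, re-reading the existing record would probably require either a group.id that has no committed offsets or a call to consumer.seekToBeginning(Arrays.asList(topicPartition)) after assign().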



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
