Why is my Spring Kafka consumer slow at consuming records?

So I have a Spring Kafka setup that polls for 5000 records per fetch.

  • Deserialization of those 5000 records takes about 2 seconds
  • Processing on the listener for those 5000 records takes about 120 seconds

That means I can only handle about 40 records per second, which is pretty low.

But when I check Actuator to see how long the listener method takes to execute, it reports an average of 0.00013 seconds, which times 5000 should be 0.65 seconds. So I imagine Spring is taking around 119.35 seconds to hand those records to my listener.
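A quick back-of-the-envelope check (a sketch using the figures above; the 5000-record batch size, ~120 s wall time, and 0.00013 s listener average all come from the question) shows how large the unaccounted per-record overhead is:

```java
// Rough arithmetic on the numbers from the question.
public class ThroughputEstimate {
    public static void main(String[] args) {
        int recordsPerPoll = 5000;
        double batchSeconds = 120.0;                       // observed wall time per batch
        double listenerSeconds = 0.00013 * recordsPerPoll; // actuator-reported listener time
        double overheadSeconds = batchSeconds - listenerSeconds;
        double perRecordOverheadMs = overheadSeconds / recordsPerPoll * 1000;
        // ~119.35 s is unaccounted for, i.e. roughly 24 ms of overhead
        // per record -- far too much for pure dispatching.
        System.out.println("Overhead: " + overheadSeconds + " s, per record: "
                + perRecordOverheadMs + " ms");
    }
}
```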

Could anyone please help me understand the situation? Processing those records should be much faster, but I can't see why that is not the case.

My current configuration is as follows:

@EnableKafka
@Configuration
public class KafkaConfiguration {

    // Injected from configuration (property name assumed; not shown in the original)
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, MarketChangesProtocolBuffer.MarketChanges> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress );
        props.put( ConsumerConfig.GROUP_ID_CONFIG, "test-group" );
        props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ErrorHandlingDeserializer.class );
        props.put( ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
                MarketChangesDeserializer.class );
        props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest" );
        props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000 );
        props.put( ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000 );
        props.put( ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000 );
        return new DefaultKafkaConsumerFactory<>( props );
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MarketChangesProtocolBuffer.MarketChanges>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MarketChangesProtocolBuffer.MarketChanges> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory() );
        factory.getContainerProperties().setPollTimeout( 3000 );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.RECORD );
        factory.setErrorHandler( new SeekToCurrentErrorHandler(
                new FixedBackOff( 2000, 2 ) ) );
        return factory;
    }
}

And the listener:

@Component
public class ScoringListenerService {

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listenGroup( MarketChangesProtocolBuffer.MarketChanges marketChanges,
            ConsumerRecordMetadata metadata ) {
        // some business logic...
    }
}


Solution 1:[1]

It's unlikely to be Spring:

@SpringBootApplication
public class So71681525Application {

    public static void main(String[] args) {
        SpringApplication.run(So71681525Application.class, args);
    }

    int count;

    StopWatch watch = new StopWatch();

    @KafkaListener(id = "so71681525", topics = "so71681525")
    void listen(String in) {
        if (count++ == 1) {
            this.watch.start();
        }
        else if (count == 50_000) {
            perf();
        }
    }

    private void perf() {
        this.watch.stop();
        System.out.println(this.watch.prettyPrint());
        System.out.println(this.count + " @ " + (int) ((count) / (this.watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71681525").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 50_000).forEach(i -> template.send("so71681525", "foo"));
        };
    }

}
With these properties:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=5000

Result:
StopWatch '': running time = 140801968 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
140801968  100%  

50000 @ 355k/s

You need to profile your application.

Solution 2:[2]

It turns out the issue is the acknowledgment mode. Since I am using AckMode.RECORD, each time the listener returns, a synchronous call is made to commit the offset, and Spring waits for the response before processing the next record. Changing it to AckMode.BATCH greatly increases performance, as commits are postponed until all messages returned by the poll are processed.
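Concretely, this is a one-line change to the container factory from the question (a sketch reusing the original bean; only the AckMode line differs):

```java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MarketChangesProtocolBuffer.MarketChanges>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MarketChangesProtocolBuffer.MarketChanges> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory() );
    factory.getContainerProperties().setPollTimeout( 3000 );
    // Commit once per poll batch instead of once per record, removing
    // the per-record synchronous offset-commit round trip.
    factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.BATCH );
    factory.setErrorHandler( new SeekToCurrentErrorHandler( new FixedBackOff( 2000, 2 ) ) );
    return factory;
}
```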

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Gary Russell
Solution 2 Marcelo Canaparro