'How to get threadlocal for concurrency consumer?

I am developing spring kafka consumer. Due to message volume, I need use concurrency to make sure throughput. Due to used concurrency, I used threadlocal object to save thread based data. Now I need remove this threadlocal object after use it. Spring document with below links suggested to implement a EventListener which listen to event ConsumerStoppedEvent . But did not mention any sample eventlistener code to get threadlocal object and remove the value. May you please let me know how to get the threadlocal instance in this case? Code samples will be appreciated. https://docs.spring.io/spring-kafka/docs/current/reference/html/#thread-safety



Solution 1:[1]

Something like this:

@SpringBootApplication
public class So71884752Application {

    public static void main(String[] args) {
        SpringApplication.run(So71884752Application.class, args);
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic1").partitions(2).build();
    }

    @Component
    static class MyListener implements ApplicationListener<ConsumerStoppedEvent> {

        private static final ThreadLocal<Long> threadLocalState = new ThreadLocal<>();

        @KafkaListener(topics = "topic1", groupId = "my-consumer", concurrency = "2")
        public void listen() {
            long id = Thread.currentThread().getId();
            System.out.println("set thread id to ThreadLocal: " + id);
            threadLocalState.set(id);
        }

        @Override
        public void onApplicationEvent(ConsumerStoppedEvent event) {
            System.out.println("Remove from ThreadLocal: " + threadLocalState.get());
            threadLocalState.remove();
        }

    }

}

So, I have two concurrent listener containers for those two partitions in the topic. Each of them is going to call this my @KafkaListener method anyway. I store the thread id into the ThreadLocal. For simple use-case and testing the feature.

The I implement ApplicationListener<ConsumerStoppedEvent> which is emitted in the appropriate consumer thread. And that one helps me to extract ThreadLocal value and clean it up in the end of consumer life.

The test against embedded Kafka looks like this:

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class So71884752ApplicationTests {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Test
    void contextLoads() throws InterruptedException {
        this.kafkaTemplate.send("topic1", "1", "foo");
        this.kafkaTemplate.send("topic1", "2", "bar");
        this.kafkaTemplate.flush();

        Thread.sleep(1000); // Give it a chance to consume data

        this.kafkaListenerEndpointRegistry.stop();
    }

}

Right. It doesn't verify anything, but it demonstrate how that event can happen.

I see something like this in log output:

set thread id to ThreadLocal: 125
set thread id to ThreadLocal: 127
...
Remove from ThreadLocal: 125
Remove from ThreadLocal: 127

So, whatever that doc says is correct.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Artem Bilan