'Access record's partition numebr in kafka streams

I am using Kafka 2.6 with spring cloud stream kafka-streams binder. I want to access record headers, partition no etc in my Kafka streams application. I read about using Processor API, using ProcessorContext etc. But everytime ProcessorContext object is coming null.

Below is the code

@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
    return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
    {
        public Transformer<String, String, KeyValue<String, String>> get() 
        {
            return new Transformer<String, String, KeyValue<String, String>>() 
            {

                private int total = 0;
                ProcessorContext context;

                @Override
                public void close() {
                }

                @Override
                public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
                {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String k, String v)
                {
                    System.out.println("ProcessorContext: "+this.context);
                    System.out.println("value: "+v);
                    return new KeyValue<>(k, v);
                }
            };
        }
    });
}

In this code ProcessorContext is always printed as null. I also tried using ListenerContainerCustomizer for spring-boot. But that also is not working

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
    return (container, dest, group) ->
    {
        container.setRecordInterceptor(record ->
        {
            System.out.println(">>>> Received record, checking headers");
            Headers headers = record.headers();
            System.out.println(">>>> Header length: " + headers.toArray().length);
            for (Header header : headers)
            {
                if (header.key().equalsIgnoreCase("eventtype"))
                {
                    String value = String.valueOf(header.value());
                    if (!value.equalsIgnoreCase("PUBLISHED"))
                    {
                        System.out.println("Event type from header not PUBLISHED, skipping record");
                        return null;
                    }
                }
            }
            System.out.println("Processing record");
            return record;
        });
    };
}

I printed list of beans registered with beans I could see above one. But it never works. I anyways require first approach to work since I like to run some business logic with partition number.

Please help badly stuck since many days.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source