Access a record's partition number in Kafka Streams
I am using Kafka 2.6 with the Spring Cloud Stream Kafka Streams binder. I want to access record headers, the partition number, etc. in my Kafka Streams application. I read about using the Processor API and ProcessorContext, but every time the ProcessorContext object comes back null.
Below is the code:
@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
    return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
    {
        public Transformer<String, String, KeyValue<String, String>> get()
        {
            return new Transformer<String, String, KeyValue<String, String>>()
            {
                private int total = 0;
                ProcessorContext context;
                @Override
                public void close() {
                }
                @Override
                public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
                {
                    this.context = context;
                }
                @Override
                public KeyValue<String, String> transform(String k, String v)
                {
                    System.out.println("ProcessorContext: "+this.context);
                    System.out.println("value: "+v);
                    return new KeyValue<>(k, v);
                }
            };
        }
    });
}
In this code, ProcessorContext is always printed as null. I also tried using a ListenerContainerCustomizer for Spring Boot, but that does not work either:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
    return (container, dest, group) ->
    {
        container.setRecordInterceptor(record ->
        {
            System.out.println(">>>> Received record, checking headers");
            Headers headers = record.headers();
            System.out.println(">>>> Header length: " + headers.toArray().length);
            for (Header header : headers)
            {
                if (header.key().equalsIgnoreCase("eventtype"))
                {
                    // header.value() is a byte[]; String.valueOf would only give the array's identity string
                    String value = new String(header.value(), java.nio.charset.StandardCharsets.UTF_8);
                    if (!value.equalsIgnoreCase("PUBLISHED"))
                    {
                        System.out.println("Event type from header not PUBLISHED, skipping record");
                        return null;
                    }
                }
            }
            System.out.println("Processing record");
            return record;
        });
    };
}
I printed the list of registered beans and could see the one above, but it is never invoked. In any case I need the first approach to work, since I want to run some business logic based on the partition number.
Please help; I have been stuck on this for many days.
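One thing that stands out in the Transformer: init() receives its argument as pc, but the body assigns the context field to itself, so the field would never be set. That alone could explain the null. Below is a minimal sketch of the effect in plain Java. The Context interface is a hypothetical stand-in for org.apache.kafka.streams.processor.ProcessorContext (only partition() is modelled) so that the example runs without Kafka on the classpath; the class names are made up for illustration.

```java
public class ContextAssignmentDemo {

    /** Hypothetical stand-in for Kafka Streams' ProcessorContext; only partition() is modelled. */
    interface Context {
        int partition();
    }

    /** Mirrors the init() from the question: the parameter is pc, but the field is assigned to itself. */
    static class BuggyTransformer {
        Context context;

        void init(Context pc) {
            this.context = context; // self-assignment: pc is ignored, the field stays null
        }
    }

    /** Same shape, but the parameter is actually stored. */
    static class FixedTransformer {
        Context context;

        void init(Context pc) {
            this.context = pc; // keep the context that the runtime passes in
        }

        int partition() {
            return context.partition();
        }
    }

    public static void main(String[] args) {
        Context ctx = () -> 3; // pretend the record came from partition 3

        BuggyTransformer buggy = new BuggyTransformer();
        buggy.init(ctx);
        System.out.println("buggy context: " + buggy.context); // prints "buggy context: null"

        FixedTransformer fixed = new FixedTransformer();
        fixed.init(ctx);
        System.out.println("fixed partition: " + fixed.partition()); // prints "fixed partition: 3"
    }
}
```

In the actual Transformer the equivalent change would be this.context = pc; in init(). After that, context.partition() and context.headers() should be usable inside transform(), assuming nothing else in the binder setup interferes.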
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
