Category "spring-kafka"

Kafka Consumer - Point to specific offset in Spring boot Kafka

I'm trying to seek to a specific offset. It seems I need to use seekToTimestamp, and there is no seekToOffset or anything similar. I've researched a lot but

JSON array serialization issue after spring-cloud-stream update

I use spring-cloud-stream-binder-kafka to connect to the Kafka broker. I updated the spring-cloud-dependencies version to 2021.0.1. I went from this: [INFO] +- org.spr

Is it possible to check that the Kafka broker and the topic I'm sending a message to are healthy before sending the message?

I have a batch application which needs to send messages to two different Kafka topics on two different clusters. I want to make sure my Kafka broker in which my p

How to send a byte array in Kafka with Avro Schema Registry

I'm trying to send bytes with a JSON object using a Java POJO and an Avro schema, with schema registry. So my question is, how should my code send "bytes" type

Kafka EOS retry flag

I have a Kafka cluster and a Spring Boot application that is configured for EOS. The application consumes from topic A, performs some business logic, then produce

Spring Cloud Stream - Is there a way to customize the StreamThread thread name?

I'm using Spring Cloud Stream (Kafka as the binder) in my current project and the default thread name for StreamThread keeps bothering me as it is very long. Her

Spring Cloud Stream Kafka Streams state store

I want to get a state store with a custom key and value. I have a Kafka topic, example-kafka-topic-event. This is how I get the KTable at the code level: @Component("exa

Profiles property is deprecated in properties.yml for Spring-Kafka

I'm configuring the application.yml file in my Spring Kafka MS and I'm getting a notification that the profiles property is deprecated. This 'profiles' property

Spring Kafka Batch listener not receiving more than 1 or 2 messages

Spring Kafka Batch consumer receives only one or two messages. We have increased fetch.min.bytes to 9000 and fetch.max.wait.ms to 5000 [Based on this answ

StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory

This is regarding upgrading an existing code base in production, which uses windowing, from kafka-clients, kafka-streams, spring-kafka 2.4.0 to 2.6.x and also upgradi

DefaultErrorHandler doesn't log non-retryable exceptions

Non-retryable exceptions are not logged in DefaultErrorHandler when used with DeadLetterPublishingRecoverer. @Bean public CommonErrorHandler consumerErrorHandle

Can one Kafka Producer Class have multiple @EventListener methods?

I'm using Spring Kafka and wrote a Producer class: @Component @RequiredArgsConstructor class Producer { private static final String TOPIC = "channels"; pri

Spring Kafka serialiser config

I upgraded from spring-kafka 2.7.2 to 2.8.4. I found my code broke where I had created my serialiser up front in my tests. var serialiser = new KafkaAv

How to stop listening in a Spring Kafka consumer?

I use Spring for Apache Kafka. I'd like to stop listening to my topic and wait, to avoid an OOM error. How can I do it?

Unable to send Kafka Avro Message to Message Channel <Failed to convert Generic Message to Outbound Message>

We are trying to push a Kafka notification to the external Kafka Topic by sending the Avro Schema Message to the Message Channel. On sending the message to the

No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>'

I use this kafka configuration with spring cloud and spring boot 2.6.6: @Configuration @RefreshScope public class KafkaProducerConfig { @Bean(name = "nativeP

This error handler cannot process 'SerializationException's directly;

The image below is my topic. Every once in a while, a value other than empty or JSON is returned. If it is null, it throws an error as below and enters the

How to properly test kafkaTemplate.send() within a function in JUnit 5?

I'm learning how to write tests and especially tests that have a producer in them. I cannot post all the classes because it's HUGE (and not mine, I should just pr

Missing dependency on EmbeddedKafka in integration test with Groovy and Spock

I've been trying to create an integration test using EmbeddedKafka, but I'm getting a missing-dependency problem when trying to run it. This is the error: