Category "apache-kafka"

Structured Streaming to Save JSON to HDFS

My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. I am able to save JSON to HDFS but it saves the JSON st

No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>'

I use this Kafka configuration with Spring Cloud and Spring Boot 2.6.6: @Configuration @RefreshScope public class KafkaProducerConfig { @Bean(name = "nativeP

How to get Kafka offset data for a specified timestamp

I tried to get the offset from a Kafka topic based on a timestamp, but when I ran it, it threw a null pointer error: Map<TopicPartition, Long> timest

How to query the state store in the Kafka Streams DSL to implement consumer idempotency

I'm working in a scenario where duplicate messages could arrive at a consumer (a KStream application). To use the typical case let's suppose it's an OrderCrea

How to monitor the amount of messages in a Kafka topic per day?

I have a Kafka cluster with a topic that receives thousands of messages a day and I want to see how many messages went in the topic per date. I'm using JMX expo

Kafka streams - Concatenate Predicate based on dynamic number of conditions

I'm a bit new to Java, so I would appreciate advice on dealing with multiple conditions in Kafka Predicates. I've the following code which I'm able to have dynamic

Kafka connector and Schema Registry - Error Retrieving Avro Schema - Subject not found

I have a topic that will eventually have lots of different schemas on it. For now it just has the one. I've created a connect job via REST like this: { "name"

Flink Python DataStream API Kafka Producer Sink Serialization

I'm trying to read data from one Kafka topic and write to another after doing some processing. I'm able to read data and process it, but when I try to write it to

Kafka producer config: Why should be larger than

From the Kafka docs, it says that: The configuration controls the maximum amount of time the client

java.lang.RuntimeException: Failed to resolve Oracle database version

I am using the Debezium Oracle connector in Kafka Connect. While starting the connector I am getting the error below: java.lang.RuntimeException: Failed to resolve Oracle da

Helm range yaml template kafka topics

I am new to Helm and I am trying to generate different topics for Kafka with a range function so that I do not need a YAML file for each topic: I have different topics (t

This error handler cannot process 'SerializationException's directly;

The image below is my topic. Every once in a while, a value other than empty or JSON is returned. If it is null, it throws an error as below and enters the

How to properly test kafkaTemplate.send() within a function in Junit5?

I'm learning how to write tests and especially tests that have a producer in it. I cannot post all the classes because it's HUGE (and not mine, I should just pr

kafka failed authentication due to: SSL handshake failed

I have to add encryption and authentication with SSL in Kafka. This is what I have done: Generate a certificate for each Kafka broker: keytool -keystore server

Docker - library initialization failed - unable to allocate file descriptor table - out of memory

I have been trying to run ZooKeeper and Kafka in a Docker container. I got a lot of errors [error occurred during error reporting , id 0xb] and [Too many errors, abo

KTable-KTable foreign-key join not producing all messages when topics have more than one partition

See the update below for a potential workaround. Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we fou

Confluent Kafka Connector Configuration with C# / .net

I'm trying to build a connector to go into the Confluent Kafka library. I have seen many examples in Java that use configDef to define the configuration options

Extract particular data from Kafka topic

I'm doing real-time streaming on Twitter and wonder whether there is a way to extract only messages and certain values from a Kafka topic.

Create pyFlink DataStream Consumer from Tweets Kafka Producer in Python

I want to create a streaming Kafka consumer in PyFlink that can read tweet data after deserialization (JSON). I have PyFlink version 1.14.4 (latest version). Can I

How do I serialize non-ascii characters in pykafka?

I have a defaultdict I want to serialize as JSON, but some characters are then replaced with Unicode-escaped characters. For example, if I have the string "International