Category "apache-kafka"

Get count of partitions in a Kafka topic with Scala 2.12

With Scala 2.11 and spark-streaming-kafka-0-8_2.11 I could do: import org.apache.spark.streaming.kafka.KafkaCluster val params = Map[String, Object]( "bootstr

Found directory not in the form of topic-partition. Kafka's log directories (and children) should only contain Kafka topic data

[2021-04-05 07:51:32,180] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.KafkaException: Found directory /var/lib/k

Why does Kafka write null data to the database?

I am working on a NestJS project. My project gets data from a Kafka topic and writes the data to the database (MySQL). If I read hundreds of messages from Kafka

How to test Kafka OnFailure callback with Junit?

I have the following code to send data to Kafka: @Service public class KafkaSender{ @Autowired private KafkaTemplate<String, Employee> kafkaTempla

Spark: read a Kafka topic from a specific offset based on timestamp

How do I set a Spark job to pick up a Kafka topic from a specific offset based on a timestamp? Let's say that I need to get all data from a Kafka topic startin

Apache Kafka - Implementing a KTable

I am new to the Kafka Streams API and I am trying to create a KTable. I have an input topic, s-order-topic, which contains JSON-formatted messages, as shown below. { "curr

Is it possible to query a KSQL Table/Materialized view via HTTP?

I have a materialized view created using CREATE TABLE average_latency AS SELECT DEVICENAME, AVG(LATENCY) AS AVG_LATENCY FROM metrics WINDOW TUMBLING (SIZE 1 MIN

Kafka consumer is not listening from the command line

I am using Kafka 1.0.0 in my project. Since yesterday, I am unable to listen to messages from the command line. At the same time, I am able to listen to the mess

How to do stream processing with Redpanda?

Redpanda seems easy to work with, but how would one process streams in real-time? We have a few thousand IoT devices that send us data every second. We would li

What is the difference between Kafka earliest and latest offset values

Producer sends messages 1, 2, 3, 4; consumer receives messages 1, 2, 3, 4; consumer crashes/disconnects; producer sends messages 5, 6, 7; consumer comes back up
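The scenario above turns on one rule: auto.offset.reset (earliest or latest) is consulted only when the consumer group has no valid committed offset for the partition (none at all, or one that is out of range). The sketch is a plain-Java model of that rule, not Kafka client code; OffsetResetModel and startingOffset are hypothetical names, and the usage assumes the consumer committed offset 4 (the position after message 4, stored at offset 3) before it crashed.

```java
// Simplified model (NOT Kafka client code) of how a consumer chooses its starting offset.
final class OffsetResetModel {
    /**
     * @param committed the group's committed offset for the partition, or null if none exists
     * @param logStart  earliest offset still retained in the partition
     * @param logEnd    offset the next produced message will receive
     * @param reset     value of auto.offset.reset: "earliest", "latest" or "none"
     */
    static long startingOffset(Long committed, long logStart, long logEnd, String reset) {
        // A valid committed offset always wins; auto.offset.reset is not consulted at all.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        switch (reset) {
            case "earliest":
                return logStart; // replay everything still retained
            case "latest":
                return logEnd;   // only messages produced from now on
            case "none":
                // The real client throws NoOffsetForPartitionException here.
                throw new IllegalStateException("no valid committed offset");
            default:
                throw new IllegalArgumentException("unknown reset policy: " + reset);
        }
    }

    public static void main(String[] args) {
        // Messages 1..7 sit at offsets 0..6, so the log end offset is 7.
        // Committed offset 4: the consumer resumes at message 5 whatever the setting.
        System.out.println(startingOffset(4L, 0, 7, "earliest")); // 4
        System.out.println(startingOffset(4L, 0, 7, "latest"));   // 4
        // A brand-new group (nothing committed) is where the setting matters.
        System.out.println(startingOffset(null, 0, 7, "earliest")); // 0
        System.out.println(startingOffset(null, 0, 7, "latest"));   // 7
    }
}
```

In other words, earliest versus latest only changes the outcome for a new group, or for one whose committed offset has already been deleted by retention.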

How to drain the window after a Flink join using coGroup()?

I'd like to join data coming in from two Kafka topics ("left" and "right"). Matching records are to be joined using an ID, but if a "left" or a "right" record i

Table-Table Join duplicate entries

We are using Kafka in production, and I am trying to push the adoption and usage of KSQL in the same direction. But I have already failed with one simple table-table join.

Kafka consumer unit test with Avro Schema registry failing

I'm writing a consumer which listens to a Kafka topic and consumes messages whenever they are available. I've tested the logic/code by running Kafka locally an

ClickHouse Kafka engine on cluster

I'm playing with Kafka engine on ClickHouse cluster. At the moment ClickHouse 22.1 cluster and Kafka are run in Docker. Here are configurations: https://github.

Can Kafka Connect consume data from a separate kerberized Kafka instance and then route to Splunk?

My pipeline is: Kerberized Kafka --> Logstash (hosted on a different server) --> Splunk. Can I replace the Logstash component with Kafka Connect? Could

How does Kafka consumer auto-commit work?

I am reading this one: Automatic Commit The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=t
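The quoted documentation describes a timer-driven commit. As a rough model (not the real KafkaConsumer, which issues this commit asynchronously from within poll() and also commits at other points such as close()): each poll() first checks whether auto.commit.interval.ms has elapsed and, if so, commits the position reached by earlier polls, then hands out new records. AutoCommitModel and its fields are hypothetical names; the 5000 ms interval matches the client's default auto.commit.interval.ms.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of enable.auto.commit=true; NOT the real KafkaConsumer.
final class AutoCommitModel {
    private final long intervalMs; // stands in for auto.commit.interval.ms
    private long position = 0;     // next offset poll() will hand out
    private long committed = 0;    // last offset committed for the group
    private long lastCommitMs = 0; // time of the last (simulated) commit

    AutoCommitModel(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns the offsets of the next maxRecords records, auto-committing first if due. */
    List<Long> poll(long nowMs, int maxRecords) {
        // The commit covers only records returned by EARLIER polls, which the
        // application is assumed to have finished processing by now.
        if (nowMs - lastCommitMs >= intervalMs) {
            committed = position;
            lastCommitMs = nowMs;
        }
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < maxRecords; i++) {
            batch.add(position++);
        }
        return batch;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        AutoCommitModel consumer = new AutoCommitModel(5_000);
        consumer.poll(1_000, 10); // offsets 0-9, interval not elapsed, nothing committed
        consumer.poll(2_000, 10); // offsets 10-19, still nothing committed
        consumer.poll(6_000, 10); // commits 20 first, then returns offsets 20-29
        System.out.println(consumer.committed()); // 20
    }
}
```

With this schedule, a crash after processing offsets 20 to 29 but before the next commit makes the group restart from offset 20 and see those records again, which is why auto-commit is usually described as at-least-once.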

How to get org.apache.kafka.connect.data.Decimal value from Kafka JSON message [duplicate]

I use Debezium to stream PostgreSQL data to Kafka, and use Java to subscribe to the Kafka topic. I receive a Kafka message and get a JSON string, but
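With Debezium's default decimal.handling.mode=precise, a fixed-scale DECIMAL/NUMERIC column is emitted as the Kafka Connect Decimal logical type: the unscaled value as big-endian two's-complement bytes, with the scale kept in the field schema's "scale" parameter. The JsonConverter writes those bytes as a Base64 string, so the JSON carries text rather than a number. A minimal sketch of decoding it in plain Java, assuming the Base64 string and the scale have already been read from the message; ConnectDecimalDecoder is a hypothetical helper name, and "MDk=" is an illustrative value, not data from the question. Columns declared without a fixed scale are emitted differently by the PostgreSQL connector (as a struct carrying its own scale), so this covers only the fixed-scale case.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper: turns a Connect Decimal (Base64 unscaled bytes + scale) into a BigDecimal.
final class ConnectDecimalDecoder {
    static BigDecimal decode(String base64, int scale) {
        // Base64 text -> big-endian two's-complement bytes of the unscaled value.
        byte[] unscaled = Base64.getDecoder().decode(base64);
        // BigInteger(byte[]) reads exactly that encoding, including negative values.
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // 123.45 with scale 2 -> unscaled 12345 = 0x3039 -> Base64 "MDk="
        System.out.println(decode("MDk=", 2)); // 123.45
    }
}
```

If a numeric representation in the JSON is preferable, Debezium's decimal.handling.mode can instead be set to string or double, trading the decoding step for a change in type (and, for double, possible loss of precision).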

How to deserialize BigDecimal value received from Kafka broker through Debezium CDC mechanism?

I have a couple of microservices developed using Spring Boot and each has its own Postgres database. These microservices exchange data with a CDC mechanism prov

Why does co-partitioning of two KStreams in Kafka require the same number of partitions for both streams?

I wanted to know why co-partitioning of two KStreams in Kafka requires the same number of partitions for both streams, as stated in the documentation in be
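A join task in Kafka Streams reads partition N of both input topics and only sees the keys stored there, so matching keys must land in the same partition number on both sides. Since a key's partition is computed as hash(key) mod partitionCount, the same key can map to different partition numbers when the counts differ. The sketch illustrates only that modulo step: it uses String.hashCode() with Math.floorMod as a stand-in for Kafka's default partitioner (which hashes the serialized key bytes with murmur2), and CoPartitioningDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CoPartitioningDemo {
    // Stand-in for key-based partitioning; real Kafka hashes the serialized key with murmur2.
    // Only the "hash mod partition count" step matters for this illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups keys by the partition they would be written to, for a given partition count.
    static Map<Integer, List<String>> layout(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            byPartition.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                       .add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e", "f", "g");
        // Same keys and same hash, but different partition counts give different layouts.
        System.out.println(layout(keys, 4)); // {0=[d], 1=[a, e], 2=[b, f], 3=[c, g]}
        System.out.println(layout(keys, 6)); // {0=[f], 1=[a, g], 2=[b], 3=[c], 4=[d], 5=[e]}
    }
}
```

Here key "e" lands in partition 1 of the 4-partition topic but partition 5 of the 6-partition topic, so the task handling partition 1 never sees both sides of "e". Equal partition counts are necessary but not sufficient: both topics must also be written with the same partitioning strategy.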

Streams Processor key-value types

I'm trying to implement a custom topology processing step implementing the Processor interface and then adding an instance of my custom processor to the topolog