How do I receive a subset of data from Kafka?
I know we can filter the data in our consumer/streaming programs, but what I am looking for here is a solution to filter the data from the Kafka broker itself.
Problem Statement: I have a use case where I need to fetch both the whole content of the data present in Kafka and a subset of that data. So when I create a Consumer, I need to pass a filter query so that I receive only the filtered data.
I have tried to achieve this with a Python consumer and a PySpark streaming program, but I couldn't. I have shared below the code snippets I have tried.
Python Program
from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id='my-group-id',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for event in consumer:
    event_data = event.value
    # Do whatever you want
    print(event_data)
    sleep(0.5)
PySpark Program
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, StructType, StructField
import json

master = "local"
appname = "kafka-spark-streaming"
spark = SparkSession.builder.master(master).appName(appname).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "mytopic")
      .option("startingOffsets", "latest")
      .load())
df = df.selectExpr("CAST(value AS STRING)")

# show() cannot be used on a streaming DataFrame; the stream has to be
# started with writeStream instead
query = df.writeStream.format("console").outputMode("append").start()
query.awaitTermination()
In either case, the offset value latest gives me only the streaming data, and earliest gives me the whole data present in Kafka, including the streaming data.
Solution 1:[1]
If you mean all / a subset of a topic, as in offsets, then you must seek the consumer to the desired offsets and stop it manually at the end of the range.
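For example, a minimal sketch with kafka-python, assuming a single-partition topic and a purely hypothetical offset range:

from kafka import KafkaConsumer, TopicPartition

start_offset = 100   # hypothetical first offset to read
end_offset = 200     # hypothetical offset at which to stop

consumer = KafkaConsumer(
    bootstrap_servers=['kafka:9092'],
    enable_auto_commit=False
)

# Assign the partition explicitly (instead of subscribing) so that seek() can be used
tp = TopicPartition('mytopic', 0)
consumer.assign([tp])
consumer.seek(tp, start_offset)

for event in consumer:
    if event.offset >= end_offset:
        break               # stop the consumer manually at the end offset
    print(event.value)

consumer.close()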
If you want to filter subsets of each individual record, for both solutions, you need to further parse your data.
For example, in plain Python, you would write if statements within your consumer loop based on the content of the data to ignore certain records. In your case, event.value should be a dictionary.
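A minimal sketch of that loop, assuming each message is a JSON object with a hypothetical event_type field you want to filter on:

from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',
    group_id='my-group-id',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for event in consumer:
    event_data = event.value              # a dict, thanks to the deserializer
    if event_data.get('event_type') != 'purchase':
        continue                          # ignore records you do not care about
    print(event_data)                     # only the filtered subset reaches here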
Similarly in Spark, you'd need to parse your string value column with a function such as from_json into a struct, then you can use the filter DataFrame function, as with any other Spark SQL pipeline.
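For example, a minimal sketch of that pipeline, again assuming a hypothetical event_type field in the JSON payload:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType, StructField

spark = SparkSession.builder.master("local").appName("kafka-spark-filter").getOrCreate()

# Hypothetical schema describing the JSON payload of each record
schema = StructType([
    StructField("event_type", StringType()),
    StructField("payload", StringType())
])

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "mytopic")
      .option("startingOffsets", "earliest")
      .load())

# Parse the string value into a struct, then keep only the matching records
parsed = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
filtered = parsed.filter(col("event_type") == "purchase")

query = filtered.writeStream.format("console").outputMode("append").start()
query.awaitTermination()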
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

| Solution | Source |
|---|---|
| Solution 1 | Stack Overflow |
