Structured Streaming read based on Kafka partitions
I am using Spark Structured Streaming to read incoming messages from a Kafka topic and write them to multiple parquet tables based on the content of each message. Since the Kafka source is common, I created a single readStream, and then one writeStream per parquet table in a loop. This works functionally, but the read side becomes a bottleneck: each writeStream ends up reading from Kafka on its own, and there is no way to cache the dataframe that has already been read.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.servers)
  .option("subscribe", conf.topics)
  // .option("startingOffsets", "earliest")
  .option("failOnDataLoss", false)
  .load()
tableNames.foreach { tableName =>
  // filter the data from the common Kafka source based on the table name (parquetDf)
  // and write the filtered stream to its own parquet table
  parquetDf.writeStream.format("parquet")
    .option("path", outputFolder + File.separator + tableName)
    .option("checkpointLocation", "checkpoint_" + tableName)
    .outputMode("append")
    .trigger(Trigger.Once())
    .start()
}
Now every writeStream creates a new consumer group and reads the entire topic from Kafka, then filters the data and writes it to parquet. This causes a huge overhead. To avoid it, I could partition the Kafka topic so that it has as many partitions as there are tables, and have the readStream read only from a given partition. But I don't see a way to specify partition details as part of the Kafka read stream.
Solution 1:[1]
If the data volume is not that high, write your own sink and collect the data of each micro-batch; you should then be able to cache that dataframe and write it to different locations, as in the sketch below. It needs some tweaks, but it will work.
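As a rough illustration of that idea, here is a minimal sketch of a custom sink built on Spark's v1 `Sink` / `StreamSinkProvider` developer API. The class names (`MultiTableSink`, `MultiTableSinkProvider`), the `path` and `tables` options, and the `tableName` column used for filtering are assumptions for the example, not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Sketch of a custom sink that collects each micro-batch, caches it, and
// fans it out to one parquet folder per table. Option names and the
// "tableName" column are hypothetical.
class MultiTableSink(options: Map[String, String]) extends Sink {
  private val outputFolder = options("path")
  private val tableNames   = options("tables").split(",").toSeq

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val spark = data.sparkSession
    // The DataFrame handed to addBatch is tied to the streaming query's
    // incremental plan, so collect it and rebuild a plain DataFrame that
    // can safely be cached and written several times.
    val rows    = data.collect()
    val batchDf = spark.createDataFrame(spark.sparkContext.parallelize(rows), data.schema).cache()

    try {
      tableNames.foreach { tableName =>
        batchDf.filter(col("tableName") === tableName)
          .write
          .mode("append")
          .parquet(s"$outputFolder/$tableName")
      }
    } finally {
      batchDf.unpersist()
    }
  }
}

// Provider so the sink can be referenced by its class name in writeStream.format(...)
class MultiTableSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new MultiTableSink(parameters)
}
```

It could then be wired up with something like `.writeStream.format("com.example.MultiTableSinkProvider")` (the fully qualified provider class name, package assumed), passing `path` and `tables` as options. Note that `Sink` and `StreamSinkProvider` are developer-level APIs and may change between Spark versions.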
Solution 2:[2]
You can use the foreachBatch sink and cache the dataframe there (see the sketch below). Hopefully it works.
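A minimal sketch of that approach, reusing `kafkaDf`, `tableNames`, and `outputFolder` from the question, and assuming the Kafka value has already been parsed so that each row carries a `tableName` column (that column name is hypothetical):

```scala
import java.io.File

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val query = kafkaDf.writeStream
  .trigger(Trigger.Once())
  .option("checkpointLocation", "checkpoint_multi_table")
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // Kafka is read only once per micro-batch; cache the batch so every
    // per-table write reuses it instead of going back to the broker.
    batchDf.persist()

    tableNames.foreach { tableName =>
      batchDf.filter(col("tableName") === tableName)
        .write
        .mode("append")
        .parquet(outputFolder + File.separator + tableName)
    }

    batchDf.unpersist()
  }
  .start()

query.awaitTermination()
```

Because only one streaming query is started, Kafka is consumed by a single consumer group, and the persist() call keeps each micro-batch in memory across all the per-table writes.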
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | dunlu_98k |
| Solution 2 | Learnis |
