Structured Streaming read based on Kafka partitions

I am using Spark Structured Streaming to read incoming messages from a Kafka topic and write them to multiple Parquet tables, depending on the content of each message. Since the Kafka source is common, I created a single readStream and then created a separate writeStream for each Parquet table in a loop. This works, but the read side becomes a bottleneck: each writeStream ends up triggering its own read of the topic, and there is no way to cache the DataFrame that has already been read.

import java.io.File
import org.apache.spark.sql.streaming.Trigger

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.servers)
  .option("subscribe", conf.topics)
  //  .option("earliestOffset","true")
  .option("failOnDataLoss", false)
  .load()

tableNames.foreach { tableName =>
  // filter the data from the source based on the table name
  val parquetDf = kafkaDf.filter( /* predicate derived from tableName */ )

  // write to parquet
  parquetDf.writeStream.format("parquet")
    .option("path", outputFolder + File.separator + tableName)
    .option("checkpointLocation", "checkpoint_" + tableName)
    .outputMode("append")
    .trigger(Trigger.Once())
    .start()
}

Now every writeStream creates a new consumer group and reads the entire topic from Kafka before filtering and writing to Parquet, which creates a huge overhead. To avoid it, I could partition the Kafka topic so that it has as many partitions as there are tables, and then have each readStream read only from a given partition. But I don't see a way to specify partition details as part of the Kafka readStream.
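
A sketch of what I have in mind, using the Kafka source's assign option (which takes a JSON map of topic to partition numbers, and replaces subscribe); whether this fits my setup is unclear, and the mapping of partitions 0 and 1 to a single table below is purely an assumption:

val singleTableDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.servers)
  // "assign" instead of "subscribe": JSON of topic -> partitions to read (assumes conf.topics is a single topic name)
  .option("assign", s"""{"${conf.topics}":[0,1]}""")
  .option("failOnDataLoss", false)
  .load()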



Solution 1:[1]

If the data volume is not that high, write your own sink and collect the data of each micro-batch. You should then be able to cache that DataFrame and write it to the different locations. It needs some tweaks, but it will work.
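
A sketch of that idea, assuming each micro-batch is small enough to collect to the driver. Note that Sink and StreamSinkProvider are internal Spark APIs and may change between versions; the MultiTableSink and MultiTableSinkProvider names, the tables parameter, and the value-contains-table-name filter are all assumptions made for illustration:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Collects each micro-batch once, caches it, and fans it out to one parquet
// folder per table, so Kafka is only read once per trigger.
class MultiTableSink(outputFolder: String, tables: Seq[String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val spark = data.sparkSession
    // Rebuild a plain DataFrame from the collected rows so it can be written repeatedly.
    val batch = spark.createDataFrame(
      spark.sparkContext.parallelize(data.collect()), data.schema).cache()
    tables.foreach { tableName =>
      batch.filter(col("value").contains(tableName))   // placeholder filter; real parsing depends on the message format
        .write.mode("append")
        .parquet(s"$outputFolder/$tableName")
    }
    batch.unpersist()
  }
}

class MultiTableSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink =
    new MultiTableSink(parameters("path"), parameters("tables").split(",").toSeq)
}

It would be wired up as a single query, e.g.:

kafkaDf.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format(classOf[MultiTableSinkProvider].getName)
  .option("path", outputFolder)
  .option("tables", tableNames.mkString(","))
  .option("checkpointLocation", "checkpoint_all_tables")
  .trigger(Trigger.Once())
  .start()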

Solution 2:[2]

You can use the foreachBatch sink and cache the DataFrame. Hopefully it works.
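
A minimal sketch of that approach, reusing the single kafkaDf above; tableNames, outputFolder, and the value-contains-table-name filter are assumptions standing in for the real per-table predicate:

import java.io.File
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// One query instead of one per table: the micro-batch is persisted once
// and fanned out to a parquet folder per table.
kafkaDf.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .option("checkpointLocation", "checkpoint_all_tables")
  .trigger(Trigger.Once())
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val cached = batchDf.persist()
    tableNames.foreach { tableName =>
      cached.filter(col("value").contains(tableName))   // placeholder filter; real parsing depends on the message format
        .write.mode("append")
        .parquet(outputFolder + File.separator + tableName)
    }
    cached.unpersist()
  }
  .start()

With this, Kafka is consumed once per trigger and the cached micro-batch is reused for every table-specific write.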

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: dunlu_98k
Solution 2: Learnis