Use a column from the source DataFrame to write into multiple targets
I'm new to Spark Structured Streaming and I need to write to Elasticsearch indexes (viewed in Kibana), determining the name of each index from the data coming from Kafka.
I have my read function:
def readStreamFromKafka(spark: SparkSession, topicName: String): DataFrame = {
  spark
    .readStream
    .format("kafka")
    .options(kafkaSourceOptions())
    .option("subscribe", topicName)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    // Kafka delivers key/value as binary; cast them to STRING for downstream parsing
    .selectExpr("CAST(value AS STRING) as value", "CAST(key AS STRING) as key", "topic", "partition", "offset", "timestamp")
}
And my write function:
def writeELK(df: DataFrame, indexOutput: String, checkpointPath: String): StreamingQuery = {
  df.writeStream
    .format("es")
    .outputMode("append")
    .option("checkpointLocation", checkpointPath)
    .option("es.resource", indexOutput)
    .start()
}
The write works well when the index name is fixed (indexOutput), but I don't know how to use a column named "index_name" from my source DataFrame in order to write into multiple indexes.
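The elasticsearch-hadoop connector supports dynamic (multi-resource) writes: when `es.resource` contains a field name in curly braces, the connector resolves it per document, so each row is routed to the index named by that field. A minimal sketch, assuming the source DataFrame carries an `index_name` column and the `es` format comes from the elasticsearch-spark connector:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch: route each row to the Elasticsearch index named by its
// "index_name" column. The {index_name} pattern in es.resource is
// substituted per document by elasticsearch-hadoop.
def writeELKDynamic(df: DataFrame, checkpointPath: String): StreamingQuery = {
  df.writeStream
    .format("es")
    .outputMode("append")
    .option("checkpointLocation", checkpointPath)
    .option("es.resource", "{index_name}") // resolved from each row
    .start()
}
```

Note that the column must be present in every document for the pattern to resolve. An alternative, if you need more control, is `foreachBatch`: group each micro-batch by `index_name` and issue one batch write per distinct index value.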
Regards
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
