Use a column from the source dataframe to write into multiple targets

I'm new to Spark Structured Streaming and I need to write to Elasticsearch indices (visualized in Kibana), determining the name of each index from the data coming from Kafka.

I have my read function:

  def readStreamFromKafka(spark: SparkSession, topicName: String, checkpointPath: String): DataFrame = {
    spark
      .readStream
      .format("kafka")
      .options(kafkaSourceOptions())
      .option("subscribe", topicName)
      .option("startingOffsets", "latest")
      // checkpointLocation only takes effect on the writer, not the Kafka source
      .option("checkpointLocation", checkpointPath)
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "CAST(key AS STRING) AS key", "topic", "partition", "offset", "timestamp")
  }
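
Since `value` arrives as a plain JSON string, the index name has to be surfaced as its own column before writing. A minimal sketch, assuming a hypothetical payload layout with an `index_name` field (the schema and topic/checkpoint names below are assumptions; adjust them to your actual messages):

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical payload schema -- replace with the real layout of your messages.
val payloadSchema = new StructType()
  .add("index_name", StringType) // target Elasticsearch index for this record
  .add("body", StringType)       // the rest of the document

val kafkaDf = readStreamFromKafka(spark, "my-topic", "/tmp/checkpoints/read")

// Parse the JSON value and expose index_name as a top-level column.
val parsed = kafkaDf
  .select(from_json(col("value"), payloadSchema).as("payload"))
  .select("payload.*")
```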

and my write function:

  def writeELK(df: DataFrame, indexOutput: String, checkpointPath: String): StreamingQuery = {
    df.writeStream
      .format("es")
      .outputMode("append")
      .option("checkpointLocation", checkpointPath)
      .option("es.resource", indexOutput)
      .start()
  }

The writing works well when the name of the index is fixed (`indexOutput`), but I don't know how to use a column named `index_name` from my source dataframe in order to write into multiple indices.
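
If the `es` format here is elasticsearch-hadoop, its `es.resource` (and `es.resource.write`) setting accepts `{field}` placeholders that are resolved per document, so each row can be routed to the index named by its own column. A sketch under that assumption, with `df` already carrying an `index_name` column (the function name is mine):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch: route every record to the index named by its index_name column.
// elasticsearch-hadoop resolves the {index_name} placeholder per document.
def writeELKDynamic(df: DataFrame, checkpointPath: String): StreamingQuery = {
  df.writeStream
    .format("es")
    .outputMode("append")
    .option("checkpointLocation", checkpointPath)
    .option("es.resource", "{index_name}") // resolved from each row's index_name field
    .start()
}
```

Note that the field referenced in the placeholder must be present in every document; if you don't want it stored in the index itself, elasticsearch-hadoop's `es.mapping.exclude` setting can drop it from the written document.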

Regards



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
