How to guarantee the sequence of execution of multiple sinks in Spark Structured Streaming

In my scenario, I have a Structured Streaming application which reads from Kafka and writes to HDFS and Kafka using three different sinks. The primary sink is the HDFS one; the others are secondary. I want the primary sink to run first and the secondary sinks afterwards. All of them have a trigger time of 60 seconds. Is there a way to achieve that in Spark Structured Streaming? Adding the code snippet:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
  .builder
  .master(StreamerConfig.sparkMaster)
  .appName(StreamerConfig.sparkAppName)
  .getOrCreate()

spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.shuffle.service.enabled", "true")

val readData = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", StreamerConfig.kafkaBootstrapServer)
  .option("subscribe", StreamerConfig.topicName)
  .option("failOnDataLoss", false)
  .option("startingOffsets", StreamerConfig.kafkaStartingOffset)
  .option("maxOffsetsPerTrigger", StreamerConfig.maxOffsetsPerTrigger)
  .load()

val deserializedRecords = StreamerUtils.deserializeAndMapData(readData, spark)

  
// Primary sink: persist data to HDFS as ORC, partitioned for the Hive table
val streamingQuery = deserializedRecords.writeStream
  .queryName(s"Persist data to hive table for ${StreamerConfig.topicName}")
  .outputMode("append")
  .format("orc")
  .option("path", StreamerConfig.hdfsLandingPath)
  .option("checkpointLocation", StreamerConfig.checkpointLocation)
  .partitionBy("date", "hour")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime(StreamerConfig.triggerTime))
  .start()

    
// Secondary sink 1: publish the same records to Kafka as JSON
deserializedRecords.select(to_json(struct("*")).alias("value"))
  .writeStream
  .format("kafka") // Local Testing - "console"
  .option("topic", StreamerConfig.watermarkKafkaTopic)
  .option("kafka.bootstrap.servers", StreamerConfig.kafkaBroker)
  .option("checkpointLocation", StreamerConfig.phase1Checkpoints)
  .trigger(Trigger.ProcessingTime(StreamerConfig.triggerTime))
  .start()
  

  
// Secondary sink 2: same payload, separate checkpoint location
deserializedRecords.select(to_json(struct("*")).alias("value"))
  .writeStream
  .format("kafka") // Local Testing - "console"
  .option("topic", StreamerConfig.watermarkKafkaTopic)
  .option("kafka.bootstrap.servers", StreamerConfig.kafkaBroker)
  .option("checkpointLocation", StreamerConfig.phase2Checkpoints)
  .trigger(Trigger.ProcessingTime(StreamerConfig.triggerTime))
  .start()
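
Since the snippet starts three independent streaming queries, the driver also has to block until they finish; otherwise a standalone main() would return immediately after start(). A minimal addition, not shown in the original snippet and assuming this runs as a standalone application:

// Block the driver until any of the started queries terminates.
spark.streams.awaitAnyTermination()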

PS: I am using Spark 2.3.2.
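
For reference on what explicit ordering could look like: Spark 2.4.0 added DataStreamWriter.foreachBatch, which runs user code once per micro-batch, so the writes can be sequenced inside a single query. A minimal sketch, assuming an upgrade to Spark 2.4+ and reusing the StreamerConfig names from the snippet above; this is an illustration, not code from the question:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.streaming.Trigger

// Requires Spark 2.4+; foreachBatch is not available in 2.3.2.
val orderedQuery = deserializedRecords.writeStream
  .outputMode("append")
  .option("checkpointLocation", StreamerConfig.checkpointLocation)
  .trigger(Trigger.ProcessingTime(StreamerConfig.triggerTime))
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.persist() // avoid recomputing the Kafka read once per sink

    // 1. Primary sink: the HDFS (ORC) write runs first, as a plain batch write.
    batch.write
      .mode("append")
      .partitionBy("date", "hour")
      .orc(StreamerConfig.hdfsLandingPath)

    // 2. Secondary sinks: the Kafka write starts only after the HDFS write returns.
    batch.select(to_json(struct("*")).alias("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", StreamerConfig.kafkaBroker)
      .option("topic", StreamerConfig.watermarkKafkaTopic)
      .save()

    batch.unpersist()
    ()
  }
  .start()

Note that foreachBatch gives at-least-once semantics for these writes: if a micro-batch is replayed after a failure, both sinks run again.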


