Distinct Count on a Column in a Dataset in Structured Streaming

I am new to Structured Streaming and am running into an issue while calculating a distinct count on a column of a Dataset/DataFrame.
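For context, this is roughly the setup I have around the snippets below. The SparkSession variable name (sparksession) is the one used later; the app name is only a placeholder of mine.

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val sparksession = SparkSession.builder()
  .appName("DistinctCountOnStream") // placeholder name, not significant
  .getOrCreate()

// Needed for .as[(String, Timestamp)] and the typed map below
import sparksession.implicits._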

//DataFrame

val readFromKafka = sparksession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServer)
  .option("subscribe", topic)
  .option("failOnDataLoss", "false")
  .load()

//Dataset[(String, Timestamp)]

val selected_readFromKafka = readFromKafka
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
  .as[(String, Timestamp)]

//Dataset[(Int, Int, String, Timestamp)]

val final_Data = selected_readFromKafka.map(f => {
  // Split the pipe-delimited value once and pick out the three fields
  val parts = f._1.split('|')
  val no1 = parts(0).toInt
  val no2 = parts(1).toInt
  val data = parts(2)
  (no1, no2, data, f._2)
})

So how can I calculate a distinct count over the no1 and no2 columns of final_Data?

val count = final_Data...... ?
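For illustration, this is the kind of aggregation I have been considering, though I am not sure it is the right approach. The use of approx_count_distinct, the 10-minute watermark, and the 5-minute window are all assumptions on my part, not something confirmed to work for this case.

import org.apache.spark.sql.functions._

// Sketch only: approximate distinct count of (no1, no2) per 5-minute event-time window
val distinctCounts = final_Data
  .toDF("no1", "no2", "data", "ts")
  .withWatermark("ts", "10 minutes")
  .groupBy(window(col("ts"), "5 minutes"))
  .agg(approx_count_distinct(struct(col("no1"), col("no2"))).as("distinct_no1_no2"))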

I then want to use this count variable inside the map below.

val selected_readFromKafka_Next = final_Data.map(f => {
  KafkaOutputResponse(
    count,
    "", "", f._3 // f._3 is the parsed data field
  )
})


selected_readFromKafka_Next.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()
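If the distinct count ends up coming from a streaming aggregation like the sketch above, I believe the console sink would also need a non-default output mode ("update" or "complete"); this is an assumption, shown only for completeness.

// Sketch: writing the aggregated counts (distinctCounts from the sketch above) to the console
distinctCounts.writeStream
  .format("console")
  .outputMode("update")
  .option("truncate", "false")
  .start()
  .awaitTermination()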

Thanks in advance.


