Spark-Kafka Streaming: duplicate Kafka messages

I am using Spark DStreams to consume data from a Kafka topic with 5 partitions. Below is the code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkKafkaListener extends Serializable {

   def main(args: Array[String]): Unit = {
     val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
     val kafkaConfig = Map[String, Object](/* Kafka configurations */)
     val sc = spark.sparkContext
     val topic = Array("topic")
     
     // Creating Streaming Context
     @transient
     val ssc = new StreamingContext(sc, Seconds(300))
     
     // Building Spark-Kafka DStreams object
     val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topic, kafkaConfig)
     )
     
     // Iterate over the stream of RDDs, save each batch's Kafka message
     // values and headers, then commit the batch's offsets
     stream.foreachRDD((rdd, batchTime) => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Save the message values
        rdd.map(record => record.value()).saveAsTextFile("path")

        // Save the first header value of each record, decoded as a string
        rdd.map(record => record.headers().iterator().next())
           .map(header => header.value().map(_.toChar).mkString)
           .saveAsTextFile("path")

        // Commit the offsets of this batch after its output is stored
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
     })
  
    ssc.start()
    ssc.awaitTermination()


   }
}

After saving the messages from the Kafka topic, I commit the corresponding offset ranges via the commitAsync call. The issue is that the commit is executed during the following batch rather than the current one. For instance, the offsets of batch 1 are committed only after batch 2 completes, instead of right after batch 1 finishes. This lag is causing duplicate records and, occasionally, data loss.

Am I missing some property that would commit the offsets of each batch immediately, without this lag? I would really appreciate any help with this.
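For context, this one-batch lag appears to match the behavior of commitAsync in spark-streaming-kafka-0-10: the call only queues the offset ranges on the driver, and the actual commit is sent the next time the driver polls Kafka, i.e. when the following batch starts. A hedged sketch of attaching an OffsetCommitCallback to observe when the commit actually lands (`stream` is the DStream from the code above; the logging is purely illustrative):

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Illustrative callback: fires when the queued commit is actually sent,
// which (per the integration guide) happens on the driver's next poll
val logCommits = new OffsetCommitCallback {
  override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    if (exception != null) println(s"Commit failed: ${exception.getMessage}")
    else println(s"Committed: $offsets")
  }
}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... write the batch's output here ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, logCommits)
}
```

Because Kafka commits are at-least-once regardless of this lag, the usual advice for avoiding duplicates is to store the offset ranges transactionally alongside the output rather than relying on commit timing.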



Sources

Source: Stack Overflow, licensed under CC BY-SA 3.0 per Stack Overflow's attribution requirements.