Spark Structured Streaming from Kafka: numInputRows does not match micro-batch df.count()

The Databricks UI shows the following numbers:

  "batchId" : 2,
  "numInputRows" : 311780,
  "inputRowsPerSecond" : 9306.587863048864,
  "processedRowsPerSecond" : 1168.240407673861,

But when I print the micro-batch's df.count() in a foreachBatch, it shows different numbers. A simplified version of what I am doing is this:

import org.apache.spark.sql.{Dataset, Row}

def handleMicroBatch(microBatchDF: Dataset[Row], batchId: Long): Unit = {
  val batchSize = microBatchDF.count()
  println(s"batchSize: $batchSize, batchId: $batchId")
  // ...more code...
}

// output looks like:
batchSize: 995, batchId: 0
batchSize: 22702, batchId: 1
batchSize: 155890, batchId: 2

As you can see, the count does not match for batch id 2. My understanding is that each Kafka offset corresponds to one Row, so the numInputRows reported for a given batch id should match the count of the DataFrame I get from spark.readStream. Why aren't the numbers matching?
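For completeness, the stream is wired up roughly like this (the topic name, broker address, and checkpoint path below are placeholders, not my real values):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Placeholder broker/topic -- the real job reads a production topic.
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "my-topic")
  .load()

// Each micro-batch is handed to handleMicroBatch, shown above.
val query = kafkaDF.writeStream
  .foreachBatch(handleMicroBatch _)
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```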



Sources

Source: Stack Overflow, licensed under CC BY-SA 3.0 per Stack Overflow's attribution requirements.