Spark structured streaming metrics are confusing
I started a Structured Streaming job (Spark 3.1.2) that writes to Kafka (1 driver, 5 executors on K8s):
dataFrame
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-host:9092")
  .option("topic", "my-topic")
By the time all the input has been processed, the Kafka topic 'my-topic' contains 38087 new messages, so 38087 rows were written to Kafka. Metrics are written to the ConsoleSink and shown on the Spark UI.
Now let's see the actual Spark metrics:
spark.streaming.states-rowsTotal: 3600 (Gauge)
spark.streaming.states-rowsUpdated: 0 (Gauge)
executor.recordsWritten: 0 (Counter)
Makes no sense...
Now let's check all the ConsoleSink entries:
spark.streaming.states-rowsTotal: 41687, 3600, 3600 ... n
spark.streaming.states-rowsUpdated: 41687, 0, 0 ... n
executor.recordsWritten: 0 ... n
As you can see, rowsTotal was 41687 at first and has been 3600 ever since. recordsWritten is still 0.
But if I check recordsWritten on the Spark UI:
driver-ip/api/v1/app-id/stages/stage-id
Across all the stage IDs there are 400 entries with a value of 0, except for one, which has the value 41687.
Magic math: if I compute x = 41687 - 3600, I get 38087, which is the number of new rows in Kafka. What the heck is going on?
So the questions are:
- What do rowsTotal and rowsUpdated represent, and why are they Gauge types?
- What does recordsWritten represent?
- Why are the values of recordsWritten in a sink and on the Spark UI different?
- What is the reason behind the magic subtraction above? It makes no sense at all.
- How do I get or create a single metric that shows the total number of rows written to Kafka?
Solution 1:[1]
spark.streaming.states-rowsTotal is the number of rows in the state store, which is populated only if you use stateful operators. It has no relation to output records.
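To see where these state-store numbers come from, the per-batch progress of the query can be inspected directly. The following is a minimal sketch, assuming the gauges mirror the `stateOperators` section of the latest progress report (which would also explain why they are gauges and not counters). The class and object names are hypothetical; `StreamingQueryListener`, `stateOperators`, `numRowsTotal` and `numRowsUpdated` are part of the public Spark API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener: prints state-store row counts for every micro-batch.
class StateRowsListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // One entry per stateful operator in the plan; empty for a stateless query.
    p.stateOperators.zipWithIndex.foreach { case (op, i) =>
      println(s"batch=${p.batchId} operator=$i " +
        s"rowsTotal=${op.numRowsTotal} rowsUpdated=${op.numRowsUpdated}")
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

object StateRowsListenerExample {
  // Register before starting the query so that every batch is observed.
  def register(spark: SparkSession): Unit =
    spark.streams.addListener(new StateRowsListener)
}
```

If the printed values follow the same pattern as the ConsoleSink entries in the question, that would support reading rowsTotal and rowsUpdated as state-store snapshots and not as output counts.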
The sum of executor.recordsWritten across all executors should give the total number of records written to the sink. Unfortunately, most streaming sinks do not update this metric correctly (at least the console and Kafka sinks).
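For reference, the summation itself can be sketched with a `SparkListener` that adds up `recordsWritten` from every finished task. This is only an illustration of the idea: the class name is hypothetical, and the total stays at 0 whenever the sink does not update the underlying output metric, which is the limitation described above.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: accumulates outputMetrics.recordsWritten over all tasks.
class RecordsWrittenListener extends SparkListener {
  private val written = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics may be null (for example for some failed tasks), so guard the access.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      written.addAndGet(metrics.outputMetrics.recordsWritten)
    }
  }

  // Running total across all tasks seen so far.
  def total: Long = written.get()
}
```

It would be registered on the driver with `spark.sparkContext.addSparkListener(new RecordsWrittenListener)` before the query starts.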
One workaround I can think of is:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist() // cache the batch so the count and the write do not recompute it
  batchDF.count() // record this count somewhere
  batchDF.write.format(...).save(...) // write into Kafka
  batchDF.unpersist()
}
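The workaround above can be fleshed out into a running total. The following is a sketch, not a definitive implementation: the object name, the accumulator name and the reuse of the broker address and topic from the question are assumptions, and the DataFrame is assumed to already have the `value` column that the Kafka writer requires. The explicitly typed function value is a common way to sidestep the overload ambiguity that Scala 2.12 can report for `foreachBatch`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.LongAccumulator

// Hypothetical helper: counts every batch and keeps a running total on the driver.
object KafkaRowCounter {
  def start(spark: SparkSession, streamingDF: DataFrame): (StreamingQuery, LongAccumulator) = {
    val rowsWritten = spark.sparkContext.longAccumulator("rowsWrittenToKafka")

    val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.persist() // avoid recomputing the batch for count() and the write
      try {
        val n = batchDF.count()
        batchDF.write
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka-host:9092") // assumed, from the question
          .option("topic", "my-topic")                          // assumed, from the question
          .save()
        rowsWritten.add(n) // only added after the write call returned without an exception
      } finally {
        batchDF.unpersist()
      }
    }

    val query = streamingDF.writeStream.foreachBatch(writeBatch).start()
    (query, rowsWritten)
  }
}
```

The total is then available on the driver as `rowsWritten.value`. Since `foreachBatch` gives at-least-once guarantees by default, a batch that is retried after a failure may be written again, so the figure is best read as an upper bound on distinct rows.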
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Warren Zhu |
