'Read latest records from Kafka using pyspark batch job
I am executing a batch job in pyspark, where spark will read data from kafka topic for every 5 min.
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1") \
.option("subscribePattern", "test") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
Whenever spark reads data from kafka it is reading all the data including previous batches. I want to read data for the current batch or latest records which is not read before. Please suggest !! Thank you.
Solution 1:[1]
For batch queries, latest (either implicitly or by using -1 in json) is not allowed.
Using earliest means all the data again is obtained.
You will need to define the offset explicitly every time you run like, e.g.:
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
That implies you need to save the offsets processed per partition. I am looking into this in the near future myself for a project. Some items hereunder items to help:
https://medium.com/datakaresolutions/structured-streaming-kafka-integration-6ab1b6a56dd1 stating what you observe:
Create a Kafka Batch Query
Spark also provides a feature to fetch the data from Kafka in batch mode. In batch mode Spark will consume all the messages at once. Kafka in batch mode requires two important parameters Starting offsets and ending offsets, if not specified spark will consider the default configuration which is,
- startingOffsets — earliest
- endingOffsets — latest
https://dzone.com/articles/kafka-gt-hdfss3-batch-ingestion-through-spark alludes as well to what you should do, with the following:
- And, finally, save these Kafka topic endOffsets to file system – local or HDFS (or commit them to ZooKeeper). This will be used for the next run of starting the offset for a Kafka topic. Here we are making sure the job's next run will read from the offset where the previous run left off.
This blog https://dataengi.com/2019/06/06/spark-structured-streaming/ I think has the answer for saving offsets.
Solution 2:[2]
Did you use check point location while writing stream data
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Pyspark Developer |
