Spark Structured Streaming: writeStream with trigger set to once records much less data than it should
I have a program that runs every hour. Each time it runs, it receives streaming data and writes it in Parquet format, in batches, into a data lake, where another function processes it later.
I therefore set the writeStream trigger to `once=True`, hoping it would take all the data received since the last run and write it into the data lake. However, I realized that the amount of data being processed by the next function was very small (between 1 and 150 entries), which couldn't be right.
After I removed the trigger, it now processes between 1,500 and 1,800 entries, which is what I expected in the first place. Why is this happening?
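For reference, here is what a single trigger-once run is expected to read. With a `checkpointLocation` set, the Kafka source resumes from the offsets recorded in the checkpoint (`startingOffsets` is only used the very first time the query starts), and `trigger(once=True)` processes everything available when the run starts in one micro-batch and then stops. The sketch below is a simplified plain-Python model of that expectation, not Spark code; the function name, the `(topic, partition)` dictionaries and the `"events"` topic are hypothetical.

```python
from typing import Dict, Tuple

# Hypothetical representation: (topic, partition) -> offset.
Offsets = Dict[Tuple[str, int], int]


def records_for_trigger_once(committed: Offsets, latest: Offsets) -> int:
    """Estimate how many Kafka records one trigger-once run should read.

    committed: end offsets the previous run recorded in its checkpoint.
    latest:    end offsets available on the brokers when the new run starts.
    A partition missing from `committed` is counted from offset 0; on a real
    first run "earliest" starts at the log start offset, which can be higher
    if retention has already deleted old segments.
    """
    total = 0
    for topic_partition, end in latest.items():
        start = committed.get(topic_partition, 0)
        # Each run covers the half-open range [start, end) for every partition.
        total += max(0, end - start)
    return total


# Example: the previous run stopped at these offsets ...
committed = {("events", 0): 10_000, ("events", 1): 9_500}
# ... and about an hour of new messages has arrived since.
latest = {("events", 0): 10_800, ("events", 1): 10_400}
print(records_for_trigger_once(committed, latest))  # 1700
```

If the per-run counts are far below such an estimate, comparing the offsets stored under the checkpoint's `offsets/` directory between runs is one way to check what each run actually read.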
You can see the huge difference it made in the following chart:

Here is the part of the code where I define these two functions and set the trigger:
```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, TimestampType


def read_stream(spark, config):
    # Schema of the JSON payload carried in each Kafka message value
    schema = StructType() \
        .add("data", StructType()
             .add("created_at", TimestampType())
             .add("text", StringType()))
    return spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config.get("kafka_servers")) \
        .option("subscribe", config.get("topic_list")) \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(F.col('key').cast('string'),
                F.from_json(F.col("value").cast("string"),
                            schema)['data']['created_at'].alias('created_at'),
                F.from_json(F.col("value").cast("string"),
                            schema)['data']['text'].alias('text'),
                F.col('topic')) \
        .withColumn("hour", F.date_format(F.col("created_at"), "HH")) \
        .withColumn("date", F.date_format(F.col("created_at"), "yyyy-MM-dd"))


def write_stream(df, config):
    # Write the stream as Parquet, partitioned by event date and hour
    return df.writeStream \
        .format("parquet") \
        .option("checkpointLocation", config.get("checkpoint_path")) \
        .option("path", config.get("s3_path_raw")) \
        .trigger(once=True) \
        .partitionBy('date', 'hour') \
        .start()


# spark and config are created elsewhere in the program
write_stream(read_stream(spark, config), config)
```
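Note that `partitionBy('date', 'hour')` takes both values from `created_at` (the event time in the payload), not from the time the job runs, so the rows written by one hourly run can be spread over several `hour=` directories, and late events land in older ones. The sketch below mirrors the two `date_format` calls in plain Python to show which Hive-style directory a row ends up in; `partition_dir` is a hypothetical helper, and it assumes `created_at` is already expressed in the Spark session time zone, which `date_format` uses when formatting timestamps.

```python
from datetime import datetime


def partition_dir(created_at: datetime) -> str:
    """Directory a row lands in under partitionBy('date', 'hour').

    Mirrors F.date_format(created_at, "yyyy-MM-dd") and
    F.date_format(created_at, "HH"); assumes `created_at` is already in
    the Spark session time zone.
    """
    date = created_at.strftime("%Y-%m-%d")  # same as "yyyy-MM-dd"
    hour = created_at.strftime("%H")        # same as "HH" (zero-padded 00-23)
    return f"date={date}/hour={hour}"


print(partition_dir(datetime(2021, 3, 14, 9, 5)))  # date=2021-03-14/hour=09
```

When counting what a single run wrote, it may therefore help to look across every partition directory the run touched rather than only the one for the current hour.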
It only started working properly after I removed the trigger, and I would like to understand why.
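One detail worth checking: the original `write_stream` discards the handle returned by `start()`. `start()` returns as soon as the query is launched and the micro-batch runs in the background, so with `trigger(once=True)` a standalone script usually blocks on that handle until the single batch has finished. A minimal sketch of that pattern follows; it reuses the option keys from the question, `write_stream_once` is a hypothetical name, and whether this accounts for the missing records here is an assumption to verify, not a confirmed diagnosis.

```python
def write_stream_once(df, config):
    """Write all currently available data in one micro-batch, then wait.

    Assumes `df` is the streaming DataFrame returned by read_stream() and
    `config` is the same mapping used in the question.
    """
    query = (
        df.writeStream
        .format("parquet")
        .option("checkpointLocation", config.get("checkpoint_path"))
        .option("path", config.get("s3_path_raw"))
        .partitionBy("date", "hour")
        .trigger(once=True)
        .start()
    )
    # start() returns immediately; block until the single batch completes.
    # If the query failed, awaitTermination() re-raises the error here.
    query.awaitTermination()
    return query
```

It would be called in place of the last line of the question's code, as `write_stream_once(read_stream(spark, config), config)`.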
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
