Spark Structured Streaming writeStream with the trigger set to once records much less data than it should

I have a program that runs every hour. It receives streaming data and, on each run, writes it in Parquet format, in batches, into a data lake, to be processed later by another function.

I therefore set the writeStream trigger to once=True, hoping it would pick up all the data accumulated since the last run and write it into the data lake. However, I realised that the amount of data processed by the next function was very small (between 1 and 150 entries), which couldn't possibly be right.

I removed the trigger, and now it processes between 1,500 and 1,800 entries, which is what I expected in the first place. Why is this happening? You can see the huge difference it made in the chart the job generated (chart of processed entries, omitted here).

This is the part of the code where I define these two functions and where the trigger was set:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, TimestampType

def read_stream(spark, config):

    # Expected layout of the JSON payload carried in the Kafka message value
    schema = StructType() \
        .add("data", StructType()
            .add("created_at", TimestampType())
            .add("text", StringType()))

    # Read from Kafka starting at the earliest offsets, parse the JSON value
    # and derive the date/hour columns used to partition the output
    return spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", 
                config.get("kafka_servers")) \
        .option("subscribe", config.get("topic_list")) \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(F.col('key').cast('string'),
                F.from_json(F.col("value").cast("string"), 
                    schema)['data']['created_at'].alias('created_at'),
                F.from_json(F.col("value").cast("string"), 
                    schema)['data']['text'].alias('text'),
                F.col('topic')) \
        .withColumn("hour", F.date_format(F.col("created_at"), "HH")) \
        .withColumn("date", F.date_format(F.col("created_at"), "yyyy-MM-dd"))

def write_stream(df, config):
    # Write the parsed stream as Parquet to the data lake, partitioned by date and hour
    df.writeStream \
      .format("parquet") \
      .option("checkpointLocation", config.get("checkpoint_path")) \
      .option("path", config.get("s3_path_raw")) \
      .trigger(once=True) \
      .partitionBy('date', 'hour') \
      .start()

write_stream(read_stream(spark, config), config) 
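
In case it is useful, this is roughly how I can check how many rows each micro-batch actually ingests, by polling the progress report of the running query. A minimal sketch, assuming the query started by write_stream is the only active one in the session; spark.streams.active, isActive and lastProgress are standard Structured Streaming APIs, and numInputRows is the row count of the most recent completed micro-batch:

import time

# Grab the handle of the query started by write_stream
# (assumes it is the only active streaming query in this session)
query = spark.streams.active[0]

# Poll the progress report until the query stops
# (with trigger once it stops after a single micro-batch)
while query.isActive:
    progress = query.lastProgress
    if progress is not None:
        print(progress["batchId"], progress["numInputRows"])
    time.sleep(10)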

It only started working properly after I removed the trigger, but I wonder why.
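
For reference, this is roughly what write_stream looks like after removing the trigger; the only change is dropping the .trigger(once=True) line, so the query falls back to the default trigger and keeps running micro-batches as soon as the previous one finishes:

def write_stream(df, config):
    # Same writer, but without trigger(once=True): the query keeps running
    # and picks up new Kafka records in successive micro-batches
    df.writeStream \
      .format("parquet") \
      .option("checkpointLocation", config.get("checkpoint_path")) \
      .option("path", config.get("s3_path_raw")) \
      .partitionBy('date', 'hour') \
      .start()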


