Spark Structured Streaming: checkpoint metadata growing indefinitely

I use Spark Structured Streaming 3.1.2. I need to use S3 for storing checkpoint metadata (I know, it's not optimal storage for checkpoint metadata). The compaction interval is 10 (the default) and I set spark.sql.streaming.minBatchesToRetain=5. After the job had been running for a few weeks, checkpointing time increased significantly (causing a delay of a few minutes in processing). I looked at the checkpoint metadata structure. There is one heavy path there: checkpoint/source/0. A single .compact file weighs 25GB. I looked into its content and it contains all entries since batch 0 (the current batch is around 25000).

I tried a few parameters to remove already processed data from the compact file, namely: spark.cleaner.referenceTracking.cleanCheckpoints=true, which does not work. From what I've seen in the code, it relates to the previous version of streaming, doesn't it? Setting spark.sql.streaming.fileSource.log.deletion=true and spark.sql.streaming.fileSink.log.deletion=true doesn't work either.

The compact file stores the full history even when all the data has been processed (except for the most recent checkpoint), so I would expect most of the entries to be deleted. Is there any parameter to remove entries from the compact file, or to remove the compact file gracefully from time to time?

Now I am testing a scenario in which I stop the job, delete most of the checkpoint/source/0/* files, keeping just a few recent checkpoints (not compacted), and rerun the job. The job recovers correctly from the recent checkpoint. When it comes to compacting the checkpoint, however, it fails because the most recent compact file is missing. I would probably need to edit the most recent compact file (instead of deleting it) and keep only a few recent records there. It looks like a possible workaround for my problem, but this scenario with manual deletion of checkpoint files looks ugly, so I would prefer something managed by Spark.
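For illustration, the manual edit of the compact file described above could look roughly like the Python sketch below. It rests on an assumption about the file layout that should be checked against a real file first: a version header line (for example v1) followed by one JSON object per line carrying a batchId field. The function name trim_compact_log and the min_batch_id parameter are hypothetical, and the sketch works on lines already read from a local copy of the file, with the job stopped and a backup kept.

```python
import json


def trim_compact_log(lines, min_batch_id):
    """Return the header plus only the entries with batchId >= min_batch_id.

    Assumed layout (verify against your own .compact file):
      line 0  -> version header such as "v1"
      line 1+ -> one JSON object per line with a "batchId" field
    """
    if not lines:
        return []
    header, entries = lines[0], lines[1:]
    kept = [
        entry
        for entry in entries
        if entry.strip() and json.loads(entry)["batchId"] >= min_batch_id
    ]
    return [header] + kept


if __name__ == "__main__":
    # Made-up sample content, not taken from a real checkpoint.
    sample = [
        "v1",
        '{"path":"s3://bucket/in/a.json","timestamp":1,"batchId":0}',
        '{"path":"s3://bucket/in/b.json","timestamp":2,"batchId":7}',
    ]
    for line in trim_compact_log(sample, min_batch_id=5):
        print(line)
```

The trimmed lines would then be written back under the same file name. This is still the manual workaround, with the same caveat that Spark does not manage it.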



Solution 1:[1]

For posterity: the problem was the FileStreamSourceLog class. I needed to override the shouldRetain method, which by default returns true and whose doc says:

Default implementation retains all log entries. Implementations should override the method to change the behavior.
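To show why a shouldRetain that always returns true makes the compact file grow without bound, here is a toy model in Python. It is not Spark's code: FileEntry, compact, retain_all and make_age_based_retain are hypothetical names, and the age-based policy is only one possible override, not necessarily the one used in this answer. The model assumes that each compaction merges the previous compact file with the batches written since then and keeps the entries for which the predicate returns true.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileEntry:
    path: str
    timestamp: int  # file timestamp in milliseconds (assumed meaning)
    batch_id: int


def retain_all(entry, current_time_ms):
    """Mimics the default behaviour: every entry is kept forever."""
    return True


def make_age_based_retain(retention_ms):
    """Build a predicate that drops entries older than retention_ms."""
    def should_retain(entry, current_time_ms):
        return current_time_ms - entry.timestamp < retention_ms
    return should_retain


def compact(previous_compact, new_entries, current_time_ms, should_retain):
    """Merge old and new entries, keeping only those the predicate accepts."""
    merged = previous_compact + new_entries
    return [e for e in merged if should_retain(e, current_time_ms)]


if __name__ == "__main__":
    retain = make_age_based_retain(retention_ms=10_000)
    grown, bounded = [], []
    for batch in range(100):
        now = batch * 1_000
        entry = FileEntry(f"file-{batch}", now, batch)
        grown = compact(grown, [entry], now, retain_all)
        bounded = compact(bounded, [entry], now, retain)
    print(len(grown), len(bounded))
```

With retain_all the list keeps every entry since batch 0, which matches the 25GB compact file in the question. With the age-based predicate it stays at a fixed size once the retention window is full.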

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 wind