Can failure handling in Beam cause a memory leak in Java?

I have a question about failure handling in Beam (Java SDK).

According to the documentation, you can apply a transform that also outputs the elements it fails to process. You can then use a Java List to collect these failures. Here is the snippet from the documentation:

List<PCollection<String>> failureCollections = new ArrayList<>();
input.apply(MapElements.via(...).exceptionsVia(...))
      .failuresTo(failureCollections);

As the source code shows, the messages with failures are added to the list.

Messages with failures are added to the list but never removed afterwards, so I wonder whether the list only ever grows. If it does, the GC could never reclaim these failed messages, and once the number of failures exceeded the allocated memory we would have a memory leak.

Moreover, I am not sure whether the List is periodically stored in checkpoints; if it is, the checkpoints would keep growing as well.



Solution 1:[1]

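No, it does not cause a memory leak. failureCollections is a list of PCollections, not a list of failed elements. Each failuresTo call adds exactly one PCollection to the list while the pipeline is being constructed, and a PCollection is only a handle to a dataset that the runner distributes across workers, so the failed elements are never accumulated in the list or on a single machine. For the same reason, the list is not part of the pipeline's runtime state and does not make your checkpoints grow.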
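To make this concrete without a Beam dependency, here is a minimal plain-Java model. PCollectionHandle and ResultModel are hypothetical stand-ins, not Beam classes; ResultModel.failuresTo assumes the same behavior as Beam's WithFailures.Result.failuresTo, which adds the step's failure PCollection to the list once and returns the main output. The sketch shows that the list's size tracks the number of failuresTo calls made while building the pipeline, not the number of elements that fail at run time.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of Beam's failuresTo pattern.
// PCollectionHandle and ResultModel are hypothetical stand-ins, NOT Beam classes.
public class FailuresToModel {

  // Stand-in for a PCollection: a lightweight handle that only carries a name.
  // In real Beam the elements live on the runner's workers, not in this object.
  static final class PCollectionHandle<T> {
    final String name;

    PCollectionHandle(String name) {
      this.name = name;
    }
  }

  // Stand-in for WithFailures.Result: one main-output handle, one failure handle.
  static final class ResultModel<OutT, FailT> {
    final PCollectionHandle<OutT> output;
    final PCollectionHandle<FailT> failures;

    ResultModel(String step) {
      this.output = new PCollectionHandle<>(step + ".output");
      this.failures = new PCollectionHandle<>(step + ".failures");
    }

    // Assumed to mirror Beam's failuresTo: add the failure handle once, return the output.
    PCollectionHandle<OutT> failuresTo(List<PCollectionHandle<FailT>> failureCollections) {
      failureCollections.add(failures);
      return output;
    }
  }

  // "Constructs" a pipeline with the given number of failure-handling steps and
  // returns the list that failuresTo populated. How many elements would fail
  // at run time plays no role here.
  static List<PCollectionHandle<String>> buildPipeline(int steps) {
    List<PCollectionHandle<String>> failureCollections = new ArrayList<>();
    for (int i = 0; i < steps; i++) {
      new ResultModel<Integer, String>("step" + i).failuresTo(failureCollections);
    }
    return failureCollections;
  }

  public static void main(String[] args) {
    List<PCollectionHandle<String>> list = buildPipeline(2);
    // Two failuresTo calls give two entries, regardless of how many elements fail later.
    System.out.println(list.size());
    for (PCollectionHandle<String> handle : list) {
      System.out.println(handle.name);
    }
  }
}
```

Running main prints 2 followed by step0.failures and step1.failures. A third failuresTo call would make the list size 3; no number of failing input elements changes it.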

Look at the example in the documentation: if you never handle the collected failureCollections later, nothing is done with them at all. You can chain further transforms to write them to a sink such as GCS for further investigation. The usage is:

 PCollection<Integer> input = ...
 List<PCollection<Map<String, String>>> failureCollections = new ArrayList<>();
 input.apply(MapElements.via(...).exceptionsVia(...))
      .failuresTo(failureCollections)
      .apply(MapElements.via(...).exceptionsVia(...))
      .failuresTo(failureCollections);
 // Handle these failures explicitly; otherwise they are just dangling PCollections.
 PCollection<Map<String, String>> failures = PCollectionList.of(failureCollections)
      .apply("FlattenFailureCollections", Flatten.pCollections());

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 ningk