'Custom Restart Strategy / Error handling for org.apache.flink.runtime.taskmanager.Task

I would like to ask if there is a way to do custom error handling - restart strategy for a flink task, based on the exception type.

Rather than only using the predefined strategies provided (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/)

My use case is the following:

I am using flink-kafka (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/) to consume from a kafka topic (source), but there might be a case, where the logs will be corrupted (CorruptRecordException :: https://kafka.apache.org/27/javadoc//org/apache/kafka/common/errors/CorruptRecordException.html) so I want to handle that and do the necessary actions based on my use-case (either stop task, or recover).

Thanks in advance.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source