Flink "Encountered error while consuming partitions" + "Connection reset by peer"

I have a Flink streaming job running 24/7. Several times per day, I see it fail and restart with the following log messages:

10:02:08.524 [Flink Netty Server (0) Thread 0] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
10:02:08.524 [Flink Netty Server (0) Thread 1] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
10:02:08.537 [Flink Netty Server (0) Thread 0] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
10:02:08.560 [Flink Netty Server (0) Thread 0] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
10:02:08.537 [Flink Netty Server (0) Thread 1] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
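To gauge how often each Netty server thread reports these resets, the excerpt above can be tallied with a small script. This is only a sketch: it assumes the two-line layout shown above (a header line followed by the exception line), and `count_resets_by_thread` and `SAMPLE` are hypothetical names introduced for illustration, not part of Flink.

```python
import re
from collections import Counter

# Header line of each error, in the layout of the excerpt above:
# "<time> [<thread>] ERROR <logger> - Encountered error while consuming partitions"
HEADER = re.compile(
    r"^(?P<time>\d{2}:\d{2}:\d{2}\.\d{3}) \[(?P<thread>[^\]]+)\] ERROR "
    r"\S+PartitionRequestQueue - Encountered error while consuming partitions$"
)


def count_resets_by_thread(lines):
    """Count 'Connection reset by peer' errors per Netty thread.

    An error is counted only when a matching header line is immediately
    followed by a line mentioning 'Connection reset by peer'.
    """
    counts = Counter()
    pending_thread = None  # thread name from the previous header line, if any
    for raw in lines:
        line = raw.strip()
        match = HEADER.match(line)
        if match:
            pending_thread = match.group("thread")
            continue
        if pending_thread is not None and "Connection reset by peer" in line:
            counts[pending_thread] += 1
        pending_thread = None
    return counts


# Two entries copied from the excerpt above, used as sample input.
SAMPLE = [
    "10:02:08.524 [Flink Netty Server (0) Thread 0] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions",
    "org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer",
    "10:02:08.524 [Flink Netty Server (0) Thread 1] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions",
    "org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer",
]

if __name__ == "__main__":
    for thread, count in sorted(count_resets_by_thread(SAMPLE).items()):
        print(f"{thread}: {count}")
```

Running it over a full TaskManager log (for example, `count_resets_by_thread(open(path))` with a hypothetical `path`) would show whether the resets cluster on particular threads or are spread evenly.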

Flink automatically restarts and runs successfully for several hours before this happens again.

Ideally, I'd like to fix this and avoid the periodic restarts, or at least understand what is causing them. That said, the application is generally working and it does recover through the automatic restart, so my boss can live with this as-is if needed.

I'm using Flink 1.14.4, which is the latest version as of this writing. I'm using the newer KafkaSource API, if that matters.

I see two related SO questions. They both mention increasing TaskManager memory. My TaskManager memory is already at 10GB, and I don't see any memory-related errors or warnings in the TaskManager/JobManager logs.

flink Connection reset by peer
Flink Job suddenly crashed with error: Encountered error while consuming partitions



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
