'Flink - Connection unexpectedly closed by remote task manager
I am using Flink v.1.13.2 with one job manager three task managers.
For some reason (I couldn't find out the reason), task manager connections is being lost. Here is the log that I found:
2022-02-17 21:19:55,891 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Print to Std. Out (13/32) (f0ff88713cc3ff5ce39e7073468abed4) switched from RUNNING to FAILED on 1.2.3.5:39309-f61daa @ server.name (dataPort=43421).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '1.2.3.4/1.2.3.4:43421'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1106) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-02-17 21:19:55,912 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task ba13f73bc80e020e655c42a6f182ba1b_12.
2022-02-17 21:19:55,927 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1919 tasks should be restarted to recover the failed task ba13f73bc80e020e655c42a6f182ba1b_12.
What could be reason?
Update: Finally i just found the following error logs from the JobManager
I am using standalone deployment mode.
- How does flink decides there is no available resources? (How to calculate max limit)
- If i combine some process functions(or flatmaps, filters etc..) into one process, then can i solve this issue?
2022-02-18 12:31:00,404 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job FlinkStreamProcessing (c447c54ed67c5ca2be360fce46420fba) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=10000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1140) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1080) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:911) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.ExecutionVertex.markFailed(ExecutionVertex.java:472) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations.markFailed(DefaultExecutionVertexOperations.java:41) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskDeploymentFailure(DefaultScheduler.java:507) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$7(DefaultScheduler.java:492) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:128) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:362) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:351) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:816) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.2.jar:1.13.2]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
... 32 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
... 28 more
Solution 1:[1]
There are many reasons for TM to lose connection, TM machine is abnormal, TM exits abnormally, JM load is too high, etc.
From the log error, the root cause cannot be located.
May be you can also provide the TM error log.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | ChangLi |
