当前位置: 首页 > 知识库问答 >
问题:

TaskManager和JobManager之间的Flink Cluster Kubernetes心跳故障

国兴文
2023-03-14

我正在尝试使用本机kubernetes运行flink集群。

下面是flink conf,

jobmanager.rpc.port: 6123
blob.server.port: 6124
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 10
jobmanager.memory.process.size: 5120m
jobmanager.memory.jvm-overhead.max: 512m
taskmanager.memory.process.size: 6656m
taskmanager.memory.framework.heap.size: 435m
taskmanager.memory.framework.off-heap.size: 217m
taskmanager.memory.jvm-overhead.max: 435m
kubernetes.jobmanager.cpu: 1
kubernetes.taskmanager.cpu: 1
# akka settings
akka.ask.timeout: 300s
akka.tcp.timeout: 1200s


# JVM configurations
env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:NativeMemoryTracking=summary -XX:+UseG1GC -Dkubernetes.websocket.ping.interval=300000"


# checkpoint config
execution.checkpointing.interval: 2min
execution.checkpointing.timeout: 30min # savepoint usually takes longer
execution.checkpointing.min-pause: 110s
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
execution.checkpointing.tolerable-failed-checkpoints: 2
execution.checkpointing.snapshot-compression: true
execution.savepoint.ignore-unclaimed-state: true
#execution.checkpointing.unaligned: true

# heartbeat settings
cluster.registration.initial-timeout: 1000
cluster.registration.max-timeout: 300000
cluster.services.shutdown-timeout: 300000
heartbeat.timeout: 120000
heartbeat.interval: 60000

首先,JobManager和TaskManager成功启动,TM处理的事件很少。但在超时间隔之后,JM抛出如下所示的心跳错误,并将启动新的TaskManager。旧的任务管理器仍在运行,新的任务管理器在同一命名空间中启动。

无法解释这种行为。我确保检查TaskManager和jobmanager的内存,没有内存问题。此外,TM和JM运行时不会崩溃。除非在超时间隔后,否则无法识别TM,并启动新的TMs。

2021-09-09 03:23:32,886 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Co-Process-Broadcast (10/10) (14679f59d1accdc6f925e2637eede0c9) switched from RUNNING to FAILED on flink-taskmanager-1-4 @ 100.114.72.234 (dataPort=41845).
java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id flink-taskmanager-1-4  timed out.
    at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.12-2.5.21.jar:2.5.21]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.12-2.5.21.jar:2.5.21]

我还检查了/etc/hosts文件以查看

# Kubernetes-managed hosts file.
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
100.114.126.100 flink-5fd769dbcf-k4cnb
2620:149:106a:220d::7418        flink-5fd769dbcf-k4cnb

知道这里有什么问题吗?非常感谢。

共有1个答案

顾骏祥
2023-03-14

可能需要确认JM(cpu核心)的配置和TMs的数量,JM是否可能有高负载,或者JM是否执行一些用户逻辑代码?

 类似资料:
  • 我正在阅读Flink官方文档关于任务失败恢复:https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html 据我所知,这个文档告诉我们,如果某个任务由于某种原因失败,Flink可以借助检查点机制来恢复它。 所以现在我还有两个问题: > 如果TaskManager失败怎么办?据我所知,任务分

  • 我在HA模式下配置了Flink,如下所述: 我想测试容错性,因此我做了以下工作: 设置具有2个JobManager和1个TaskManager的Flink群集 在任务管理器上启动流式处理作业 杀死活动的作业管理器(以模拟崩溃) 领导人选举如期举行 但注意到任务管理器正在重新连接到新的作业管理器。它只是每10秒尝试重新连接到前一个领导者 在此处粘贴任务管理器日志: 重新启动任务管理器没有帮助 重新启

  • 我正在尝试使用以下命令提交flink关于纱线的作业: 我拿到了env。Java语言选择flink客户端日志,但当应用程序提交到Thread时,这些Java选项将不可用。由于额外的JVM选项不可用,应用程序在连接zookeeper时引发异常。 请建议如何将动态属性传递给JM 注意:我试图设置env。Java语言将选项选择到conf/flink-conf.yaml及其工作状态。我需要一种通过flink

  • 本文向大家介绍Java中故障快速和故障安全之间的区别,包括了Java中故障快速和故障安全之间的区别的使用技巧和注意事项,需要的朋友参考一下 序号 键 不及格 故障安全 1 例外 集合中的任何更改(例如在线程期间添加,删除和更新集合)都是迭代集合,然后使快速抛出并发修改异常失败。  故障安全集合不会引发异常。  2。 集合类型 ArrayList和hashmap集合是快速失败迭代器的示例  Copy

  • 嵌入式设备上,只能运行SRS时,其他的业务系统可能需要知道这个设备的ip等信息,SRS可以以http方式主动汇报给api服务器。 编译 要求编译时支持http-parser,即开启了下面任何一个选项即支持: --with-http-api HTTP接口。 --with-http-server HTTP服务器。 --with-http-callback HTTP回调。 配置 在全局配置以下信息即可以

  • 本文向大家介绍顶点之间的距离和偏心距,包括了顶点之间的距离和偏心距的使用技巧和注意事项,需要的朋友参考一下 两个顶点之间的距离 它是顶点U和顶点V之间最短路径中的边数。如果有多个路径连接两个顶点,则最短路径被视为两个顶点之间的距离。 表示法-d(U,V) 从一个顶点到另一顶点可以有任意数量的路径。其中,您只需要选择最短的一个即可。 示例 看一下下图- 在这里,从顶点“ d”到顶点“ e”或简称“