当前位置: 首页 > 知识库问答 >
问题:

Flink HA群集作业管理器问题

石正信
2023-03-14

我有一个Flink 1.2集群的设置,由3个JobManager1和2个TaskManager1组成。我从JobManager1开始动物园管理员法定人数,我得到确认,动物园管理员开始其他2个JobManager1,然后我在这个JobManager1上开始一个Flink作业。

flink-conf.yaml在所有5个虚拟机上都是相同的,这意味着jobmanager。rpc。地址:指向各处的JobManager1。

如果我关闭运行JobManager1的VM,我希望Zookeeper会说剩下的一个JobManager是领导者,TaskManager应该重新连接到它。相反,我在TaskManager的日志中看到了很多这样的信息

2017-03-14 14:13:21,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@1.2.3.4:43660/user/jobmanager (attempt 11, timeout: 30 seconds)
2017-03-14 14:13:21,836 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:43660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:43660]] Caused by: [Connection refused: /1.2.3.4:43660]

为了保密,我将原始IP修改为1.2.3.4,因为它总是相同的IP(JobManager1)。

更多日志

2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,489 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
2017-03-15 10:29:10,490 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Cancelling all computations and discarding all cached data.
2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-15 10:29:10,512 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task                     - Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Disassociating from JobManager
2017-03-15 10:29:10,525 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
2017-03-15 10:29:10,542 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,546 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,548 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,551 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Flat Map (1/1)
2017-03-15 10:29:10,552 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@1.2.3.5:43893/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2017-03-15 10:29:10,567 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Source: Custom Source -> Flat Map (1/1)
2017-03-15 10:29:10,632 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:43893/user/jobmanager), starting network stack and library cache.
2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be /1.2.3.5:42830. Starting BLOB cache.
2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-d97e08db-d2f1-4f00-a7d1-30c2f5823934
2017-03-15 10:29:15,551 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:20,571 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:25,582 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:30,592 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]

有人知道为什么TaskManager没有尝试重新连接到剩余的一个JobManager(如上面的1.2.3.5)吗?

谢谢

共有1个答案

仲涵亮
2023-03-14

对于所有面临相同问题的人,HA要求您提供一个可从所有节点访问的DFS位置。我的后端状态检查点目录和zookeeper存储目录在每个虚拟机上指向一个本地文件系统位置,当一个作业管理器宕机时,新领导无法恢复正在运行的作业,因为缺少信息/位置不可访问。

编辑:由于有人问这个问题,我修改了文件(在ApacheFlink 1.2中)(https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html))是

conf/flink-conf.yaml

我设定

state.backend.fs.checkpointdir
high-availability.zookeeper.storageDir

到AWS S3路径。可从TaskManager和JobManager访问。

 类似资料:
  • 一、hadoop yarn 简介 Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.0 引入的集群资源管理系统。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。 二、YARN架构 1. ResourceManager ResourceManager 通常在独立的机器上以后台进程的形式运行,它是整个集

  • 主要内容:使用简介 Docker Swarm 是 Docker 的集群管理工具。它将 Docker 主机池转变为单个虚拟 Docker 主机。 Docker Swarm 提供了标准的 Docker API,所有任何已经与 Docker 守护程序通信的工具都可以使用 Swarm 轻松地扩展到多个主机。 支持的工具包括但不限于以下各项: Dokku Docker Compose Docker Machine Jen

  • 集群管理架构概述。 { "clusters": [], "sds": "{...}", "local_cluster_name": "...", "outlier_detection": "{...}", "cds": "{...}" } clusters (required, array) 群集管理器将执行服务发现,健康检查和负载平衡的上游群集列表。 sds (someti

  • 集群管理架构概述 v1 API 参考 v2 API 参考 统计 概述 健康检查统计 离群检测统计 动态HTTP统计 动态HTTP交叉树统计 按服务区动态HTTP统计 负载均衡统计 负载均衡子集统计 运行时设置 主动健康检查 离群异常检测 核心 区域负载均衡 熔断 集群发现服务 统计 健康检查 TCP健康检查 熔断 运行时配置

  • Envoy的集群管理器管理所有配置的上游集群。就像Envoy配置可以包含任意数量的监听器一样,配置也可以包含任意数量的独立配置的上游集群。 上游集群和主机从网络/HTTP过滤器堆栈中抽象出来,因为上游集群和主机可以用于任意数量的不同代理任务。集群管理器向过滤器堆栈公开API,允许过滤器获得到上游集群的L3/L4连接,或者到上游集群的抽象HTTP连接池的句柄(无论上游主机是支持HTTP/1.1还是H

  • 用户除了通过控制台管理集群外,还可以通过ssh直接登陆到主节点上进行操作。主节点上已经完成了集群环境的相关配置,您可以直接在主节点上执行命令。 您还可以通过ssh架设SOCKS5代理服务器后,访问到集群内原生的hadoop管理页面。 生成密钥对 在自己机器上,执行命令如下 ssh-keygen -f ./hadoop_key -C "emr public key" 其中-f指定文件,-C添加