当前位置: 首页 > 知识库问答 >
问题:

从YARN上的另一个程序开始的Flink作业失败,“JobClientActor似乎已经死亡”

柯学
2023-03-14

我是flink的新用户,我有以下问题。我使用纱线簇上的flink将从RDBMS提取的相关数据传输到HBase。我在java上使用多个ExecutionEnvironments(每个RDB表一个,以并行方式传输表行)编写flink批处理应用程序,以按表顺序传输(因为env.execute()的调用被阻塞)。

我开始这样的纱线训练

export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 4 -d -jm 2048 -tm 8096

然后,我在通过shell脚本传输启动的纱线会话上运行我的应用程序。sh.其内容在此处

#!/bin/bash

export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/flink run -p 4 transfer.jar

当我从命令行手动启动此脚本时,它工作正常-作业一个接一个地提交给YARN会话,没有错误。

现在,我应该能够从另一个java程序运行此脚本。为此,我使用

Runtime.exec("transfer.sh");

(也许有更好的方法来做到这一点?我在REST API上见过,但有一些困难,因为作业管理器是由YARN代理的)。一开始是像往常一样工作——首先将几个作业提交到会话并成功完成。但是以下作业没有提交到YARN会话。在 /opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log中,我看到错误(并且在DEBUG级别没有发现其他错误)

The program execution failed: JobClientActor seems to have died before the JobExecutionResult could be retrieved.

我试图自己分析这个问题,发现在向JobClientActor(即纱线集群)发送带超时的ping请求时,JobClient类中发生了此错误。我尝试增加多个心跳和超时选项,如akka。*。暂停,阿卡。看心跳信号。*还有纱线。心跳延迟选项,但这并不能解决问题-新作业不会从CliFrontend提交到纱线会话。

这两种情况(手动调用和来自另一个程序的调用)的环境相同。当我打电话时

$ ps axu | grep transfer

它会给我输出

/usr/lib/jvm/java-8-oracle/bin/java -Dlog.file=/opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log -Dlog4j.configuration=file:/opt/flink-1.3.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/opt/flink-1.3.1/conf/logback.xml -classpath /opt/flink-1.3.1/lib/flink-metrics-graphite-1.3.1.jar:/opt/flink-1.3.1/lib/flink-python_2.11-1.3.1.jar:/opt/flink-1.3.1/lib/flink-shaded-hadoop2-uber-1.3.1.jar:/opt/flink-1.3.1/lib/log4j-1.2.17.jar:/opt/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.3.1/lib/flink-dist_2.11-1.3.1.jar:::/etc/hadoop/conf org.apache.flink.client.CliFrontend run -p 4 transfer.jar

我还试图将flink更新到1.4.0版本或更改作业的并行性(甚至是-p 1),但仍然发生了错误。

我不知道有什么不同?顺便问一下,有什么解决方法吗?

谢谢你的帮助。

共有1个答案

越安翔
2023-03-14

最后,我找到了如何解决这个错误,只需替换运行时即可。执行官(…) 。inheritIO()。start()。

我真的不知道为什么调用inheritIO在这种情况下会有所帮助,因为据我所知,它只是将IO流从子进程重定向到父进程。但我已经检查过,如果我注释掉这行代码,程序就会再次开始下降。

 类似资料:
  • 我正在尝试使用 启动 的会话 我一直收到这个错误: (未知错误:DevToolsActivePort文件不存在)(从chrome位置/usr/lib/chrumbrowser/chrum浏览器启动的进程不再运行,因此ChromeDriver假设chrome已崩溃。)(驱动程序信息:ChromeDriver=71.0.3578.98,平台=Linux 4.15.0-45-generic x86_64

  • 我目前正在尝试为一个项目设置Elasticsearch。我已经安装了,还安装了Java,即。 但是当我尝试使用以下命令启动Elasticsearch时 我得到以下错误 loaded:loaded(/usr/lib/systemd/system/elasticsearch.service;disabled;vend 活动:自世界协调时2019-11-01 06:09:54开始失败(结果:退出-代码)

  • 我有两个简单的Flink流式作业,从Kafka读取,做一些转换,并将结果放入Cassandra sink。他们从不同的Kafka主题阅读,并存入不同的卡桑德拉表。 当我单独运行这两个工作中的任何一个时,一切都很好。检查点被触发并完成,数据被保存到Cassandra。 我找不到关于这个错误的很多信息,它可能是由下列任何一个引起的: Flink(V1.10.0-Scala2.12), Flink Ca

  • 我们正在部署一个新的Flink流处理作业,它的状态(存储)需要使用历史数据进行初始化,并且在开始处理任何新的应用程序事件之前,该数据应该在状态存储中可用。我们不想显着修改Flink作业以同时加载历史数据。我们考虑编写另一个单独的Flink作业来处理历史数据,更新其状态存储并创建一个Savepoint并使用此Savepoint在主Flink作业中初始化状态。看起来状态处理器API仅适用于DataSe

  • 问题内容: 我想创建一个Jenkins作业来启动其他Jenkins作业。那将非常容易,因为Jenkins模板项目插件允许我们创建一个类型为“使用来自另一个项目的构建器”的构建步骤。但是,使我的情况更难的是,我必须在其他计算机上开始Jenkins的工作。有什么标准方法可以做到吗? 问题答案: 万一您只想触发Job的新版本,您有多种方法可以完成它 您可以使用远程访问API并触发请求以从源Job构建目标