当前位置: 首页 > 知识库问答 >
问题:

Flink失去了领导者并崩溃

方和顺
2023-03-14

我正在LocalStream环境(嵌入式flink集群)中运行一个流处理应用程序。我成功地使用我的代码处理了几次特定的数据集。我昨天想在对处理逻辑进行一些修改后重新运行应用程序,但是在大约3/4的数据处理方式之后,flink集群似乎无缘无故地崩溃了。查看浓缩日志-我的评论插入尖括号中

2018-02-09 12:04:05,146 [INFO] from a.b.l.f.MultiS3FileSource in Source: General source (1/1) - inserting 266574 events
2018-02-09 12:10:55,094 [ERROR] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11020 - class org.apache.flink.runtime.client.JobSubmissionClientActor received unknown message: 
2018-02-09 12:10:55,245 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Process -> Detection(7/8) switched to CANCELED ) because there is currently no valid leader id known.
2018-02-09 12:10:55,268 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Enrichment-> Flat Map(7/8) switched to CANCELED ) because there is currently no valid leader id known.
... <similar messages for all the processing steps>
2018-02-09 12:10:55,509 [ERROR] from o.a.f.s.r.t.StreamTask in PartialAggregations-> Sink: CassandraSink (1/8) - Error during disposal of stream operator.
java.lang.InterruptedException: null <because its interrupting a future>
... <for all of my sinks - these are custom, not the flink cassandra connectors>

第一条信息是关于我的源代码从s3读取数据并将其收集到flink中。

之后,第一个错误产生:https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java#L137

警告由以下人员产生:https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java#L115

最后一个错误在我的代码中,但它是由flink试图拆除作业引起的,所以它不应该是错误的原始原因。

我可以提供一些额外的信息,但我不确定什么是相关的。

第一个错误似乎是导致整个崩溃的原因。JobSubmissionClientActor的getLeaderSessionID为空怎么可能?如果flink运行的是embedded,JobSubmissionClientActor会收到什么样的消息?在我看来,它能够接收到的所有消息都是关于提交作业的消息。在嵌入式模式下也应该这样吗?我怎样才能防止这次撞车?

更新:我想我误解了错误日志。当我再次运行执行时,我得到的事件顺序略有不同。在上一次运行中,我只在处理流时出现错误,没有明显的原因导致流结束,因为最后一个错误似乎没有包含在我的日志文件中(不过它被打印到标准输出)。这个错误在下面,之前的错误与上一次运行中的错误相似(围绕处理流的错误)。

[error] Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: JobClientActor seems to have died before the JobExecutionResult could be retrieved.[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:285)
[error]         at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
[error]         at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
[error]         at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
[error]         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:108)
[error]         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
[error]         at a.b.l.flink.FlinkIngestPrototype$.run(FlinkIngestPrototype.scala:90)
[error]         at a.b.l.flink.FlinkIngestPrototype$.main(FlinkIngestPrototype.scala:43)
[error]         at a.b.l.flink.FlinkIngestPrototype.main(FlinkIngestPrototype.scala)
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
[error]         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
[error]         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
[error]         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[error]         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error]         at scala.concurrent.Await$.result(package.scala:190)
[error]         at scala.concurrent.Await.result(package.scala)
[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:273)
[error]         ... 9 more

我将执行失败追溯到以下几点:

>

此ping超时并向作业参与者发送毒药,这会导致所有不同的处理错误。

我之前遇到过一些期货问题,它们会以不确定的方式被更短的超时中断。我已经调试了这个问题,我认为这是因为一些非常长的GC暂停(或类似的东西)。超时如何与GC暂停同步的说明:https://imgur.com/a/9mMvN.我认为这也可能是超时的原因。这是我的GC配置:

"-XX:-UseParallelGC",
"-XX:-UseConcMarkSweepGC",
"-XX:+UseG1GC",

根据大多数来源,这将导致非常短的GC暂停(少于一秒)。有人有过在flink中获得很长GC暂停的经验吗?这可能是一个与硬件相关的问题吗?我正在EC2 AWS实例上运行该应用程序。

共有1个答案

洪琦
2023-03-14

正如您所说,这是GC暂停的问题,我试图解决这类问题的是:

  1. 减少作业内存需求
  2. 增加系统可用内存
  3. 增加心跳超时,因此长时间暂停后不会崩溃
 类似资料:
  • 我有一个Kafka Streams应用程序,使用3个代理和3个复制因子从Kafka集群进行消费和生产。除了消费者偏移主题(50个分区)之外,所有其他主题都只有一个分区。 当代理尝试首选副本选择时,Streams应用程序(运行在与代理完全不同的实例上)将失败,并出现错误: Streams应用程序尝试成为分区的领导者是否正常,因为它在不属于Kafka集群的服务器上运行? 我可以通过以下方式复制这种行为

  • 本文向大家介绍解释领导者和追随者的概念。相关面试题,主要包含被问及解释领导者和追随者的概念。时的应答技巧和注意事项,需要的朋友参考一下 答:在Kafka的每个分区中,都有一个服务器充当领导者,0到多个服务器充当追随者的角色。

  • 我在AWS EKS集群版本1.15中以复制模式(总共3个zooKeer节点)运行ZooKeer 3.6.0。我正在从Docker中心拉取zooKeer:最新映像。 这是节点1(PRD-zoo1)的zoo.cfg文件。除了指定其他zoo服务器的最后三个条目之外,其他节点具有类似的配置。 动物园节点似乎能够相互通信并完成领导人选举。然而,当我查看日志时,我看到了反复出现的java。lang.Illeg

  • 我正在努力通过SSL移动所有Kafka流量。每个区域有两个集群。 使用Kafka 2.7.0版。 除一个集群外,所有区域和所有集群都可以通过SSL正常工作。 在其他工具中,我使用< code>kafkacat来探测集群。 当通过明文连接对此集群执行时,它会列出所有代理、主题和分区,并显示每个分区的领导者: 当通过 SSL 执行相同的命令时,会发现: 0经纪人 列出主题和分区,但没有标题 对该地区的

  • 问题内容: 我知道Go中没有析构函数,因为从技术上讲没有类。这样,我用来执行与构造函数相同的功能。但是,有没有办法在终止的情况下创建某些东西来模仿析构函数,例如使用关闭文件?现在,我只是打电话给我,但这有点荒唐,我认为设计很差。正确的方法是什么? 问题答案: 在Go生态系统中,存在一种处理包装了宝贵(和/或外部)资源的对象的惯用语:一种专门用于释放该资源的特殊方法,通常通过该机制进行 显式 调用。

  • 问题内容: 今天,当我试图“通用化”我的“ CoreData导入操作”时遇到了一个奇怪的问题。看来,如果我创建NSOperation的通用子类,则不会调用该func。 简单的例子: 如果您创建此类的实例并将其添加到中,您将看到实际上并没有调用它。 操作简单地从过境到和状态,而无需调用。 如果我从类中删除通用注释,它将正常工作。 这怎么可能?我在这里想念什么吗? 问题答案: 问题是由以下简单规则引起