当前位置: 首页 > 知识库问答 >
问题:

Flink检查点随机失败

季凡
2023-03-14

我正在kubernetes中运行一个Flink作业。我的设置如下

  1. 1个作业管理器吊舱

我的flink作业从kafka源获取时间序列数据(时间、值),进行聚合和其他转换,并将其发布到kafka接收器。

有时,我的作业因检查点异常(10分钟后超时)而失败,主要是由一名操作员完成的。我不理解异步持续时间(在图中)的含义,为什么它花费的时间最长。在这个异常之前,Kafka的吞吐量非常高,有500-800万条记录,但在这个检查点异常之后,吞吐量变得非常慢。另一个观察结果是,增加并行性无助于提高吞吐量。我还启用了未对齐的检查点,但它没有多大帮助。

Flink版本:1.13.2提交:5f007ff@2021-07-23T04:35:55 02:00

感谢您的帮助。

编辑:

在David的评论之后,我对我的集群进行了以下更改

我添加了一个单独的节点池,每个节点附有10个节点和1个本地ssd(默认大小为375 GB),并在此节点池中调度我的TM pod。此外,我将rocksdb localdir设置为溢出状态数据到磁盘。我有1个kafka源,可以流式传输10亿时间序列数据

检查点间隔=15分钟,还启用了未对齐的检查点

  1. 第一次运行,吞吐量非常高。它处理

问题

  1. 为什么第一次运行时吞吐量真的很高,然后又下降了
  2. 首次运行时仍观察到检查点故障
  3. 在首次运行时取消作业并重新提交作业无助于获得相同的吞吐量
  4. 虽然作业现在运行良好,没有检查点故障,但为什么所有本地ssd设置的吞吐量仍然很低
  5. 为什么检查点大小波动如此之大?有时多,有时少

共有3个答案

祝花蜂
2023-03-14

看起来您有一个用例,其中未对齐的检查点实际上没有任何帮助,反而使其恶化。

未对齐的检查点适用于遇到突然峰值或持续时间较长的背压的用例,因此在开始时吞吐量非常低的情况下。

现在,未对齐的检查点正在检查点中持久化瞬态数据。这就是为什么大小会膨胀,实际上可能会有很大的变化,这取决于网络缓冲区饱和的程度。

从未对齐的检查点恢复也需要更长的时间(要恢复更多数据)。

现在的问题仍然是,为什么对未对齐的检查点会有超时。理论上,只有当您的I/O成为瓶颈时,才会发生这种情况。未对齐的检查点将您的检查点大小从1 GB放大到9 GB。

另一种解释可能实际上是水槽侧的一个完全瓶颈;由于您可能使用了KafkaConsumer,而不是新的KafkaSource,因此需要最少的数据流。但我想从你的解释中排除这一点。

为了进一步调查,我需要一个超时检查点的屏幕截图(每个任务的详细统计数据)和日志。

最后一个问题是,即使作业完全重新启动,总体吞吐量仍然存在问题。根据提供的信息,这个问题很难回答。我猜您有一些有状态操作,其中heap statebend在达到某个状态大小后运行得不太理想。您可以尝试调整StateBend。例如,确保调整SSD的rocks DB。

严玉泽
2023-03-14

完全同意David的意见,您应该使用本地磁盘。这里没有耐久性,只有性能要求。有关详细信息,请查看此博文[1]。至于吞吐量下降,您是否检查了本地磁盘的指标?它们是不稳定的磁盘吗?你可能会遇到瓶颈。查看这篇博文[2],看看它是否符合您的情况。如果不是磁盘,也检查CPU

[1]https://flink.apache.org/2021/01/18/rocksdb.html

[2]https://www.ververica.com/blog/the-impact-of-disks-on-rocksdb-state-backend-in-flink-a-case-study

益楷
2023-03-14

关于您的设置,我要更改的第一件事是移动状态。后端。rocksdb。localdir到每个TM pod的本地临时存储。如果RocksDB本地磁盘实际上是NFS装载,则可能会导致问题。

至于异步持续时间,检查点分两个阶段完成。一个(希望简短)同步阶段,在此期间流处理暂停,以便可以检查点一些关键内容,然后是一个通常更长的异步阶段,在此期间键控状态被检查点(在与正在进行的流处理分开的线程中)。

 类似资料:
  • 我们正在尝试使用RocksDB后端设置Flink有状态作业。我们使用会话窗口,有30分钟的间隔。我们使用aggregateFunction,所以不使用任何Flink状态变量。通过采样,我们的事件数不到20k次/秒,新会话数不到20-30次/秒。我们的会议基本上收集了所有的事件。会话累加器的大小会随着时间而增大。我们总共使用了10G内存和Flink1.9,128个容器。以下是设置: 从我们对给定时间

  • 一、状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State: 2.1 算子状态 算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方

  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 但是Flink医生说: 在启用Flink检查点的情况下,Flink Kafka使用者将使用来自主题的记录,并以一致的方式定期检查其所有的Kafka偏移量以及其他操作的状态。在作业失败的情况下,Flink会将流程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始重新使用来自Kafka的记录。 阅读其他来源,我猜Flink检查点将保存程序的状态以及消耗的偏移量,但Spark检查点只是保存消耗的偏移

  • 我的工作流程工作原理如下: src[Kafka]- 但我的工作是运行精细的数据完美地流向Kafka和MySQL,但它在检查点失败,附加图像相同。 Ps :目前我已经禁用了检查点,但是当我使用相同的属性启用时,它会失败

  • 主程序正在消费kafka事件,然后过滤- 但是我得到了以下例外: 以下是flink-conf.yaml中的一些配置 任何想法为什么会发生异常以及如何解决问题? 谢谢