当前位置: 首页 > 知识库问答 >
问题:

如何使Flink的工作以巨大的状态完成

秦景同
2023-03-14

我们正在运行一个Flink集群来计算历史上数TB的流式数据。数据计算有一个巨大的状态,我们使用键控状态-RocksDb后端的值和映射状态。在工作计算的某个时候,工作绩效开始下降,输入和输出率下降到几乎为0。此时,可以在日志中看到诸如“与Taskmanager通信X超时错误”之类的异常情况,但作业甚至在之前就已被破坏。

我想我们面临的问题是RocksDb的磁盘后端。随着作业状态的增长,需要更频繁地访问磁盘,这会将性能拖至0。我们已经使用了一些选项,并设置了一些对我们的特定设置有意义的选项:

我们正在使用预先定义配置文件,并使用优化过滤器进行进一步优化,以获得更好的性能。然而,这些都不能提供稳定的计算,在作业中,针对更大的数据集重新运行时,作业会再次停止。

我们正在寻找的是一种修改作业的方法,以便即使在输入和状态增加时也能以一定的速度进行。我们在AWS上运行,任务管理器的限制设置为15 GB左右,磁盘空间没有限制。

共有1个答案

蓝华皓
2023-03-14

使用SPINNING_DISK_OPTIMIZED_HIGH_MEM将花费巨大的堆外内存RocksDB的memtable,看到你正在运行内存限制在15GB左右的作业,我想你会遇到OOM问题,但如果你选择默认的预定义配置文件,你将面临写失速问题或CPU开销通过解压缩Rocksdb的页面缓存,所以我认为你应该增加内存限制。这里有一些关于Rocksdb FYI的帖子:https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDBhttps://www.ververica.com/blog/manage-rocksdb-memory-size-apache-flink

 类似资料:
  • 我正在K8上运行flink cluster,状态约为1TB。 我面临的问题之一是获取保存点并恢复作业。现在,这些更新有时是简单的代码更新,而不是并行性更改。但是获取保存点然后用旧状态恢复新作业的时间相当长。 是否有方法对作业进行就地更新,以使本地状态和作业ID不发生更改,从而避免执行保存点恢复所需的时间?

  • 问题内容: 我正在尝试让Jenkins在Cloudbees上构建并运行。我已经成功安装了NodeJ,并从BitBucket存储库中提取了源代码。我正在尝试运行我的grunt任务,以在部署之前最小化并连接我的JS和CSS文件。但是,即使已成功安装,我也无法运行grunt程序。以下是我的构建脚本: 我尝试安装带有和不带有-g选项的grunt都没有成功。这是我的构建的grunt部分的控制台输出: 关于如

  • 我在工作中使用ProcessWindowFunction并保持StateValue。我的目标是将值保持在超过1个窗口的状态,这意味着状态不会在每个窗口的末尾被清除。我有两个问题: 我怎样才能清除状态?有没有设置触发器并用它来清除状态的选项?(当在ProcessFunction中使用状态时,我能够设置触发器以执行此清除,即使没有新事件) 有没有一种方法来构建一个单元测试来检查我的ProcessWin

  • 我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?

  • 在阅读了Flink的文档并四处搜索后,我无法完全理解Flink的句柄在其窗口中的状态。假设我有一个每小时滚动的窗口,其中包含一个聚合函数,该函数将消息累积到某个java pojo或scala case类中。该窗口的大小将与一小时内进入该窗口的事件数量相关联,还是仅仅与POJO/Case类相关联,因为我将事件累加到该对象中。(例如,如果将10000个味精数成一个整数,大小会接近10000*味精大小还

  • 我有以下CEP PatternStream,其中数据流是基于实体ID分区的,因为只有实体具有相同的实体ID时,我才对模式匹配感兴趣: 但随后我注意到检查点状态大小随着实体ID数量的增加而增加。如果我对检查点的理解是正确的,这是意料之中的,因为运算符状态的数量会增加。但我想弄清楚是否有其他方法可以最小化检查点状态大小。 > 有没有不同的方法来实现这种模式匹配,而不根据实体ID对数据流进行分区?