当前位置: 首页 > 知识库问答 >
问题:

flink:处理背压(来源:kafka,sink:elasticsearch)

贲功
2023-03-14

我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。

源每分钟产生约17500条记录,Flink作业为每个传入记录分配(事件)时间戳,执行12种不同类型的keyBy,在1分钟的翻滚窗口中匹配事件,在此键控的windows流上执行聚合操作,最后将结果写入12个不同的elasticsearch索引(每次写入都是插入)。

问题是写入elasticsearch的数据开始滞后,因此仪表板结果(建立在elasticsearch之上)不再是实时的。我的理解是,这是因为背压增加而发生的。不确定如何解决这个问题。html" target="_blank">集群本身是一个基于VM的单节点独立集群,具有64GB RAM(任务管理器配置为使用20GB)和16个vCPU。没有证据(从htop中看到)CPU或内存受到限制。只有一个任务管理器,这是此集群上唯一的flink作业。

我不确定这个问题是由于集群中的一些本地资源问题还是由于写入elasticsearch速度慢。我已将setBulkFlushMaxActions设置为1(正如我在任何地方看到的所有代码示例中所做的那样),我是否需要设置setBulkFlushInterval和/或setBulkFlushMaxSizeinMB?

我经历过https://www.da-platform.com/flink-forward-berlin/resources/improving-throughput-and-latency-with-flinks-network-stack但尚未尝试幻灯片19中列出的调整选项,不确定要为这些参数设置什么值。

最后,我认为在IntelliJ IDE中运行同一个作业时,我看不到同样的问题。

我将排除所有聚合,然后逐个将它们添加回来,看看是否有特定的聚合触发此问题?

任何特定的指针将非常感激,也将尝试setBulkFlushInterval和setBulkFlushMaxSizeinMB。

2019年1月29日更新1似乎两个节点都在以非常高的堆使用率运行,因此GC一直在运行,试图清除JVM中的空间。将物理内存从16 GB增加到32GB,然后重新启动节点。这将有望解决问题,24小时后就会知道。

共有2个答案

蒋华美
2023-03-14

通常在这种情况下,问题在于与外部数据存储的连接——要么带宽不足,要么对每条记录进行同步写入,而不是成批写入。

验证elasticsearch接收器是否是问题(而不是网络堆栈配置)的一种简单方法是将其替换为丢弃接收器(一个什么都不做的接收器),看看这是否解决了问题。类似的东西

public static class NullSink<OUT> implements SinkFunction<OUT> {
    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}

更新:

问题是您设置了批量。脸红将max.actions设置为1,防止连接到elasticsearch服务器时出现任何缓冲。

孔俊爽
2023-03-14

通过增加(加倍)elasticsearch集群节点上的RAM并将索引刷新间隔(在所有elasticsearch索引上)设置为30(默认为1秒),问题得到了解决。进行这些更改后,闪烁中的背压报告为ok,没有数据延迟,一切看起来都很好。

 类似资料:
  • 我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?

  • 我试图从Kafka主题中读取数据,在Flink流媒体。我试图运行以下示例代码,在APACHE Flink 1.1.3文档页面上作为示例:Apache kafka连接器, } 我有以下错误: 你能指导我修理这个吗?Kafka连接器是否存在依赖性问题。我的版本是: Flink 1.1.3

  • 我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2) 我从这个问题的答案中借用了一种方法。 我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)

  • 这个问题似乎不是关于特定的编程问题、软件算法或主要由程序员使用的软件工具。如果您认为这个问题在另一个Stack Exchange网站上是主题,您可以留下评论来解释这个问题在哪里可以得到回答。 我们构建了一个定制的Kafka Connect sink,它反过来调用一个远程REST API。我如何将背压传播到Kafka Connect基础设施,以便在远程系统比内部使用者向put()传递消息慢的情况下,

  • 如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?

  • 我正在使用至少一次检查点模式,这应该是异步化进程。有人能建议吗?我的检查点设置 我的工作有128个容器。 我想用一个30分钟的检查站看看