当前位置: 首页 > 知识库问答 >
问题:

如何处理Kafka连接水槽中的背压?[关闭]

戈建白
2023-03-14

这个问题似乎不是关于特定的编程问题、软件算法或主要由程序员使用的软件工具。如果您认为这个问题在另一个Stack Exchange网站上是主题,您可以留下评论来解释这个问题在哪里可以得到回答。

我们构建了一个定制的Kafka Connect sink,它反过来调用一个远程REST API。我如何将背压传播到Kafka Connect基础设施,以便在远程系统比内部使用者向put()传递消息慢的情况下,不经常调用put()?Kafka connect文档说我们不应该在put()中阻塞,而应该在flush()中阻塞。但是在put()中不阻塞意味着我们必须缓冲数据,如果put()比flush()调用得更频繁,这在某些时候肯定会导致OOM异常。我曾见过一个kafka消费者被允许在循环中调用pause()或block()。有可能在kafka connect sink中利用这一点吗?

共有3个答案

唐信瑞
2023-03-14

不幸的是,Kafka中没有内置的反压机制,您需要其他框架的帮助,例如Akka的Al

如果您需要示例代码,可以查看我的博客,blog1,blog2。

邹丰羽
2023-03-14

如果您可以使用某种自动缩放器,一个想法可能是在接收器上收集一些度量,并相应地缩放连接器,无论是在工作人员还是在任务级别(在本例中通过RESTAPI)。

唐腾
2023-03-14

我已经看到 kafka 消费者被允许在 loop() 中调用 pause() 或块。是否可以在 kafka 连接接收器中利用它?

您可以在整个连接器上调用< code>/pause,尽管我不确定此时未刷新的消息会发生什么。

但是在 put() 中不阻塞意味着我们必须缓冲数据,这肯定会在某个时候导致 OOM 异常

当然可以,但这确实是保存数据超过必要时间的唯一可行选择。例如,S3和HDFS连接器就是这样工作的。

rotate.interval.ms
调用文件提交的时间间隔(以毫秒为单位)…

您的HTTP客户端连接可能会阻止发出请求,不是吗?

另一种方法是让您的HTTP服务器嵌入Kafka消费者,以便它可以自行轮询消息并在本地对其进行操作,而不需要通过HTTP发送请求。

 类似资料:
  • kafka jdbc接收器连接器是否支持将其使用的内容写入不同的主题。我正在寻找一种传递机制,如下图所示。如果没有,我可以链接一个接收器和源(从接收器写的地方读取),但我认为这不会有那么好的性能。也许我可以修改现有的接收器连接器来实现这一点?

  • 我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2) 我从这个问题的答案中借用了一种方法。 我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)

  • 我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink

  • 问题内容: 我试图了解哪些是应用程序的“物理”限制。 在客户端: 在服务器端: 在OSX中达到文件限制(256)时,统计信息如下 让我感到困惑的是: 如果我强行关闭连接(这是我想对客户端执行的操作,为什么我仍在使用文件句柄(因此达到文件限制),请执行以下操作:编辑:添加延迟似乎使服务器可以保持呼吸并且永远不会达到文件限制)? 有没有一种方法可以完全关闭套接字,以便可以确定很少达到文件限制(我知道可

  • 我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?

  • 我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化: