当前位置: 首页 > 知识库问答 >
问题:

处理kafka消息需要很长时间

丁星火
2023-03-14

我有一个Python进程(或者更确切地说,在一个使用者组中并行运行的一组进程),它根据来自某个主题的Kafka消息输入来处理数据。通常每条消息的处理都很快,但有时,取决于消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka broker断开客户端与组的连接,并启动重新平衡。我可以将session_timeout_ms设置为一个非常大的值,但它可能会超过10分钟,这意味着如果客户机死亡,集群在10分钟内无法正确地重新平衡。这似乎是个坏主意。而且,大多数消息(大约98%)都是快速的,所以仅仅为1-2%的消息支付这样的罚款似乎是浪费。此外,大型消息的频繁程度足以导致大量的重新平衡,并花费大量的性能(因为当组重新平衡时,什么也没有做,然后“死亡”的客户端再次加入并导致另一次重新平衡)。

所以,我想知道,有没有其他方法来处理需要很长时间处理的消息呢?有没有办法手动启动心跳来告诉经纪人“没事,我还活着,我只是在处理消息”?我以为Python客户机(我使用kafka-python1.4.7)应该为我做到这一点,但似乎没有做到。还有,API似乎甚至根本就没有单独的“心跳”功能。据我所知,调用poll()实际上会得到下一个消息--而我甚至还没有处理完当前的消息,而且还会搞乱Kafka consumer的iterator API,它在Python中使用非常方便。

如果我没记错的话,Kafka集群是合流的,版本2.3。

共有1个答案

牧飞鹏
2023-03-14

在Kafka中,0.10.1+Kafka轮询和会话心跳相互解耦。你可以在这里得到解释

max.poll.interval.ms超时前允许使用者实例完成处理的时间是指如果处理时间超过max.poll.interval.ms时间,使用者组将假定它从使用者组中删除并调用Rebalance。

若要增加,这将增加预期轮询之间的间隔,从而使消费者有更多的时间处理轮询返回的一批记录(长)。但同时,它也会延迟组再平衡,因为消费者只会在呼叫轮询内加入再平衡。

session.timeout.ms是用于标识使用者是否仍然活动并在定义的间隔(heartbeat.interval.ms)上发送心跳的超时。一般情况下,拇指规则是heartbeat.interval.ms应该是会话超时的1/3,这样在网络故障的情况下,使用者在会话超时之前最多可以错过3次heartbeat。

>

  • session.timeout.ms:低值有助于更快地检测故障。

    max.poll.interval.ms:较大的值将减少由于处理时间增加而导致的失败风险,但是会增加再平衡时间。

    另一种方法是,如果您真的想要摆脱重新平衡,您可以使用partition assign,在每个使用者实例上手动分配分区。在这种情况下,每个使用者实例将使用它们自己分配的分区独立运行。但在这种情况下,您将无法利用重新平衡功能来自动分配分区。

  •  类似资料:
    • 2013/04/22 12:35:56[错误]2709#0:*1从上游读取响应标头时上游超时(110:连接超时),客户端:xx.xx.xx.xx,服务器:,请求:“get/entity/datasenders/http/1.1”,上游:“uwsgi://127.0.0.1:9001”,主机:“xxx.xx.xx.x” 我已经设置了标题超时和uWSGI发送/读取超时5分钟,有人能告诉我我可以做什么来

    • 应用程序有一个JMS队列负责交付审计日志。应用程序将日志发送到JMS队列,该队列由MDB使用。 但是,发送的消息是大 XML 文件,大小从 20 MB 到 100 MB 不等。问题在于 JMS 队列使用消息的时间太长,从而导致内存不足错误。 我应该怎么做才能解决这个问题?

    • 我使用javamail通过IMAP协议从exchage帐户读取邮件。这些邮件是纯格式的,内容是XML。 几乎所有这些邮件的大小都很短(通常小于100Kb)。然而,有时我不得不处理大型邮件(大约10Mb-15Mb)。例如,昨天我收到一封13Mb大小的电子邮件。仅仅读它就花了50多分钟。这正常吗?有没有办法提高它的性能?代码是: 花费如此长时间的方法是。我做错了什么?有什么提示吗? 非常感谢,我的英语

    • 给出结果需要20多秒,而在mongo控制台中同样的查询需要不到一秒。 为什么会出现这种情况,如何减少速度差距?

    • 我有以下PHP代码在Laravel正在执行一个MySql查询: 执行此查询需要很长时间。 我对所排序的列以及其他查询的许多列都有索引。 我该怎么办? 更新: 执行的查询: 结果:

    • 问题内容: 我在重新整理模型时遇到问题。我训练了模型并使用此代码保存了模型。我不太确定这是否是正确的方法,我将不胜感激。当我尝试还原模型时会发生问题。我只需要预测,就不会再接受过培训了。从模型中恢复参数需要花费很多时间。在我仅需要预测的前提下,如何改进模型保护程序或模型恢复程序以使其快速完成。 恢复: 编辑:也许使用Google Colab的GPU训练模型,然后将其还原到我的PC上这一事实很重要。