我有一个Python进程(或者更确切地说,在一个使用者组中并行运行的一组进程),它根据来自某个主题的Kafka消息输入来处理数据。通常每条消息的处理都很快,但有时,取决于消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka broker断开客户端与组的连接,并启动重新平衡。我可以将session_timeout_ms
设置为一个非常大的值,但它可能会超过10分钟,这意味着如果客户机死亡,集群在10分钟内无法正确地重新平衡。这似乎是个坏主意。而且,大多数消息(大约98%)都是快速的,所以仅仅为1-2%的消息支付这样的罚款似乎是浪费。此外,大型消息的频繁程度足以导致大量的重新平衡,并花费大量的性能(因为当组重新平衡时,什么也没有做,然后“死亡”的客户端再次加入并导致另一次重新平衡)。
所以,我想知道,有没有其他方法来处理需要很长时间处理的消息呢?有没有办法手动启动心跳来告诉经纪人“没事,我还活着,我只是在处理消息”?我以为Python客户机(我使用kafka-python1.4.7
)应该为我做到这一点,但似乎没有做到。还有,API似乎甚至根本就没有单独的“心跳”功能。据我所知,调用poll()
实际上会得到下一个消息--而我甚至还没有处理完当前的消息,而且还会搞乱Kafka consumer的iterator API,它在Python中使用非常方便。
如果我没记错的话,Kafka集群是合流的,版本2.3。
在Kafka中,0.10.1+Kafka轮询和会话心跳相互解耦。你可以在这里得到解释
max.poll.interval.ms超时前允许使用者实例完成处理的时间是指如果处理时间超过max.poll.interval.ms时间,使用者组将假定它从使用者组中删除并调用Rebalance。
若要增加,这将增加预期轮询之间的间隔,从而使消费者有更多的时间处理轮询返回的一批记录(长)。但同时,它也会延迟组再平衡,因为消费者只会在呼叫轮询内加入再平衡。
session.timeout.ms是用于标识使用者是否仍然活动并在定义的间隔(heartbeat.interval.ms)上发送心跳的超时。一般情况下,拇指规则是heartbeat.interval.ms应该是会话超时的1/3,这样在网络故障的情况下,使用者在会话超时之前最多可以错过3次heartbeat。
>
session.timeout.ms:低值有助于更快地检测故障。
max.poll.interval.ms:较大的值将减少由于处理时间增加而导致的失败风险,但是会增加再平衡时间。
另一种方法是,如果您真的想要摆脱重新平衡,您可以使用partition assign,在每个使用者实例上手动分配分区。在这种情况下,每个使用者实例将使用它们自己分配的分区独立运行。但在这种情况下,您将无法利用重新平衡功能来自动分配分区。
2013/04/22 12:35:56[错误]2709#0:*1从上游读取响应标头时上游超时(110:连接超时),客户端:xx.xx.xx.xx,服务器:,请求:“get/entity/datasenders/http/1.1”,上游:“uwsgi://127.0.0.1:9001”,主机:“xxx.xx.xx.x” 我已经设置了标题超时和uWSGI发送/读取超时5分钟,有人能告诉我我可以做什么来
应用程序有一个JMS队列负责交付审计日志。应用程序将日志发送到JMS队列,该队列由MDB使用。 但是,发送的消息是大 XML 文件,大小从 20 MB 到 100 MB 不等。问题在于 JMS 队列使用消息的时间太长,从而导致内存不足错误。 我应该怎么做才能解决这个问题?
我使用javamail通过IMAP协议从exchage帐户读取邮件。这些邮件是纯格式的,内容是XML。 几乎所有这些邮件的大小都很短(通常小于100Kb)。然而,有时我不得不处理大型邮件(大约10Mb-15Mb)。例如,昨天我收到一封13Mb大小的电子邮件。仅仅读它就花了50多分钟。这正常吗?有没有办法提高它的性能?代码是: 花费如此长时间的方法是。我做错了什么?有什么提示吗? 非常感谢,我的英语
给出结果需要20多秒,而在mongo控制台中同样的查询需要不到一秒。 为什么会出现这种情况,如何减少速度差距?
我有以下PHP代码在Laravel正在执行一个MySql查询: 执行此查询需要很长时间。 我对所排序的列以及其他查询的许多列都有索引。 我该怎么办? 更新: 执行的查询: 结果: