当前位置: 首页 > 知识库问答 >
问题:

Kafka-处理缓慢消费者的最佳实践。如何实现更多的并行性?

徐茂材
2023-03-14

我知道一个消费者组中活动消费者的最大数量是一个主题的分区数。

对于处理速度较慢的消费者,最佳做法是什么?如何实现更多的并行性?

例如:一个主题有6个分区,生产者每秒生成数千条消息。所以我在这个群体中最多有6个消费者。考虑到处理这些消息很复杂,而且消费者比生产者慢得多。结果是,消费者总是落后于最后一个补偿,而滞后正在增加。

在传统的MQ系统中,我们只需添加越来越多的使用者以保持最新。

如何使用Kafka实现这一点,因为组中消费者的总数最多是分区的数量?我应该:

  • 是否将主题配置为具有更多分区,以允许每个组有更多使用者

这种情况下的最佳做法是什么?

共有1个答案

颛孙英勋
2023-03-14

在Kafka中,分区是并行的单位。

如果不知道我们确切的用例和需求,很难提出精确的建议,但有几个选项。

首先,您应该真正考虑拥有更多的分区。6个分区相对较小,您可以轻松拥有60个、120个甚至更多的分区(以及相应的使用者数量)。突然之间,每个消费者必须做的工作量大大减少了。

此外,如果您的需求允许,您还可以快速消费,并将记录处理分散到许多工作人员。在这样的解决方案中,很难维持秩序,但如果您不需要它,那么您可以考虑它。

我不确定通过MQ队列路由消息在这种情况下是否真的有帮助。如果您仍然读取速度比写入速度慢,队列中的数据量将增长,直到您没有剩余的磁盘空间。

Kafka更好地被设计为生产者和消费者之间的缓冲区,因此只需确保您对主题有保留限制,以便在不丢失数据的情况下为消费者提供一定的灵活性。

 类似资料:
  • 我有一个用例,在这个用例中,我有3个Kafka消费者向一个主题写作,每个消费者中的消息都需要按顺序处理。在这种情况下,如果某个消费者中存在延迟,则需要更早处理的消息将被丢弃(写入条件)。那么,有没有一种方法可以维持这些消息的顺序呢。

  • 想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这

  • 如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。

  • 我正在使用Pactman和pact-python为CDC测试做一个POC。我可以生成协议文件和验证协议与提供商的基本url而不注册到协议代理,我使用以下方法。它将检查任何失败,这是最好的做法还是我需要使用代理?

  • 问题内容: 如果我的应用程序崩溃了,它会挂起几秒钟,然后Android告诉我该应用程序崩溃了,需要关闭。所以我当时想用通用的方式捕获应用程序中的所有异常: 并做一个新的解释,说明应用程序立即崩溃(并且还使用户有机会发送包含错误详细信息的邮件),而不是由于Android而造成了延迟。是否有更好的方法来实现这一目标? 更新: 我使用的是启用了ART的Nexus 5,但我没有注意到我以前遇到的崩溃(我最

  • 问题内容: 几天前我才开始尝试使用node.js。我意识到只要程序中有未处理的异常,Node就会终止。这与我所见过的普通服务器容器不同,在普通服务器容器中,当发生未处理的异常时,只有工作线程死亡,并且容器仍然能够接收请求。这引起了一些问题: 是唯一有效的预防方法吗? 在执行异步过程期间也会捕获未处理的异常吗? 是否存在已经构建的模块(例如发送电子邮件或写入文件),在未捕获的异常的情况下可以利用该模