当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者会议计时结束

厍胤运
2023-03-14

我们有一个应用程序,消费者读取一条消息,线程执行许多操作,包括在生成另一主题的消息之前访问数据库。在线程上消耗和生成消息之间的时间可能需要几分钟。一旦生成了指向新主题的消息,就会进行提交,以表明我们已经完成了对消费者队列消息的处理。自动提交因此被禁用。

我正在使用高级消费者,我注意到的是zoowatch和kafka会话超时,因为我们在消费者队列上做任何事情之前需要太长时间,所以kafka每次线程返回以从消费者队列中读取更多内容时都会重新平衡,并开始花很长时间消费者才会在一段时间后阅读新消息。

我可以设置动物园管理员会话超时非常高,不使这成为一个问题,但然后我必须相应地调整再平衡参数,Kafka不会拿起一个新的消费者在其他副作用。

我有什么办法来解决这个问题?有没有办法让Kafka和动物园管理员都开心?如果我使用一个简单的消费者,我还会有这些问题吗?

共有2个答案

施阳夏
2023-03-14

我认为问题在于消费者的投票方法触发了消费者的心跳请求。当你增加训练时。超时。消费者的心跳将无法到达协调器。由于心跳跳动,协调员标记消费者死亡。而且消费者重新加入非常缓慢,尤其是在单一消费者的情况下。

我遇到了一个类似的问题,为了解决这个问题,我必须在consumer配置属性中更改以下参数

session.timeout.ms=request.timeout.ms=超过会话超时

您还必须在服务器中添加以下属性。kafka代理节点上的属性。组最大会话数。超时。ms=

您可以查看以下链接了解更多详细信息。http://grokbase.com/t/kafka/users/16324waa50/session-timeout-ms-limit

钱志强
2023-03-14

听起来,您的问题归结为依赖高级消费者来管理上次读取偏移量。使用一个简单的消费者可以解决这个问题,因为您可以控制该偏移的持久性。请注意,高级消费者提交所做的一切都是将最后读取偏移存储在zoowatch中。没有采取其他操作,您刚刚读取的消息仍在分区中,其他消费者可以读取。

使用kafka simple consumer,您可以更好地控制偏移存储发生的时间和方式。您甚至可以将该偏移保留在Zookeeper之外的其他位置(例如,数据库)。

坏消息是,虽然简单消费者本身比高级消费者更简单,但要使其工作,您还需要在代码方面做更多的工作。您还必须编写代码来访问多个分区——这是高级消费者非常适合您的。

 类似资料:
  • 我正在做一个Kafka的消费者计划。最近我们在PROD环境下进行了部署。在那里,我们面临以下问题: 我的理解是,当组协调器不可用并被重新发现时,心跳间隔(根据文档为3秒)过期,消费者被踢出组。这是正确的吗?。如果是这样的话,应该为这个工作做些什么呢?。如果我错了,请帮助我理解这个问题,并建议您有任何想法,以解决这个问题。如果需要,我可以分享代码。

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我