当前位置: 首页 > 知识库问答 >
问题:

从Kafka 10消费者消费消息而不提交

竺鸿骞
2023-03-14

我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理,我为每个分区创建一个新的使用者,从存储的最后一个偏移量读取消息并发布到外部系统。您是否认为在不提交偏移量和跨批次使用相同的使用者组的情况下使用消息有任何问题,但在任何时候每个分区都不会有多个使用者?

共有2个答案

司英飙
2023-03-14

在kafka版本2中,我实现了这种行为,而无需数据库来存储偏移。以下是sping-boot-kafka的配置,但它也应该适用于任何kafka消费者api

spring:
  kafka:
    bootstrap-servers: ...
    consumer:
      value-deserializer: ...
      max-poll-records: 1000
      enable-auto-commit: false
      fetch-min-size: 262144 # 1/4 mb..
      group-id: ...
      fetch-max-wait: 10000 # we will consume every 10s or when 1/4 mb or 1000 records are accumulated.
      auto-offset-reset: earliest
    listener:
      type: batch
      concurrency: 7
      ack-mode: manual

这将提供最多1000条记录的批量消息(取决于负载)。然后,我将这些记录异步写入数据库,并计算得到的成功回调次数。如果成功写入等于接收到的批大小,则确认批,例如提交偏移量。即使在高负载生产环境中,这种设计也非常可靠。

萧允晨
2023-03-14

我觉得你的设计很合理。

将偏移量提交到Kafka只是Kafka中一种方便的内置机制,用于跟踪偏移量。但是,没有任何要求使用它 - 您也可以使用任何其他机制来跟踪偏移(例如使用数据库,就像在您的案例中一样)。

此外,如果您手动分配分区,无论如何都不会有组管理。因此参数< code>group.id不起作用。更多详情请见http://docs.confluent.io/current/clients/consumer.html

 类似资料:
  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 启动使用者接收消息 根据我的理解,consumer直接使用来自broker的消息,但在上面的consumer命令中,我们没有提到broker,而只提到zookeeper。消费者是否会连接到zookeeper(而不是broker)来消费消息?

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。