当前位置: 首页 > 知识库问答 >
问题:

Kafka-如何使用高级使用者在每条消息之后提交偏移量?

松景铄
2023-03-14

我用的是Kafka的高级消费者。因为我使用Kafka作为我的应用程序的“事务队列”,所以我需要绝对确保不会错过或重读任何消息。关于这一点,我有两个问题:

>

  • 如何将偏移量提交给zoomaster?我将在每条消息成功消费后关闭自动提交和提交偏移量。我似乎找不到如何使用高级消费者执行此操作的实际代码示例。有人能帮我吗?

    另一方面,我听说promisezooeger可能会很慢,所以另一种方法可能是在本地跟踪偏移量?这种替代方法可取吗?如果是,你会如何处理?

  • 共有2个答案

    陈琪
    2023-03-14

    中有两个相关设置http://kafka.apache.org/documentation.html#consumerconfigs.

    auto.commit.enable
    

    auto.commit.interval.ms
    

    如果您想将其设置为消费者在每条消息后提交偏移量,这将很困难,因为唯一的设置是在计时器间隔之后,而不是在每条消息之后。您必须对传入消息进行一些速率预测并相应地设置时间。

    一般来说,不建议将此间隔保持得太小,因为它会极大地增加zooKeer中的读/写速率,而zooKeer会变慢,因为它在其仲裁中非常一致。

    丁曦
    2023-03-14

    您可以先禁用自动提交:auto。犯罪启用=假

    然后在获取消息后提交:<代码>消费者。Commitofsets(真)

     类似资料:
    • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

    • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

    • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

    • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

    • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?