当前位置: 首页 > 知识库问答 >
问题:

使用spring kafka在kafka消费者中手动提交偏移量的方法

束志业
2023-03-14

我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。

我知道有两种方法可以解决这个问题-

>

  • 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.acknowledge()API

    ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    
    @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
    public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
        logger.info("===*** batch size : " + batch.size() + "***===");
        batch.forEach(System.out::println);
        acknowledgment.acknowledge();
    }
    

    实现消费者重新平衡监听器。

    ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> collection) {
    
    }
    
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> collection) {
    
    }
    });
    

    然而,使用这种方法,我不知道如何获得消费者的引用,以调用消费者。commitSync()使用者。commitASync()API。由于一些技术限制,我无法使用最新版本的spring kafka,该版本支持ConsumerAwareRebalanceListener,并引用了Consumer

    那么,如何利用ConsumerBalanceListener向Kafka提交偏移量呢?

    此外,我正在使用ConcurrentKafkAlisterContainerFactory启动多个消费者侦听器线程。setConcurrency()API,那么如果特定的使用者线程死亡,它是否有自己的ConsumerBalanceListener实例?

  • 共有1个答案

    裴实
    2023-03-14

    消费者重新平衡侦听器不能用于提交偏移量;使用1. x;唯一的方法是通过确认参数。

    1.1.0非常古老;全部1。建议x用户至少升级到1.3.5;得益于KIP-62,它的线程模型简单多了——请参见项目页面。

    当前版本2.1.6(2.1.7将于今天发布)有更多选项,包括ConsumerRawareMessageListener,您可以在其中完全访问消费者。

    您不能从带有版本的侦听器直接与消费者交互

     类似资料:
    • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

    • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

    • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

    • 我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow

    • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

    • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1