当前位置: 首页 > 知识库问答 >
问题:

成功批量插入后更新Kafka提交偏移量

尚嘉庆
2023-03-14

我有一个spring kafka消费者,它读取记录并将其交给缓存。计划的任务将定期清除缓存中的记录。我只想在批成功保存到数据库中后更新提交偏移量。我尝试将Acknowledgement对象传递给缓存服务以调用acknowledge方法,如下所示。

public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { //called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

确认模式设置如下:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

自动提交是错误的:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

即使调用了确认方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么?

我正在使用spring-kafka 2.1.7。释放。

编辑:@GaryRussell确认在下一次轮询期间,外部线程所做的确认由消费者线程执行后,我重新检查了代码,发现上一个确认对象的设置存在错误。修复此问题后,提交偏移量将按预期进行更新。所以这个问题已经解决了。然而,我无法将这个问题标记为已回答。

共有1个答案

井疏珂
2023-03-14

问题是,消费者线程负责提交偏移量。在轮询时,消费者线程将提交上一批偏移量。

因为在您的情况下,AUTO_COMMIT是错误的,lastAcknowledge。acknowledge()未确认偏移量未提交。

只有一种方法可以做到这一点,一旦你获得投票记录,将任务安排为异步任务,并在html" target="_blank">异步任务完成后保持消费者线程并提交偏移量,检查此答案作为参考答案

注意:如果你按住消费者线程超过5分钟,将在此处进行重新平衡

新的Java消费者现在支持后台线程的心跳。有一个新的配置max.poll。间隔ms,它控制用户主动离开组之前轮询调用之间的最长时间(默认为5分钟)。配置请求的值。暂停。ms必须始终大于max.poll。间隔ms因为这是消费者重新平衡时JoinGroup请求在服务器上可以阻止的最长时间,所以我们将其默认值更改为略高于5分钟。最后是会话的默认值。暂停。ms已调整为10秒,默认值为max.poll。记录已更改为500。

来自SpringKafka的特别提示

在外国线程上所做的确认将在下一次投票前由消费者线程执行。感谢@Gary Russell提供此信息

 类似资料:
  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动