当前位置: 首页 > 知识库问答 >
问题:

当手动提交特定偏移失败但序列中的下一个偏移成功提交时会发生什么?

勾通
2023-03-14

我正在使用投票机制编写一个Kafka消费者,在这个机制中,我每次投票都会收到100条消息。在使用消息之后,我一个接一个地手动提交偏移量。在提交偏移量时,有时我会从100条消息中得到一条错误消息。Rest成功提交。

Offsets
1,2,3,4,5,6,7,....100

假设偏移量5提交失败,但不包括偏移量5所有成功提交的偏移量。

那么,在这种情况下,提交失败的偏移量会发生什么变化?由于偏移量按顺序移动,我会在下一次轮询中获得提交失败的偏移量吗?

我的初步调查显示它会在下次投票中读取失败的偏移量。需要对此的专业意见。

共有1个答案

濮嘉茂
2023-03-14

不需要单独提交每个偏移量。如果你提交offsetX,这意味着所有较小的offset也被提交。

 类似资料:
  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我有一个关于Kafka自动提交机制的问题。我正在使用启用自动提交的Spring-Kafka。作为一个实验,我在系统空闲(主题中没有新消息,没有正在处理的消息)的情况下,将我的消费者与Kafka的连接断开了30秒。重新连接后,我收到了如下几条消息: 第一,我不明白有什么好犯的?系统空闲(所有以前的消息都已提交)。第二,断开时间为30秒,比max.poll.interval.ms的5分钟(300000

  • 我正试着把我的头绕在Kafka的交易上,而且只绕了一次。 我已经创建了一个事务性消费者,我想确保阅读和处理某个主题的所有消息。如果事务失败,消息因此丢失,Kafka仍会提交偏移量。 更正式地说,如果流处理应用程序使用消息A并生成消息B,使得B=F(A),那么恰好一次处理意味着当且仅当成功生成B时才认为A被消耗,反之亦然。来源 基于此,我假设消息A没有被消费,因此将再次被重新处理。但这条信息将如何重

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1 我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。 我在调用commit offset时在消费者端遇到问题。 kafka服务器端配置: 以下 Kafka 消费者的配置: 要创建消费者,我使用以下api: 和提交调用 在从 Kafka 读取消息时,我们使用以下方法来处理超时