当前位置: 首页 > 知识库问答 >
问题:

当产生到主题时,Kafka流不增加偏移量1

东方建修
2023-03-14

我实现了一个简单的Kafka死信记录处理器。

当使用从控制台生成器生成的记录时,它可以完美地工作。

然而,我发现我们的Kafka Streams应用程序并不能保证向接收器生成记录,对于每个生成的记录,偏移量将增加1。

我有一个场景,其中记录可能在处理它所需的所有数据发布之前被接收。当streams应用程序处理的记录不匹配时,它们将移动到一个死信主题,而不是继续向下流。当发布新数据时,我们将最新的消息从死信主题转储回流应用程序的源主题,以便与新数据一起重新处理。

死信处理器:

  • 在运行应用程序开始时记录每个分区的结束偏移量
  • 结束偏移量标记停止处理给定死信主题的记录的点,以避免在重新处理的记录返回死信主题时出现无限循环。
  • 应用程序通过使用者组从上次运行产生的上次偏移恢复。
  • 应用程序正在使用事务和KafkaProducer#SendoffsetstoTransaction提交最后产生的偏移量。

为了跟踪何时为主题的分区处理了我范围内的所有记录,我的服务将其上次从生产者生成的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,使用者通过kafkaconsumer#pause暂停该分区,并且当所有分区都暂停(意味着它们到达保存的结束偏移量)时,调用它退出。

Kafka消费者API指出:

偏移量和消费者位置Kafka为分区中的每个记录维护一个数值偏移量。该偏移量用作该分区中记录的唯一标识符,还表示使用者在分区中的位置。例如,处于位置5的消费者已经消费了具有偏移0到4的记录,并且接下来将接收具有偏移5的记录。

Kafka Producer API引用的下一个偏移量也总是+1。

向使用者组协调器发送指定偏移量的列表,还将那些偏移量标记为当前事务的一部分。只有在事务提交成功的情况下,才会认为提交了这些偏移。提交的偏移量应该是应用程序将要使用的下一个消息,即lastProcessedMessageOffset+1。

我想这可能是一个Kafkahtml" target="_blank">配置问题,比如max.message.bytes,但没有一个真正有意义。然后我想也许是从加入,但没有看到任何方式将改变的方式,制作人将发挥作用。

不确定它是否相关,但我们所有的Kafka应用程序都在使用Avro和模式注册表...

无论生产方法如何,补偿是否总是增加1,或者是否可能使用Kafka streams API不能提供与普通生产者和消费者客户端相同的保证?

是不是我完全错过了什么?

共有1个答案

孙梓
2023-03-14

消息偏移量增加1并不是官方的API契约,即使JavaDocs指出这一点(似乎应该更新JavaDocs)。

>

  • 如果您不使用事务,您得到的要么是至少一次语义,要么是没有保证(有些人称之为最多一次语义)。对于至少一次,记录可能被写入两次,因此,由于重复写入“消耗”两个偏移,两个连续消息的偏移实际上不会增加1。

    如果您使用事务,事务的每个提交(或中止)都将提交(或中止)标记写入主题--那些事务标记也“消耗”一个偏移量(这是您观察到的)。

    因此,通常不应依赖连续偏移。您得到的唯一保证是,每个偏移量在一个分区中是唯一的。

  •  类似资料:
    • 我在Kafka·吉拉也描述了这个问题:https://issues.apache.org/jira/browse/KAFKA-13014 我们有多个实例和线程的Kafka流。 这个Kafka流消耗了很多话题。 其中一个主题分区一天内无法访问,主题保留时间为4小时。 解决问题后,Kafka流正试图从不再存在的偏移量中消费: Kafka消费群体描述: 我们可以看到KS正在等待的当前偏移量是 Kafka

    • 我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例

    • 我有一个问题,假设有一个TOPIC T1,有两个消费者C1和C2属于两个不同的组,电流偏移量是0.我们知道Kafka维护消费者的偏移量。因此,如果 C1 使用消息并且 Offset 变为 1,那么如果 C2 使用消息,它将从 1 偏移量开始,还是从 0 偏移量开始使用消息,会发生什么情况?表示两个不同的消费群体将如何维持抵消? 谢啦

    • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?

    • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人

    • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350