当前位置: 首页 > 知识库问答 >
问题:

kafka消费者(通过新消费者api创建)的偏移和延迟设置为未知

邬良才
2023-03-14

我经常看到kafka消费者的当前偏移和滞后设置为未知的问题
早期消费者的偏移和滞后

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER  
G.AB_KAFF, T.AB_KAFF, 0, 3, 6, 3, consumer-2
G.AB_KAFF, T.AB_KAFF, 1, 8, 11, 3, consumer-2

几天后,当我再次订阅该消费者时,其偏移和滞后被设置为未知

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER  
G.AB_KAFF, T.AB_KAFF, 0, unknown, 7, unknown, consumer-2
G.AB_KAFF, T.AB_KAFF, 1, unknown, 11, unknown, consumer-2  

kafka是否删除了该消费者之前的偏移,因为我正在取消订阅整个消费群的主题?

共有1个答案

龚昊然
2023-03-14

kafka是否删除了该消费者之前的补偿,因为我正在从主题中取消订阅整个消费者群体?

是的,你是对的。< code > offsets . retention . minutes = 1440(24小时)默认值为24小时。< br >偏移的日志保留窗口(分钟)主题

 类似资料:
  • 我正在使用Spring Kafka1.0.3来消费kafka消息。Kafka的2个主题,每个主题有1个分区。在java代码中,有2@KafKalistener来消费每个主题消息。ConcurrentKafkaListenerContainerFactory的并发设置为1。但消息有时会延迟20秒以上。 有人知道为什么吗? 添加调试日志,并且延迟不是每次都可以,有时也可以:

  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow

  • 我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?