当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者在重新启动后未从上次提交的偏移量消费

章昱
2023-03-14

我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。

总共有5000条信息,

重新启动前-消耗2900条消息和提交的偏移量

kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

共有1个答案

欧阳正德
2023-03-14

我们不知道您的主题中有多少个分区,但是当在同一个使用者组中创建使用者时,它们将使用来自不同分区的记录(我们不能在一个使用者组中有两个使用者使用同一个分区,如果您添加了一个使用者,组协调器将执行重新平衡的过程,将每个使用者重新分配到特定分区)。

我认为偏移量0来自属性auto.offset.reset,它可以是:

  • 最新:从日志中的最新偏移量开始
  • 最早:从最早的记录开始。
  • none:当没有现有偏移量数据时引发异常。

但只有当您的消费者组没有提交有效的偏移量时,该属性才会启动。

注意:主题中的记录具有保留期log.retention.ms属性,因此在处理日志中的第一条记录时,可以删除最新的邮件。

问题:当您想从一个主题中获取消息,处理数据并将其写入另一个主题时,为什么不使用Kafka流式处理?

 类似资料:
  • 我尝试将auto.offset.reset设置为最早和最晚,但这不会更改行为。 我在消费者配置中遗漏了什么吗?

  • 我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动

  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • 我有一个自定义的Kafka消费者,我用它向REST API发送一些请求。根据API的响应,我要么提交偏移量,要么跳过没有提交的消息。 现在,当来自REST API的响应没有以开头时,偏移量不会提交,但消息不会被重新使用。如何强制使用者重新使用未提交的偏移信息?

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用