当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka-Access Offsetsforms从特定偏移开始消费的时间

安坚诚
2023-03-14
MessageListener<String, T> messageListener = record -> {

    doStuff( record.value()));
  };

  startConsumer(messageListener);


protected void startConsumer(MessageListener<String, T> messageListener) {
ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
    consumerFactory(this.brokerAddress, this.groupId),
    containerProperties(this.topic, messageListener));

   container.start();
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

      assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}

有没有其他方法可以做到这一点?

共有1个答案

王念
2023-03-14

使用ConsumerAwarereBalanceListener执行初始查找(在2.0中引入)。

当前版本是2.2.0。

如何测试ConsumerAwarereBalanceListener?

 类似资料:
  • 我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。 我正在写一个例子,这样我的意图就不会偏离。 有没有一种方法可以获取从上次使用的偏移量生成的消息。?

  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步

  • 我阅读了Kafka的所有文档,我读到的唯一方法是git和指定 但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的 但是得到一个例外 无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。