当前位置: 首页 > 知识库问答 >
问题:

如何使用上次保存的偏移量中的kafka消息

魏鸿哲
2023-03-14

我是Kafka的新手,一直在尝试实现一个消费者。下面是我的场景

  1. 启动消费者应用程序
  2. 产生来自生产者的消息。这些消息被消费者消费
  3. 停止消费者并再次生成消息。当我启动消费者时,在消费者被停止时发布的消息不会被读取

虽然auto.offset.commit=最早会消耗消息,但它会消耗发布到主题的所有消息。我想只消耗那些在消费者关闭时发布的消息。

var options = new KafkaOptions(new Uri(kafkaUri));
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions(kafkaTopic, router));
var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

foreach (Message msg in consumer.Consume()) {
    string kafkaResponse = System.Text.Encoding.UTF8.GetString(msg.Value);

    Console.WriteLine("PickList Json : " + kafkaResponse);

    offsetCommitRequest.Offset = msg.Meta.Offset;
    offsetCommitRequest.PartitionId = msg.Meta.PartitionId;
    offsetCommitRequest.Topic = kafkaTopic;
    offsetCommitRequest.Metadata = "CommitOffset";
    var offsetCommitResponse = await _kafkaPublishService.SetOffsetvalue(kafkaUri, kafkaTopic, consumerGroup, offsetCommitRequest);
}

共有1个答案

杨宏儒
2023-03-14

您应该重新启动您的消费者,而无需设置任何手动偏移量。意味着你不应该这样做:

var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

您的消费者将从上次promise的地方赶上

亚尼克

 类似资料:
  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人

  • 问题内容: 我对使用Kafka和Zookeeper时在哪里存储偏移量感到困惑。在某些情况下,偏移似乎存储在Zookeeper中,而在其他情况下,偏移存储在Kafka中。 是什么决定偏移量存储在Kafka还是Zookeeper中?优点和缺点是什么? 注意:当然,我也可以将偏移量单独存储在其他数据存储区中,但这并不是本文的内容。 有关我的设置的更多详细信息: 我运行以下版本:KAFKA_VERSION

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。