当前位置: 首页 > 知识库问答 >
问题:

Kafka提供生产者水平偏移吗?

子车俊材
2023-03-14

假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。

有可能得到哪个是给定生产者生产的最后一个偏移吗?

例如:

生产者:P1、P2

我想找出分别由P1和P2发布的最后一条记录的偏移量。

请注意,我不是在要求全局主题分区偏移量。

共有1个答案

宋高寒
2023-03-14

Kafka经纪人不会追踪这些信息。

只有生产商才有这些信息,但这甚至不是他们内置度量标准的一部分。如果要跟踪每个生产者的最后一次偏移量,则需要自己在每个生产者实例中创建一个度量。

例如,您可以简单地检索send()返回的RecordMetadata对象中的偏移量,并将其公开为度量。

 类似资料:
  • 这是关于让生产者知道消息是否已被消费者消费的用例。 我的想法是,每次消费消息时,消费者都会提交偏移量,生产者可以跟踪和读取当前偏移量,以查看是否消费了相应的消息。 此外,请不要犹豫,让我知道这是否是处理用例的正确方法,因为我没有Kafka的经验。我知道Kafka不是为这种方式设计的(处理上述用例),但我必须坚持使用Kafka。

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。