假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。
有可能得到哪个是给定生产者生产的最后一个偏移吗?
例如:
生产者:P1、P2
我想找出分别由P1和P2发布的最后一条记录的偏移量。
请注意,我不是在要求全局主题分区偏移量。
Kafka经纪人不会追踪这些信息。
只有生产商才有这些信息,但这甚至不是他们内置度量标准的一部分。如果要跟踪每个生产者的最后一次偏移量,则需要自己在每个生产者实例中创建一个度量。
例如,您可以简单地检索send()
返回的RecordMetadata
对象中的偏移量,并将其公开为度量。
这是关于让生产者知道消息是否已被消费者消费的用例。 我的想法是,每次消费消息时,消费者都会提交偏移量,生产者可以跟踪和读取当前偏移量,以查看是否消费了相应的消息。 此外,请不要犹豫,让我知道这是否是处理用例的正确方法,因为我没有Kafka的经验。我知道Kafka不是为这种方式设计的(处理上述用例),但我必须坚持使用Kafka。
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。