当前位置: 首页 > 知识库问答 >
问题:

Spring云流Kafka:如何在生成Kafka主题的消息后访问Kafka头的recordMetadata

关项明
2023-03-14

我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。

来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-生产者财产)

RecordMetadataChannel成功发送结果应发送到的MessageChannel的bean名称;bean必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(转换后,如果有的话),带有额外的标头KafkaHeaders。RECORD_METADATA。标头包含Kafka客户端提供的RecordMetadata对象;它包括记录在主题中写入的分区和偏移量。

结果元数据元数据=SendResultMg。getHeaders()。get(KafkaHeaders.RECORD\u METADATA,RecordMetadata.class)

我已经在属性文件spring中将输出主题bean名称配置为消息通道。云流动Kafka。绑定。确认输出。制作人记录元数据通道=确认输出

我的自定义界面和生产者类如下:

public interface OutputAcknowledgement {    

    @Output("acknowledgement-out")
    MessageChannel output();

}

生产者类别:

@EnableBinding(OutputAcknowledgement.class)
public class AcknowledgementProducer {
   

    @Autowired
    OutputAcknowledgement outputAcknowledgement;

    public Boolean produce(Acknowledgement acknowledgement) {
        Message msg = MessageBuilder.withPayload(acknowledgement).build();

        boolean val = outputAcknowledgement.output().send(msg);

        RecordMetadata recordMetadata = msg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class);

为记录获取null元数据。

请建议我的方法是否正确?

共有1个答案

林烨烨
2023-03-14

您获得null是因为它在您访问时不存在于该消息对象中。根据文档,元数据仅在成功发布后提供。请参阅此答案,了解如何通过为记录元数据通道提供处理程序/消费者来获取记录元数据。

 类似资料:
  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • 从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放