当前位置: 首页 > 知识库问答 >
问题:

获取使用StreamBridge处理kafka消息的分区和偏移量

董子平
2023-03-14

我需要打印/记录/存储处理消息的kafka分区和偏移量。我如何才能做到这一点?我使用StreamBridge从制作人那里发送消息,还使用功能性spring kafka streams方法

Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}

共有1个答案

文自怡
2023-03-14

记录元数据可通过元数据通道(异步)获得:

java prettyprint-override">@SpringBootApplication
public class So66436499Application {

    public static void main(String[] args) {
        SpringApplication.run(So66436499Application.class, args);
    }

    @Autowired
    StreamBridge bridge;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            this.bridge.send("myBinding", "test");
            Thread.sleep(5000);
        };
    }

    @ServiceActivator(inputChannel = "meta")
    void meta(Message<?> sent) {
        System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
    }

}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5
 类似资料:
  • 我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地

  • 我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。

  • 我对非常陌生,我们正在使用。 我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。 我认为我需要使用<code>

  • 我正在用SpringKafka实现同步请求-应答模式。堆栈: 组织。springframework。云:spring云依赖项:2020.0.2 组织。springframework。Kafka:SpringKafka 木卫一。合流:kafka avro序列化程序:6.2.0 爪哇11 我有一个请求主题有5个分区和响应8个分区 我的响应消费者端配置如下。为了简洁起见,我没有显示producer配置:

  • 问题内容: 现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量? 以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go )来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(ht

  • 我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。