我需要打印/记录/存储处理消息的kafka分区和偏移量。我如何才能做到这一点?我使用StreamBridge从制作人那里发送消息,还使用功能性spring kafka streams方法
Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}
记录元数据可通过元数据通道(异步)获得:
java prettyprint-override">@SpringBootApplication
public class So66436499Application {
public static void main(String[] args) {
SpringApplication.run(So66436499Application.class, args);
}
@Autowired
StreamBridge bridge;
@Bean
public ApplicationRunner runner() {
return args -> {
this.bridge.send("myBinding", "test");
Thread.sleep(5000);
};
}
@ServiceActivator(inputChannel = "meta")
void meta(Message<?> sent) {
System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
}
}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5
我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地
我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。
我对非常陌生,我们正在使用。 我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。 我认为我需要使用<code>
我正在用SpringKafka实现同步请求-应答模式。堆栈: 组织。springframework。云:spring云依赖项:2020.0.2 组织。springframework。Kafka:SpringKafka 木卫一。合流:kafka avro序列化程序:6.2.0 爪哇11 我有一个请求主题有5个分区和响应8个分区 我的响应消费者端配置如下。为了简洁起见,我没有显示producer配置:
问题内容: 现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量? 以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go )来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(ht
我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。