我编写了一个Spring-Cloud-Stream应用程序,其中制作人将消息发布到指定的Kafka主题。我的问题是,如何添加生产者回调以接收确认/确认消息已成功发布到主题上?就像我们在《KafkaSpring》中所做的那样。发送(记录,新回调{…}) (维护异步生产者)。下面是我的代码:
private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();
@Bean
public Supplier<Flux<Message<?>>> event() {
return responseProcessor::asFlux;
}
public Message<?> publishEvent(String status) {
try {
String key = ...;
response = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.build();
responseProcessor.tryEmitNext(response);
}
如何确保tryEmitNext已成功写入主题?
实施ProducerListener是否是一种解决方案,是否可能?在Spring Cloud Stream中找不到具体的解决方案/文档
更新
我现在已经实现了下面的内容,似乎按预期工作
@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
// Do nothing on onSuccess
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
}
}
@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
}
请参见Kafka制作人属性。
记录元数据通道
成功发送结果应发送到的MessageChannel
的bean名称;bean必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(转换后,如果有),带有附加标头KafkaHeaders。RECORD_METADATA
。标头包含Kafka客户端提供的RecordMetadata
对象;它包括在主题中写入记录的分区和偏移量。
结果元数据元数据=SendResultMg。getHeaders()。get(KafkaHeaders.RECORD\u METADATA,RecordMetadata.class)
失败发送到生产者错误通道(如果配置);请参阅错误通道。默认值:null
您可以添加一个ServiceActivator来异步使用此通道。
到目前为止,我一直在使用RxJava,但我开始使用 projectreactor.io 的反应器堆芯,因为它符合反应性流规范。 在下面的测试中,我创建了一个生成随机数的热通量(可连接Flux)。我立即连接()它,它预取了256个值(我可以在日志中看到258个值)。我等待5秒钟来模拟订阅者直到一段时间后才会订阅。 主线程唤醒后,RnApp订阅ConnectableFlux,
我试图确定我的ColdFusion PRODUCTION服务器的最佳设置。服务器有以下规格。 ColdFusion: Enterprise Version 10 O/S: Windows Server 2012R2 Standard 处理器:Intel(R)Xeon(R)CPU E5-2660 v2@2.20GHz 已安装内存(RAM):20.0 GB 系统类型:64位操作系统,基于x64的处理器
我的插座有问题。io作为一名广播员与laravel echo合作。 我试过什么: 我可以看到用户在日志中连接: 我的队列正在运行,正在正确记录所有事件。 我可以在redis控制台中完美地看到redis我的事件和数据库通知。 但是没有广播任何事件,我也没有在laravel echo服务器控制台中看到它们。一切都在我的本地主机上运行,但在生产环境中没有,我失去了理智。 以下是我的laravel ech
hprose 为发布服务提供了多个方法,这些方法可以随意组合,通过这种组合,你所发布的服务将不会局限于某一个函数,某一个方法,某一个对象,而是可以将不同的函数和方法随意重新组合成一个服务。 AddFunction 方法 AddFunction(name string, function interface{}, option ...Options) Service 该方法的用于发布一个函数(命名函
我正在使用windows,当socket.io发出事件时,命令提示符显示以下内容: 我想不出问题出在哪里,我做错了什么?
更具体的问题是为什么通量。fromIterable()不适用于ReactorNetty HttpClient?这个简单的例子很好用。所有10项均由Flux publisher发出: 使用ReactiveSubscriber: 如下图所示,如果我在HttpClient中使用类似的订阅服务器。send(Flux.fromIterable())仅发出1项,而不是队列中的所有项。因此,由于这种流量,有些配