当前位置: 首页 > 知识库问答 >
问题:

使用Reactor堆芯发布服务器在Spring Cloud Stream中回调生产者

蔚弘量
2023-03-14

我编写了一个Spring-Cloud-Stream应用程序,其中制作人将消息发布到指定的Kafka主题。我的问题是,如何添加生产者回调以接收确认/确认消息已成功发布到主题上?就像我们在《KafkaSpring》中所做的那样。发送(记录,新回调{…}) (维护异步生产者)。下面是我的代码:

private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();

@Bean
public Supplier<Flux<Message<?>>> event() {
        return responseProcessor::asFlux;
    }    

public Message<?> publishEvent(String status) {
  try {
      String key = ...;
      response = MessageBuilder.withPayload(payload)
                  .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                  .build();
      responseProcessor.tryEmitNext(response);
  } 

如何确保tryEmitNext已成功写入主题?

实施ProducerListener是否是一种解决方案,是否可能?在Spring Cloud Stream中找不到具体的解决方案/文档

更新

我现在已经实现了下面的内容,似乎按预期工作

@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {

    @Override
    public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        // Do nothing on onSuccess
    }

    @Override
    public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
    }

}

 @Bean
    ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
        return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
    }

共有1个答案

隆飞驰
2023-03-14

请参见Kafka制作人属性。

记录元数据通道

成功发送结果应发送到的MessageChannel的bean名称;bean必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(转换后,如果有),带有附加标头KafkaHeaders。RECORD_METADATA。标头包含Kafka客户端提供的RecordMetadata对象;它包括在主题中写入记录的分区和偏移量。

结果元数据元数据=SendResultMg。getHeaders()。get(KafkaHeaders.RECORD\u METADATA,RecordMetadata.class)

失败发送到生产者错误通道(如果配置);请参阅错误通道。默认值:null

您可以添加一个ServiceActivator来异步使用此通道。

 类似资料:
  • 到目前为止,我一直在使用RxJava,但我开始使用 projectreactor.io 的反应器堆芯,因为它符合反应性流规范。 在下面的测试中,我创建了一个生成随机数的热通量(可连接Flux)。我立即连接()它,它预取了256个值(我可以在日志中看到258个值)。我等待5秒钟来模拟订阅者直到一段时间后才会订阅。 主线程唤醒后,RnApp订阅ConnectableFlux,

  • 我试图确定我的ColdFusion PRODUCTION服务器的最佳设置。服务器有以下规格。 ColdFusion: Enterprise Version 10 O/S: Windows Server 2012R2 Standard 处理器:Intel(R)Xeon(R)CPU E5-2660 v2@2.20GHz 已安装内存(RAM):20.0 GB 系统类型:64位操作系统,基于x64的处理器

  • 我的插座有问题。io作为一名广播员与laravel echo合作。 我试过什么: 我可以看到用户在日志中连接: 我的队列正在运行,正在正确记录所有事件。 我可以在redis控制台中完美地看到redis我的事件和数据库通知。 但是没有广播任何事件,我也没有在laravel echo服务器控制台中看到它们。一切都在我的本地主机上运行,但在生产环境中没有,我失去了理智。 以下是我的laravel ech

  • hprose 为发布服务提供了多个方法,这些方法可以随意组合,通过这种组合,你所发布的服务将不会局限于某一个函数,某一个方法,某一个对象,而是可以将不同的函数和方法随意重新组合成一个服务。 AddFunction 方法 AddFunction(name string, function interface{}, option ...Options) Service 该方法的用于发布一个函数(命名函

  • 我正在使用windows,当socket.io发出事件时,命令提示符显示以下内容: 我想不出问题出在哪里,我做错了什么?

  • 更具体的问题是为什么通量。fromIterable()不适用于ReactorNetty HttpClient?这个简单的例子很好用。所有10项均由Flux publisher发出: 使用ReactiveSubscriber: 如下图所示,如果我在HttpClient中使用类似的订阅服务器。send(Flux.fromIterable())仅发出1项,而不是队列中的所有项。因此,由于这种流量,有些配