当前位置: 首页 > 知识库问答 >
问题:

Publisher向Spring Cloud Stream确认

顾曾笑
2023-03-14

我有兴趣在一些制作人中使用Publisher确认,我们在一个项目中使用了Spring Cloud Stream。我试过做一个小的PoC,但它不起作用。据我在文档中看到的情况,Asynchronous Publisher可以确认这一点,并且应该很容易进行以下更改:

应用程序中添加。yml confirmAckChannel并启用errorChannelEnabled属性。

spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
 ....
  bindings:    
    testOutput:
      destination: test
      binder: rabbitDefault
      content-type: application/json    
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
      errorChannelEnabled: true

然后是一个由endpoint触发的简单服务,我在其中插入与事件的errorChannel相关的头。

@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders() {
        return withPayload(new Event<>(TestEvent.builder().build()))
                .setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
                .build();
    }
}

然后是RabbitMQ的出版商

@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        return messagingChannels.test().send(message);
    }
}

其中MessagingChannels实现为

public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("errorChannelTest")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

之后,我实现了两个监听器,一个用于errorChannelTest输入,另一个用于testAck。

@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {

    @StreamListener("errorChannelTest")
    void onCommandReceived(Event<Message> message) {

        log.info("Message error received: " + message);
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {

    @StreamListener("testAck")
    void onCommandReceived(Event<Message> message) {

        log.info("Message ACK received: " + message);
    }
}

但是,在这2个侦听器中,我没有收到RabbitMQ的任何ACK或NACK,事件已正确发送到RabbitMQ并由exchange管理,但我没有收到RabbitMQ的任何响应。

我错过什么了吗?我也检查了这两个属性,但效果不太好

spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED
    publisher-returns: true

我正在使用Spring Cloud Stream 3.0.1。发布和spring cloud starter stream rabbit 3.0.1。释放

----已编辑------

这是根据Gary Russell的建议更新的工作示例

一个pplication.yml

spring.cloud.stream:
  binders:
   rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}   
  bindings:   
    testOutput:
      destination: exchange.output.test
      binder: rabbitDefault
      content-type: application/json
    testOutput.producer:
      errorChannelEnabled: true  
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

测试服务

@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders(Test test) {
        return withPayload(new Event<>(TestEvent.builder().test(test).build()))
                .build();
    }
}

TestService由下一个简单控制器中的endpoint触发,以检查此PoC。

@RestController
@RequiredArgsConstructor
public class TestController {

    private final TestService testService;

    @PostMapping("/services/v1/test")
    public ResponseEntity<Object> test(@RequestBody Test test) {

        testService.sendMessage(test);
        return ResponseEntity.ok().build();
    }
}

然后是具有两个ServiceActivator的RabbitMQ的发布者

@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {

        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.test().send(message);
    }

    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}

其中MessagingChannels实现为

public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("testOutput.errors")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

这是应用程序的主要部分(我也用@EnableIntegration进行了检查)。

@EnableBinding(MessagingChannels.class)
@SpringBootApplication
@EnableScheduling
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

共有1个答案

陶泳
2023-03-14

测试包不应是绑定;它应该是一个ServiceActivator。

.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")

这在这种情况下是行不通的;错误被发送到名为testOutput的通道。错误 ;再一次这需要一个ServiceActivator,而不是绑定。

您在错误的位置启用了错误通道;这是一个共同的生产者财产,而不是兔子特有的。

java prettyprint-override">@SpringBootApplication
@EnableBinding(Source.class)
public class So62219823Application {

    public static void main(String[] args) {
        SpringApplication.run(So62219823Application.class, args);
    }

    @InboundChannelAdapter(channel = "output")
    public String source() {
        return "foo";
    }

    @ServiceActivator(inputChannel = "acks")
    public void acks(Message<?> ack) {
        System.out.println("Ack: " + ack);
    }

    @ServiceActivator(inputChannel = "output.errors")
    public void errors(Message<?> error) {
        System.out.println("Error: " + error);
    }

}
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true
      rabbit:
        bindings:
          output:
            producer:
              confirm-ack-channel: acks
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
 类似资料:
  • 我正在开发SpringCloudStream的Brooklyn.Release版本。我的用例具有多个接收器的HttpSource。当我将初学者应用程序依赖项添加到应用程序中并使用它时,如下所示: 我的聚合应用程序是 一直得到如下响应:

  • What is Open-Publisher Open publisher is really just a couple of bash scripts that wrap around Jekyll, Pandoc, KindleGen, and LaTeX, along with some custom Pandoc templates created with a focus on fic

  • Confluence Publisher The Confluence Publisher allows documentation written in AsciiDoc and versioned directly with the documented code baseto be published to a Confluence space. It supports a "docs-as

  • 温馨提示:该项目为开源软件,已停止维护。 LitePublisher是一个免费开源的内容管理系统,Lite Publisher可以简单的个性化任何内容任何,比如blog,wiki,bug管理,文档系统.这是Lite Publisher的基本功能. LitePublisher内置cache功能,有SEO方面的设置,比如每篇文章的url,description设置等等.内置5个风格.安装的时候可以选择

  • Gradle Play Publisher Gradle Play Publisher (GPP) is Android's unofficial release automation Gradle Plugin. It can doanything from building, uploading, and then promoting your App Bundle or APK to pub

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?