当前位置: 首页 > 知识库问答 >
问题:

使用spring-amqp,从PublisherReturn回调内部向rabbitmq发送消息的最佳方法是什么?

傅琦
2023-03-14

我有一个带有PublisherReturn回调的RabbitTemplate。

  • 如果我向没有队列绑定的routingKey发送消息,则正确调用返回回调。当发生这种情况时,我希望将消息发送到另一个路由密钥。但是,如果我在ReturnCallback中使用RabbitTemplate,它就会挂起。我没有看到任何消息可以/不能发送,RabbitTemplate没有将控制返回给我的ReturnCallback,也没有看到任何PublisherConfirm。
  • 如果我创建了一个新的RabbitTemplate(具有相同的CachingConnectionFactory),那么它的行为仍然是相同的。我的电话挂断了。
  • 如果我将一条消息发送给一个绑定了队列的路由键,那么该消息将正确地到达队列。在此方案中不调用ReturnCallback。

经过一些调查,我得出的结论是rabbitTemplate和/或连接将被阻塞,直到原始消息被完全处理。

以下是我所拥有的简化细节:

@Configuration
public class MyConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setPublisherReturns(true);
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

    @Bean
    @Qualifier("connectionFactoryForUndeliverable")
    public ConnectionFactory connectionFactoryForUndeliverable() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplateForUndeliverable")
    public RabbitTemplate rabbitTemplateForUndeliverable() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

}

然后发送我正在使用的信息

    @Autowired
    @Qualifier("rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    public void send(Message message) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "primary-key",
            message);
    }

ReturnCallback中的代码是

@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {

    @Autowired
    @Qualifier("rabbitTemplateForUndeliverable")
    private RabbitTemplate rabbitTemplate;

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "alternative-key",
            message);
    }

}
    null
in returnCallback before message send
in returnCallback after message send

如果注释掉ConnectionFactory.SetPublisherConnections(true);,它运行正常。

@SpringBootApplication
public class HangingApplication {

    public static void main(String[] args) {
      SpringApplication.run(HangingApplication.class, args);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setPublisherReturns(true);
      connectionFactory.setPublisherConfirms(true);
      return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setExchange("foo");
      rabbitTemplate.setMandatory(true);

      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("Confirm callback for main template. Ack=" + ack);
      });

      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("in returnCallback before message send");
        rabbitTemplate.send("foo", message);
        System.out.println("in returnCallback after message send");
      });

      return rabbitTemplate;
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {

      return args -> {
        template.convertAndSend("BADKEY", "foo payload");
      };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
      System.out.println("Message received on undeliverable queue : " + in);
    }

}

这是我用过的建筑物:

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group 'pcoates'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.11

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-amqp'
}

共有1个答案

全彬
2023-03-14

它会在amqp-client代码中造成某种死锁。最简单的解决方案是在单独的线程上执行发送--在回调中使用taskexecutor...

exec.execute(() -> template.send(...));

您可以使用相同的模板/连接工厂,但发送必须在不同的线程上运行。

我以为我们最近改变了框架,总是在一个不同的线程上调用返回回调(在最后一个人报告了这件事之后),但它看起来像是从缝隙中掉下来的。

@SpringBootApplication
public class So57234770Application {

    public static void main(String[] args) {
        SpringApplication.run(So57234770Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            template.send("foo", message);
        });
        return args -> {
            template.convertAndSend("BADKEY", "foo");
        };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}
 类似资料:
  • 我正在使用Spring AMQP侦听RabbitMQ队列。在侦听队列时,根据业务逻辑,我的服务可以引发RuntimeException,在这种情况下,消息将重试多次。在最大次数重试后,消息将保留在DLQ中。我想知道,在DLQ中处理这些消息的最佳方法是什么?我从博客上读到我可以使用停车场队列。但在这种情况下,如何监控队列并通知人们死信消息?P、 对不起,我的英语不好。希望我能够解释我的问题:)

  • null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。

  • 我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup

  • 我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后

  • 如标题所述,我想使用RabbitMQ向Websocket发送消息。使用AngularJS前端,我想从Websocket读取RabbitMQ消息,并将它们打印到控制台。原则上,我的代码似乎是可行的,尽管我不知道如何获得消息的实际(字符串)内容? 后端:为了创建Websocket并执行路由,我使用Spring Boot和Apache Camel:http://camel.Apache.org/Spri

  • 本文向大家介绍RabbitMQ 的消息是怎么发送的?相关面试题,主要包含被问及RabbitMQ 的消息是怎么发送的?时的应答技巧和注意事项,需要的朋友参考一下 首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 Ra