当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ CachingConnectionFactory和发布者返回配置

冯霖
2023-03-14

Make Spring RabbitMQ在缺少exchange时继续失败

我为多个队列注册MessageListenerContainer。

我应该在哪里以及如何配置通道返回侦听器?-我认为我这样做是错误的。我将CachingConnectionFactory配置插入createQueueBMessageListener(...)-负责创建多个MessageListeners之一的方法。

CachingConnectionFactory应该如何以Spring和Rabbit的方式进行额外配置?到目前为止,我没有在Java中配置它(仅由K8S环境中的application.properties和admins配置)。我只注入了ConnectionFactory,并在SimpleMessageListenerContainer中将其设置为ConnectionFactory(如createQueueAMessageListener(…),我甚至不知道这是CachingConnectionFactory。

是否有类似于CachingConnectionFactoryConfigurer的东西?

2、为什么是ReturnListener。把手转动(…)未执行?ChannelListener。onCreate(…)已执行。

3、检查cachingConnectionFactory中缺少的exchange异常。设置CloseExceptionLogger和doing系统。出口(1)对我来说似乎不对,不是吗?但这就是我现在所能做到的。我希望在绑定创建期间没有交换时应用程序不会启动。当我抛出异常时,应用程序仍然会启动。ReturnListener。handleReturn似乎是一个更好的地方,但在如下配置时不会执行它。

4、我如何才能优雅地停止Spring应用程序上下文而不是系统。出口(1)?-引发异常不会停止应用程序上下文。在这种情况下,如何使RabbitMq无法启动?-当在Spring应用程序上下文启动时创建@Bean绑定失败时。

@Bean
MessageListenerContainer createQueueAMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                   ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueA");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

@Bean
MessageListenerContainer createQueueBMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                     ConnectionFactory connectionFactory,
                                                     CachingConnectionFactory cachingConnectionFactory) {

    // I think configuring CachingConnectionFactory here is a lame, isn't it? Of course connectionFactory is redundant now, I left it to show how was it done earlier.
    // Where and how should I add listeners to CachingConnectionFactory?

    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    cachingConnectionFactory.setPublisherReturns(true);
    cachingConnectionFactory.addChannelListener(new ChannelListener() {
        @Override
        public void onCreate(final Channel channel, final boolean transactional) {

            log.info("channelListener onCreate - this is executed");

            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(final int replyCode, final String replyText, final String exchange, final String routingKey,
                                         final AMQP.BasicProperties properties,
                                         final byte[] body) throws IOException
                {
                    log.info("!!! Why is this not executed ?!!! handleReturn replyCode: " + replyCode + " replyText: " + replyText);
                }
            });
        }
    });
    cachingConnectionFactory.addConnectionListener(new ConnectionListener() {
        @Override
        public void onCreate(final Connection connection) {
            log.info("connectionListener onCreate - this is executed" + connection);
        }
    });

    cachingConnectionFactory.setCloseExceptionLogger(new ConditionalExceptionLogger() {
        @Override
        public void log(final Log logger, final String message, final Throwable t) {
            try {
                logger.error(message + ": " + t.getMessage());
                if (t.getMessage().contains("reply-code=404, reply-text=NOT_FOUND")) {
                    // throw new RuntimeException(); it doesn't stop Spring ApplicationContext from starting
                    log.error("Executing System.exit(1) command.");
                    // System.exit(1);
                }
            } catch (Exception e) {
                log.error("err in listener ", e);
            }
           
        }
    });


    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueB");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

@Bean
MessageListenerContainer createQueueCMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                     ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueC");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

共有1个答案

淳于禄
2023-03-14

//我认为在这里配置CachingConnectionFactory很蹩脚,不是吗?

它不是“跛脚的”;这是用引导未直接公开的其他属性配置bean的正常方法。

它应该被称为;你试过调试吗?

你为什么不按照我在这里的建议做呢?让Spring RabbitMQ在缺少交换时失败——这要简单得多。

关闭()它-但是,由于您使用的是Spring Boot,它会为您执行此操作-它注册了一个关闭上下文的JVM关闭挂钩。

编辑

绑定到不存在的交换将失败;您只需在应用程序完全初始化之前强制执行,例如在ApplicationRunner中。

@SpringBootApplication
public class So70212347Application {

    public static void main(String[] args) {
        SpringApplication.run(So70212347Application.class, args);
    }

    @Bean
    Binding binding() {
        return new Binding("foo", DestinationType.QUEUE, "doesn't exist", "foo", null);
    }

    @Bean
    Queue queue() {
        return new Queue("foo");
    }


    @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> {
            cf.createConnection().close();
        };
    }

}
Created new connection: rabbitConnectionFactory#6a0cbc6f:0/SimpleConnection@6cd164a6 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62884]
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)


Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
Application run failed
java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:761)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:748)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:309)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
    at com.example.demo.So70212347Application.main(So70212347Application.java:16)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2192)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2138)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2118)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:691)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$null$10(RabbitAdmin.java:619)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$afterPropertiesSet$11(RabbitAdmin.java:618)
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.lambda$onCreate$0(CompositeConnectionListener.java:38)
    at java.base/java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:807)
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:38)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:730)
    at com.example.demo.So70212347Application.lambda$0(So70212347Application.java:33)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:758)
    ... 5 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:1077)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:46)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157)
    at com.sun.proxy.$Proxy47.queueBind(Unknown Source)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.declareBindings(RabbitAdmin.java:870)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$initialize$12(RabbitAdmin.java:694)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2227)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2186)
    ... 18 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
...
 类似资料:
  • There are two things you want to do preparing for a production build. 这里有两件事你需要为生产发布做准备。 Configure a script to run in your package.json file Create a production config 配置你的 package.json 里的脚本 创建一个生产的配置

  • 这是一个布尔方法,是我作业的一小部分。我想根据输入和记录之间是否匹配来返回true或false。我想我遇到了语法问题。 这是该方法的代码,但始终有红线指示我应该在代码中添加return,否则将该方法更改为void。但我已经在if语句中添加了return true。

  • 我想在每个通量事件后链接一个单声道。单声道发布者将需要通量发布的每个事件的信息。响应应该是带有通量事件和单声道响应数据的通量。 挖掘之后,我在平面地图中找到了一张地图。代码如下所示: 我主要关心的是: 代码气味是在平面图中使用地图的吗 pet可变内容是否会因多个流量事件以及单次事件而受到比赛条件的影响 有没有更好的方法来处理这种行为

  • 我真的不知道我的java代码中的错误在哪里。我必须使用REST API登录Kofax Total Agility。为此,我尝试使用postman测试我的json是否正确构建。以下是我的登录JSON: 我得到了肯定的回答: 到目前为止,一切顺利。为此,我创建了模型: 对于响应: 这些类应该允许我构建 json。现在,我创建了一个方法,用于生成请求对象并期望响应对象。 当我调用这部分代码时,我注意到我

  • 这三个函数的返回类型提示有什么不同吗? 他们都应该有< code>- 提问的动机是这个问题,这个很好的答案,以及我正在学习类型提示的事实。