当前位置: 首页 > 知识库问答 >
问题:

一旦连接失败,Spring消费者不会重新连接到rabbit mq队列

裴经义
2023-03-14

我使用rabbitMQ作为代理,在spring中配置了生产者/消费者应用程序(配置如下所述)。当消费者机器出现故障时,我面临重新连接消费者的问题,消费者应用程序通常会在一段时间内恢复,但代理(队列)和消费者之间的连接没有重新建立,

我在rabbit-MQ管理控制台中进行了验证,发现队列中没有列出任何消费者,尽管消费者在一段时间后已经自动恢复。

任何关于如何解决这个问题的见解,请让我知道,如果需要任何进一步的细节。

连接工厂配置如下

@Bean 
public CachingConnectionFactory rabbitConnectionFactory() throws Exception 
{
    com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory(); 
    factory.setHost(host);
    factory.setUsername(username); 
    factory.setPassword(password);
    factory.setPort(5671);
    factory.useSslProtocol(); 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
    return connectionFactory; 
}

样品容器工厂

   @Bean(name = "stockcontainer") 
    public SimpleRabbitListenerContainerFactory simpleStokcontainer()  throws Exception
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(2); 
        factory.setMaxConcurrentConsumers(4);
        factory.setPrefetchCount(20);
        return factory; 
    }
    
    
    @Bean(name = "StockUploadSimplecontainer") 
    public SimpleRabbitListenerContainerFactory StockUploadListenerContainerFactory()  throws Exception
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(2); 
        factory.setMaxConcurrentConsumers(4);
        factory.setPrefetchCount(15);
        return factory;
    }

消费者调查中的听众之一

@RabbitListener( queues = "${stock_daily.sync.queue}", containerFactory = "stockcontainer", autoStartup = "true")
    public void stockDailySync(SftpStockDailySyncAsyncRequest sftpStockDailySyncRequest) {
        
    }

例外

2021 7月20日18:05:08.081信息15087-[SimpleAsyncTaskExecutor-7]o.s.a.r.l.SimpleMessageListenerContainer:重新启动Consumer@1e89e61:tags=[{amq.ctag-joklemtramxv1u6p6rtig=omnirio_supplierbulk_queue}],channel=Cached Rabbit channel:AMQChannel(amqp://prod-core-mq@…*:5671/,11),康涅狄格州:Proxy@4de7441e共享兔子连接:SimpleConnection@302dbb33[代表=amqp://prod-core-mq@…*:5671/,localPort=36542],acknowledgeMode=AUTO local queue size=0 2021-07-20 18:05:08.081错误15087---[SimpleAsynctaskeExecutor-8]o.s.a.r.l.SimpleMessageListenerContainer:无法检查/重新声明自动删除队列。

org.springframework.amqp.rabbit.connection.AutoRecoverConnectionNotCurrentlyOpenException: Auto recovery connection 当前未在 org.springframework.amqp.rabbit.connection.SimpleConnection.isOpen(SimpleConnection.java:100) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.isOpen(CachingConnectionFactory.java:1240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:472) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] atorg.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]

2021-07-20 18:05:08.806 INFO 15087---[SimpleAsyncTaskExecort-6]o.s.a.r.l.SimpleMessageListenerContainer: RestartingConsumer@664b6f7c: tags=[{amq.ctag-HzahvR3wv6m0E4BKPaROw=omnirio_supplierbulk_queue}], Channel=Cache Rabbit Channel: AMQChannel(amqp://prod-core-mq@...*: 5671/,4), conn:Proxy@4de7441eShared Rabbit Connection:SimpleConnection@302dbb33[委托=amqp://prod-core-mq@...*: 5671/, localPort=36542],确认模式=AUTO本地队列大小=0 2021-07-20 18:05:08.807 ERROR 15087---[SimpleAsyncTaskExecort-9]o.s.a.r.l.SimpleMessageListenerContainer:无法检查/重新声明自动删除队列。

org.springframework.amqp.rabbit.connection.AutoRecoverConnectionNotCurrentlyOpenException: Auto recovery connection 当前未在 org.springframework.amqp.rabbit.connection.SimpleConnection.isOpen(SimpleConnection.java:100) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.isOpen(CachingConnectionFactory.java:1240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:472) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] atorg.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]

我通过添加语句“factory.setAutomaticRecoveryEnabled(false)”更新了rabbitConnectionFactory()方法,现在工厂方法如下图所示,这次我遇到了下面提到的另一个异常(异常-2)

@Bean 
    public CachingConnectionFactory rabbitConnectionFactory() throws Exception 
    {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory(); 
        factory.setHost(host);
        factory.setUsername(username); 
        factory.setPassword(password);
        factory.setPort(5671);
        factory.useSslProtocol(); 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
        return connectionFactory; 
    }

例外-2

org . spring framework . amqp . amqpioexception:com . rabbit MQ . client . channelcontinuationtimeoutexception:对方法#method的延续调用

共有1个答案

国兴贤
2023-03-14

自动恢复连接不当前打开异常

不要对Spring AMQP使用< code > autorecovery enabled -它不兼容(早在自动恢复被添加到AMQP客户端之前,Spring就有了自己的恢复机制);它实际上是被禁用的——Spring会关闭任何自动恢复的连接。

该误差是瞬时的;当客户端恢复连接时,Spring将关闭它并创建一个新连接。

2.0.3.发布

不再支持2.0. x;您应该尽快升级到2.2.18或2.3.10。

 类似资料:
  • 我有一个PHP应用程序,使用RabbitMQ。为了实现冗余,我创建了一对RabbitMQ服务器,并将它们连接到一个集群中。我也有一个VyOS故障转移群集运行HAProxy负载平衡连接,并在故障转移的情况下提供重新路由。 昨天,我们的VyOS集群决定需要故障转移(可能是短暂的网络中断)。在一个VyOS上停止了HA代理,虚拟IP被移动,并在另一个节点上重新启动HA代理。 之后,我查看了Rabbit中的

  • Camel RabbitMQ的新手。用Apache Camel写了一个简单的RabbitMQ消费者。 当前在从队列中弹出一个值后进行一个简单的rest调用。它可以处理大约100条消息,并开始抛出此错误 我不确定我做错了什么,任何帮助都是感激的。

  • 我已经用RabbitMQ(3.7.6)设置了一个Laravel(5.6)应用程序,使用最新的Laradock和作曲家安装的https://github.com/vyuldashev/laravel-queue-rabbitmq。当我启动Laravel消息工作人员与我得到以下错误 但是,我可以使用telnet 127.0连接到Rabbit。0.1 5672(连接在不活动10秒后关闭)。听众似乎工作正

  • 我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。 我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常: 更重要的是,CachingConnectionFactory默认将reConnectOnEx

  • 文件描述符:256个可用套接字描述符:138个可用 显然我不能打开超过138个连接。 问题1:这个限制是基于什么?我能提高它吗?我想知道在生产机器上(需要哪种EC2实例),每个用户有一个连接是否是一个好主意。我读到过限制可能与ulimit有关,但当我在命令行上运行ulimit时,我看到的是'unlimited'。 还有什么其他的策略?