当前位置: 首页 > 知识库问答 >
问题:

无法创建 RabbitMQ 队列

薛烨
2023-03-14

我收到了一个关于这个问题的建议,一个评论,说我可能没有我需要的交换,因为我的队列已经存在。于是,我就全部手动删除了。

但是,在重新部署应用程序时,我发现所有队列都有以下异常:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myInput.group]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:721)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:598)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1472)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclarePassive(AutorecoveringChannel.java:364)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980)
    at com.sun.proxy.$Proxy166.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700)
    ... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'myInput.group' in vhost 'production', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'myInput.group' in vhost 'production', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    ... 1 more

我不确定这意味着什么,因为我显然想要创建队列<code>myInput。组,它似乎在抱怨它不存在。。。

我也检查了这个问题,我的问题可能是权限。但我应该有它们——否则我相信我无法删除队列......

你能给我一个解决方案吗?

我上一个问题中的代码,以帮助您进行调查:

@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer  retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

我的< code > MessageInputProcessor 接口:

public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

和我的属性文件:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000

#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

编辑:

我在项目的另一个组件中有<code>EnableBinding

2019-12-17 08:45:11.584 INFO 96124 --- [ main]c.s.b.r.p.RabbitExchangeQueueProvisioner:声明入站队列:myInput.group,绑定到:myInput

之后,有一条没有stacktraces的错误消息。以下内容:

{ "written_at":"2019-12-17T14:06:59.263Z","written_ts":460692906701698,"type":"log","logger":"org.springframework.amqp.rabbit.connection.CachingConnectionFactory","thread":"AMQP Connection <ip>:5672","level":"ERROR","categories":[],"msg":"Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)","tenant_id":"-","component_type":"application","component_id":"******","space_name":"dev","component_name":"*******","component_instance":"0","organization_id":"****","correlation_id":"-","organization_name":"****","space_id":"*****","container_id":"***","custom_fields":{} }

共有1个答案

唐啸
2023-03-14

你有配置类的@Enable绑定(MessageInputProcessor.class)吗?

如果是这样,请检查启动日志;你应该看到

2019-12-17 08:45:11.584 INFO 96124 --- [ main]c.s.b.r.p.RabbitExchangeQueueProvisioner:声明入站队列:myInput.group,绑定到:myInput

在那里寻找任何错误。

如果您没有@EnableBinding,则不会执行设置,您将不得不手动设置@Beans,就像其他队列一样。

如果此“DLQ 重新处理”是与主流应用不同的应用,则不应在此处@EnableBinding,而应在主应用中正确配置绑定。

 类似资料:
  • 我有两个独立的应用程序,一个是生产者,一个是具有交换类型默认(DIRECT)的消费者。 下面是带有死信队列设置的Rabbit MQ Producer的配置。 现在,这里是单独应用程序中Rabbit MQ listener的代码。 现在,当我运行消费者服务时,它会创建一个同名的重复队列。如下图所示 Rabbit MQ GUI 我也在消费者中尝试了以下设置,但结果相同 注意:当我在生产者中添加死信队列

  • 我在 RabbitMQ 中创建新队列时遇到了一些问题。我只创建一个使用者客户端,该客户端将使用来自另一个微服务的消息。 这是我到目前为止所做的。 应用程序属性: 配置类: 和侦听器类: 当我运行这个程序时,我有一条ACCESS_REFUSED消息,但我不知道为什么。我错过了什么吗?? 谢谢

  • 我花了整整一天的时间来尝试让spring-AMQP示例项目在docker版本的RabbitMQ上运行。我只是在运行标准的rabbitmq Docker。虽然我没有连接问题,但我总是得到与创建队列相关的异常,并且我已经尝试了所有可能的变体,在这一点上。 我尝试在我的配置中声明队列,就像示例项目一样。我尝试显式配置RabbitAdmin。我已经尝试显式配置整个自动配置混乱。我在rabbitmq中创建了

  • 我试图使用RabbitMQ cluster Kubernetes操作符创建一个RabbitMQ实例,但是PersistentVolumeClaims存在一个问题。我正在使用Docker Desktop for Windows运行Kubernetes 1.18.8。 为什么不自动创建卷,我应该做什么?

  • 问题内容: 根据我对Rabbit-mq的(有限的)经验,如果您为尚不存在的队列创建新的侦听器,则会自动创建该队列。我试图将Spring AMQP项目与rabbit- mq一起使用来设置侦听器,但是却出现错误。这是我的xml配置: 我在RabbitMq日志中得到了这个: 还有来自AMQP的类似错误: 从堆栈跟踪中可以看出,队列是在“被动”模式下创建的-任何人都可以指出我如何不使用被动模式来创建队列,

  • null 结果:队列已从RabbitMQ GUI中删除,但当我试图通过已删除的RabbitMQ队列发送消息时,连接仍然有效。(con.isConnected()==true)我需要找到一种方法来侦听队列,如果它被删除,我不应该向已删除的队列发送任何消息。 注意:删除队列后,我不会重新启动RabbitMQ。 渠道创建: 正在发送消息: