当前位置: 首页 > 知识库问答 >
问题:

对RabbitMQ队列的否定确认,以使用Spring AMQP对消息重新排队

傅正阳
2023-03-14

我有一个应用程序,它使用spring AMQP向其他应用程序消费和生成消息。我有一个场景,其中发生了一些异常,我需要重新排队回到RabbitMQ。对于一些例外情况,我需要忽略(基本上我需要忽略消息,无需重新查询)

目前在下面的代码中,我已经将配置设置为

工厂setDefaultRequeueRejected(假);

但我的要求是动态拒绝某些消息,并将某些消息重新排队回RabbitMQ。

请建议

@Bean(name="rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put(Constants.JOB_TYPE_ID_, JobListenerDTO.class);
        classMapper.setIdClassMapping(idClassMapping);
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);
        factory.setDefaultRequeueRejected(false);
        factory.setReceiveTimeout(10L);
        return factory;
    }

共有1个答案

郑光济
2023-03-14

你不能这样做(默认为false)。

要有选择地执行此操作,必须将defaultRequeueRejected设置为true,并对任何想要丢弃的对象抛出amqprejected和dontrequeuerejected

您可以将所需的逻辑封装在错误处理程序中。

默认错误处理程序正是针对特定的异常列表执行此操作的,如本文所述——您可以插入自定义的FatalExceptionStrategy

但对于有条件拒绝,defaultRequeueRejected必须是true

编辑

factory.setErrorHandler(new ConditionalRejectingErrorHandler(t -> {
        Throwable cause = t.getCause();
        return cause instanceof MessageConversionException
                || cause instanceof org.springframework.messaging.converter.MessageConversionException
                || cause instanceof MethodArgumentNotValidException
                || cause instanceof MethodArgumentTypeMismatchException
                || cause instanceof NoSuchMethodException
                || cause instanceof ClassCastException
                || cause instanceof MyBadXMLException;
    }));

这将MyBadXMLExctive添加到标准列表中。

如果您没有使用Java 8,请使用new FatalExceptionStrategy(){…}

 类似资料:
  • 在队列选项卡的rabbitMQ web界面上,我看到了“概述”面板,我在其中找到了以下内容: 排队消息: 准备好了 未确认 总数 我猜“总数”是多少。但什么是“准备就绪”和“未确认”?“准备好了”——传递给消费者的信息?“未确认”-? 消息费率: 发表 交付 重新交付 承认 这些信息是什么?尤其是“重新交付”和“确认”?这是什么意思?

  • 我有一个使用Spring和RabbitMQ的项目设置。目前,我的应用程序可能会收到一条amqp消息,在另一个异步进程完成之前无法处理该消息(遗留和完全分离,我无法控制)。因此,结果是我可能不得不等待处理消息一段时间。其结果是变压器出现异常。 当消息被NACK回rabbitMQ时,它会将其放回队列的头部,并立即重新拉入队列。如果我收到的无法处理的消息等于并发侦听器的数量,我的工作流就会被锁定。它转动

  • 我将不折不扣地学习以下教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html。 我以这样的方式启动RabbitMQ服务器: 我生成了两个消费者,当我Ctrl+C其中一个时,另一个正在运行的消费者不会接收到最初发往前一个消费者的消息。如何在Ctrl+C'ing从一个消费者中重新传递消息? 编辑:我现在正在通过'brew'安装Rabbi

  • 问题内容: 我的Java应用程序将消息发送到RabbitMQ交换,然后交换将消息重定向到绑定队列。我将RabbitMQ与Springframework AMQP java插件一起使用。 问题:消息进入队列,但消息始终处于“未确认”状态,永远不会变为“就绪”状态。 可能是什么原因? 问题答案: 一条未确认的消息表示您的使用者已经读取了该消息,但是该使用者从未将ACK发送回RabbitMQ代理以表示它

  • 问题内容: 我们正在使用amqplib来发布/使用消息。我希望能够读取队列中的消息数(理想情况下是已确认和未确认)。这将使我能够向管理员用户显示良好的状态图,并检测某个组件是否无法满足负载需求。 我在amqplib文档中找不到有关读取队列状态的任何信息。 有人可以指出我正确的方向吗? 问题答案: 使用皮卡: 使用PyRabbit: 使用HTTP 句法: 例: 注意:默认虚拟主机是需要转义为 使用C

  • 好吧,我不需要详细介绍我设置的整个系统, 我遇到的问题是,当使用者取消(amqpchannel->basic_cancel)对队列的监听时,会留下一个额外的消息未被这个工作者确认。它也不会触发正常的回调来处理此消息。 队列正在阻塞(使用wait) 预取为1,这是您所能拥有的最小值 使用者可以动态侦听() 使用者可以动态忘记() 我不会告诉给定使用者使用或取消给定队列的确切方式。但这一切都是完美的工