当前位置: 首页 > 知识库问答 >
问题:

如何在Spring AMQP中使用Ack或Nack

李星辰
2023-03-14

我是SpringAMQP的新手。我有一个应用程序是生产者发送消息给另一个应用程序是消费者。

一旦消费者收到消息,我们将对数据进行验证。

如果数据是正确的,我们必须确认,消息应该从队列中删除。如果数据不正确,我们必须对数据进行NACK(否定确认),以便它在RabbitMQ中重新排队。

我偶然发现

**工厂。setDefaultRequeueRejected(假)** (它根本不会重新发出消息)

**工厂。setDefaultRequeueRejected(真)** (发生异常时将重新发出消息)

但在我的情况下,我会根据确认确认信息。然后它应该删除该消息。如果是NACK,则重新获取消息。

我在RabbitMQ网站上看过

AMQP规范定义了基本的。拒绝方法,该方法允许客户端拒绝单个已传递的消息,指示代理放弃或重新获取这些消息

如何实现上述场景?请给我举几个例子。

我尝试了一个小程序

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

消息没有为不同的异常工厂重新排队。setDefaultRequeueRejected(真)

09:46:38854错误[stderr](SimpleAsyncTaskExecutor-1)组织。activiti。发动机ActivitiObjectNotFoundException:没有使用键“WF89012”部署进程

09:46:39102从错误队列接收到的信息[com.example.bip.rabbitmq.handler.ErrorQueueHandler](SimpleAsynctaskeExecutor-1):{Error=无法提交JPA事务;嵌套异常为javax.persistence.RollbackException:标记为rollbackOnly的事务}


共有1个答案

郤立果
2023-03-14

请参阅文档。

默认情况下,如果侦听器正常退出,容器将装入消息(导致消息被删除),如果侦听器抛出异常,容器将拒绝(并请求)它。

如果侦听器(或错误处理程序)抛出一个AmqpRejectAndDontRepeueExctive,则会重写默认行为并丢弃消息(或路由到DLX/DLQ(如果这样配置)-容器调用basicReject(false)而不是basicReject(true)< /code>。

因此,如果验证失败,抛出一个AmqpRejectAndDontRequeueException。或者,使用自定义错误处理程序配置侦听器,将异常转换为AmqpRejectAndDontRequeueException

这在本答复中有所描述。

如果你真的想自己负责确认,请将确认模式设置为MANUAL,并使用ChannelAwareMessageListener,如果你使用的是@RabbitListener,则使用此技术。

但大多数人只是让容器来处理事情(一旦他们明白了发生了什么)。通常,使用手动确认是为了特殊的使用情况,例如延迟确认或提前确认。

编辑

我给你指出的答案中有一个错误(现已修复);你必须看看ListenerExefftionFailedExctive的原因。我刚刚测试了这个,它像预期的那样工作...

@SpringBootApplication
public class So39530787Application {

    private static final String QUEUE = "So39530787";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend(QUEUE, "foo");
        template.convertAndSend(QUEUE, "bar");
        template.convertAndSend(QUEUE, "baz");
        So39530787Application bean = context.getBean(So39530787Application.class);
        bean.latch.await(10, TimeUnit.SECONDS);
        System.out.println("Expect 1 foo:"  + bean.fooCount);
        System.out.println("Expect 3 bar:"  + bean.barCount);
        System.out.println("Expect 1 baz:"  + bean.bazCount);
        context.close();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
        return factory;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, false, false, true);
    }
    private int fooCount;

    private int barCount;

    private int bazCount;

    private final CountDownLatch latch = new CountDownLatch(5);

    @RabbitListener(queues = QUEUE)
    public void handle(String in) throws Exception {
        System.out.println(in);
        latch.countDown();
        if ("foo".equals(in) && ++this.fooCount < 3) {
            throw new FooException();
        }
        else if ("bar".equals(in) && ++this.barCount < 3) {
            throw new BarException();
        }
        else if ("baz".equals(in)) {
            this.bazCount++;
        }
    }

    @SuppressWarnings("serial")
    public static class FooException extends Exception { }

    @SuppressWarnings("serial")
    public static class BarException extends Exception { }

}

结果:

Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1

 类似资料:
  • 一、 我与CachingConnectionFactory有一个SpringAMQP项目。我需要从AMQP连接获取一些属性,例如:状态、连接时间、通道和一些运行时度量。CachingConnectionFactory是否有任何指标支持(例如:https://www.rabbitmq.com/blog/2016/11/30/metrics-support-in-rabbitmq-java-clien

  • 问题内容: 因此,我在这里已经读到,在Vue.js中,可以使用或在选择器中创建适用于子组件内部元素的样式规则。但是,无论是在SCSS还是普通的旧CSS中,尝试以我的样式使用它均无效。而是将它们原样发送到浏览器,因此无效。例如: home.vue: 生成的CSS: 我想要的是: 我与之相关的webpack配置如下所示: 所以我的问题是,如何让该操作员工作? 我已经找到了这个答案,但我确实在这样做,而

  • 在一个flink项目中,我使用一个case类click。 这个类填充了数据集,并且在日期为Java8的情况下可以很好地工作。在Java7环境中切换到org.joda(Version2.9)之后,对数据集中的click对象的调用不像以前那样执行。对click对象的date字段的某些函数的访问引发。这些函数的例子有等。我能够确保click类的日期字段不为空。我怀疑joda时间库与kryo序列化的交互不

  • 问题内容: 我想使用或在C中使用。我该怎么做?我不知道如何用C处理它们,如果有人知道,请告诉我如何。谢谢。 问题答案: 通常,最好避免打开文件以获取随机数据,因为该过程中存在多个故障点。 在最近的Linux发行版,该系统调用可用来获取加密安全随机数,它不能失败 ,如果 是 没有 指定为标志和读取量最多256个字节。 截至2017年10月,OpenBSD,Darwin和Linux(带有)现在都已实现

  • 问题内容: 我还不能弄清楚如何在Swift中获取a的子字符串: 我无法在Swift中创建范围。在Playground中自动完成并不是超级有帮助-这说明: 我在Swift标准参考库中找不到任何有帮助的东西。这是另一个疯狂的猜测: 还有这个: 我看过其他答案(在SwiftString中查找字符的索引),似乎暗示着由于是的桥类型,因此“旧”方法应该起作用,但尚不清楚如何- 例如,这也不起作用(似乎不是有

  • 显然IE(11)在方面存在问题,例如在事件方面。IE是否有其他方法获得? 下面是一个在IE中产生错误的示例:https://jsfiddle.net/rnyqy78m/