当前位置: 首页 > 知识库问答 >
问题:

Spring AMQP-使用带有TTL的死信机制的消息重排队

贺俊材
2023-03-14

就像“休斯顿我们这里有一个问题”,在第一次尝试处理事件失败后,我需要安排/延迟消息5分钟。我已经在这个场景中实现了死信交换。

失败的消息会路由到DLX-->Retry队列,并在TTL 5分钟后返回工作队列进行另一次尝试。

下面是我正在使用的配置:

public class RabbitMQConfig {
    @Bean(name = "work")
    @Primary
    Queue workQueue() {
        return new Queue(WORK_QUEUE, true, false, false, null);
    }

    @Bean(name = "workExchange")
    @Primary
    TopicExchange workExchange() {
        return new TopicExchange(WORK_EXCHANGE, true, false);
    }

    @Bean
    Binding workBinding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(workQueue()).to(workExchange()).with("#");
    }

    @Bean(name = "retryExchange")
    FanoutExchange retryExchange() {
        return new FanoutExchange(RETRY_EXCHANGE, true, false);
    }

    @Bean(name = "retry")
    Queue retryQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", WORK_EXCHANGE);
        args.put("x-message-ttl", RETRY_DELAY); //delay of 5 min
        return new Queue(RETRY_QUEUE, true, false, false, args);
    }

    @Bean
    Binding retryBinding(Queue queue,FanoutExchange exchange) {
        return BindingBuilder.bind(retryQueue()).to(retryExchange());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    Consumer receiver() {
        return new Consumer();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Consumer receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

producer.java:

@GetMapping(path = "/hello")
public String sayHello() {
    // Producer operation

        String messages[];
        messages = new String[] {" hello "};

    for (int i = 0; i < 5; i++) {
        String message = util.getMessage(messages)+i;

        rabbitTemplate.convertAndSend("WorkExchange","", message);
       System.out.println(" Sent '" + message + "'");
    }
    return "hello";
}

consumer.java:

public class Consumer {

    @RabbitListener(queues = "WorkQueue")
    public void receiveMessage(String message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException, InterruptedException {

        try {

            System.out.println("message to be processed: " + message);
            doWorkTwo(message);
            channel.basicAck(tag, false);

        } catch (Exception e) {
            System.out.println("In the exception catch block");
            System.out.println("message in dead letter exchange: " + message);
            channel.basicPublish("RetryExchange", "", null, message.getBytes());

        }

    }

    private void doWorkTwo(String task) throws InterruptedException {

        int c = 0;
        int b = 5;
        int d = b / c;

    }

}
@EnableRabbit
public class RabbitMQConfig {

    final static String WORK_QUEUE = "WorkQueue";
    final static String RETRY_QUEUE = "RetryQueue";
    final static String WORK_EXCHANGE = "WorkExchange"; // Dead Letter Exchange
    final static String RETRY_EXCHANGE = "RetryExchange";
    final static int RETRY_DELAY = 60000; // in ms (1 min)

    @Bean(name = "work")
    @Primary
    Queue workQueue() {
         Map<String, Object> args = new HashMap<String, Object>();
         args.put("x-dead-letter-exchange", RETRY_EXCHANGE);
        return new Queue(WORK_QUEUE, true, false, false, args);
    }

    @Bean(name = "workExchange")
    @Primary
    DirectExchange workExchange() {
        return new DirectExchange(WORK_EXCHANGE, true, false);
    }

    @Bean
    Binding workBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(workQueue()).to(workExchange()).with("");
    }

    @Bean(name = "retryExchange")
    DirectExchange retryExchange() {
        return new DirectExchange(RETRY_EXCHANGE, true, false);
    }

    // Messages will drop off RetryQueue into WorkExchange for re-processing
    // All messages in queue will expire at same rate
    @Bean(name = "retry")
    Queue retryQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        //args.put("x-dead-letter-exchange", WORK_EXCHANGE);
        //args.put("x-message-ttl", RETRY_DELAY);
        return new Queue(RETRY_QUEUE, true, false, false, null);
    }

    @Bean
    Binding retryBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(retryQueue()).to(retryExchange()).with("");
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDefaultRequeueRejected(false);
        /*factory.setAdviceChain(new Advice[] {
                org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                        .stateless()
                        .maxAttempts(2).recoverer(new RejectAndDontRequeueRecoverer())
                        .backOffOptions(1000, 2, 5000)
                        .build()
        });*/
        return factory;
    }

    @Bean
    Consumer receiver() {
        return new Consumer();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Consumer receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}
public class Consumer {

    @RabbitListener(queues = "WorkQueue")
    public void receiveMessage(String message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long tag,
            @Header(required = false, name = "x-death") HashMap<String, String> xDeath)
            throws IOException, InterruptedException {

        doWorkTwo(message);
        channel.basicAck(tag, false);
    }

    private void doWorkTwo(String task) {
        int c = 0;
        int b = 5;
        if (c < b) {
            throw new AmqpRejectAndDontRequeueException(task);
        }
    }
}

共有1个答案

米俊喆
2023-03-14

如果拒绝消息以便代理将其路由到DLQ,则可以检查x-death标头。在这个场景中,我有一个TTL为5秒的DLQ,来自主队列的消息使用者拒绝它;代理将它路由到DLQ,然后它过期并被路由回主队列--x-death标头显示重新路由操作的次数:

 类似资料:
  • 我正在使用带有Avro和汇流模式注册表的Spring云流。我正在为所有服务使用一个单独的DLQ主题,因此具有不同模式的消息可能会落在这个主题中。我已禁用动态架构注册,以确保不传递错误消息()。 然而,问题是由于dlq上缺少模式,我可能会在进入这个主题时丢失一条消息。因此,我希望能够以JSON格式向dlq生成消息,并在管道的其余部分使用Avro。如果有人能帮助我如何做到这一点,或者能为我指出这件事的

  • 主要内容:1 并发消费重试,1.1 失败重试,1.2 超时重试,2 顺序消费重试,2.1 失败重试,2.2 超时重试,3 broker处理回退请求,3.1 asyncConsumerSendMsgBack处理回退请求,3.2 handleRetryAndDLQ处理重试和死信消息基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者重试消息和死信消息源码。 消费重试:并发消费和顺序消费对于消费失败的消息均会有消息重试机制。 1 并发消费重试

  • 我正在使用Django中的Firebase云消息传递,使用django-push-notifications通过桌面通知向用户提供推送通知。 在浏览器完全关闭后(例如当计算机关闭时),我们的用户在下次启动时会收到先前发送的所有通知的积压。 虽然在某些情况下,用户希望接收全部积压的消息,但这并不是其中之一。 答案似乎是按照FCM文档的这一节设置TTL=0,但我的尝试并没有导致所需的行为。 请帮助我在

  • 我有一个使用RabbitMQ构建的发布-订阅场景。有一个交换,其中消息由发布者发送,订阅该交换的任何使用者在其各自的队列中接收这些消息。这是一个扇出场景,其中有一个生产者,但有多个消费者。 现在我愿意在系统中集成一个死信队列,以便稍后处理被拒绝的消息。我的问题是 1)我应该为每个使用者配置一个单独的死信队列,还是应该有一个单独的死信队列来处理来自所有使用者的所有被拒绝的消息? 2)如果两者都有可能

  • 我正在使用AWS SQS和死信队列。 这可能吗?我是不是缺少了一个配置选项? 问候你,伊多

  • 本文向大家介绍PHP的消息通信机制测试实例,包括了PHP的消息通信机制测试实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了PHP的消息通信机制。分享给大家供大家参考,具体如下: 更多关于PHP相关内容感兴趣的读者可查看本站专题:《php curl用法总结》、《php socket用法总结》、《PHP网络编程技巧总结》、《php面向对象程序设计入门教程》、《PHP数组(Array)操作技