当前位置: 首页 > 知识库问答 >
问题:

带延迟的死信队列处理

颜英博
2023-03-14

今天,我使用Spring Cloud Streams和RabbitMQ,根据本文档编写了以下代码:

@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer  retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

我的MessageInputProcessor接口:

public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

和我的属性文件:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true


#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

共有1个答案

邹慈
2023-03-14

使用该配置,MyInput.group绑定到延迟(主题)交换MyInput并使用路由#

您可能应该删除spring.cloud.stream.rabbit.bindings.myinput.consumer.delayedexchange=true,因为您不需要延迟主交换。

它还将绑定到显式延迟交换,键为myinput.group

myinput.group.dlq绑定到dlx,键为myinput.group

您应该设置一个更长的TTL并检查DLQ中的消息,看看是否有突出的内容。

编辑

Retrying message, 4 attempts
added to failed messages queue
 类似资料:
  • 我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我不能丢失任何信息。 null 我是这些框架的新手,我希望你能帮助配置我的...

  • 我有一个lambda函数,我想为它创建一个SQS死信队列。我首先在Terraform中创建SQS: 这是来自Terraform的例子。但是,我被redrive_policy卡住了。 我是否正确理解,这为SQS队列设置了一个死信队列? 如果我设置了redrive_policy,这意味着我在一个DLQ上设置了一个DLQ。我觉得可以在DLQ上设置DLQ,在DLQ上设置DLQ,以此类推。 我找不到这方面的

  • 死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理

  • 对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71​ 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在

  • 什么是最好的方法来实现死信队列(DLQ)的概念在Spring Boot 2.0应用程序中使用sping-kafka 2.1. x有所有的消息被处理失败的@KafkaListener方法的一些bean发送到一些预定义的Kafka DLQ主题并且不丢失一条信息? 因此,Kafka的记录是: 已成功处理, 处理失败,并发送到DLQ主题, 处理失败,未发送到DLQ主题(由于意外问题),因此将再次被侦听器使

  • 未创建我的exchange和dlq。我在下面的YML中有以下内容。我确实创建了一个匿名队列,但也没有发布消息。任何想法。