当前位置: 首页 > 知识库问答 >
问题:

将消息回滚到死信队列-Apache Camel

葛泳
2023-03-14

我已经设置了Apache camel,在其中我使用来自一个队列的消息并对其进行某种操作,然后将其传输到其他队列。

现在,如果异常来了,我希望它应该回滚,然后在6次尝试后,它发送到死信队列,目前回滚发生5-6次,但我的消息没有转移到死信队列。

这里会发生什么-->Queue1->>(消耗)-->Operation(引发异常)-->Rollback-->Queue1->>(消耗)-->Operation(引发异常)-->Rollback-->...这种情况发生5-6次,然后我的消息丢失

我不知道消息的去向和丢失的原因,并且从活动的MQ GUI中我可以看到它已出列。

@Bean
public RedeliveryPolicy redeliveryPolicy() {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(2);
    redeliveryPolicy.setMaximumRedeliveryDelay(10000);
    redeliveryPolicy.setRedeliveryDelay(10000);
    return redeliveryPolicy;
}

---------------------Route extends SpringRouteBuilder-------------------

onException(MyException.class)
    .markRollbackOnly()
    .redeliveryPolicy(redeliveryPolicy)
    .useExponentialBackOff()
    .handled(true)

from("jms:queue:Queue1")
    .process(new Processor(){
       public void process(Exchange ex){
         throw new RuntimeException();
        }
    }).to("jms:queue:myQueue)

共有1个答案

百里嘉泽
2023-03-14

我假设有多重问题。

  1. markrollbackonly停止消息。在此声明之后,不再进行进一步的路由。

这就是您的RedeliveryPolicyOnException路由的其余部分被完全忽略的原因。您配置了2次重传尝试,但编写时会执行5次(ActiveMQ的默认重传)。

若要解决此问题,请将MarkRollBackOnly移动到OnException路由的末尾

由于在发生错误时丢失了它,因此事务配置有问题。配置Camel的ActiveMQ组件,使其在消费时使用本地JMS事务。

@Bean(name = "activemq")
@ConditionalOnClass(ActiveMQComponent.class)
public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
    ActiveMQComponent activeMQComponent = new ActiveMQComponent();
    activeMQComponent.setConnectionFactory(connectionFactory);
    activeMQComponent.setTransacted(true);
    activeMQComponent.setLazyCreateTransactionManager(false);
    return activeMQComponent;
}

这样做之后,您实际上可以删除onexception路由,因为重新传递是由JMS代理完成的,因此您必须在JMS连接上配置重新传递设置。如果配置的重新传递已耗尽,并且消息仍然产生回滚,则将其移动到DLQ。

使用其他onexception路由时要注意,因为这是纯骆驼路由。Camel错误处理程序不是在路由级别重新传递,而是在处理器级别重新传递。因此,如果同时配置broker和Camel redelivery,它可以将它们相乘。

 类似资料:
  • 我正在使用AWS SQS和死信队列。 这可能吗?我是不是缺少了一个配置选项? 问候你,伊多

  • 我主要在RPC模式下使用rabbitMq,但我还想将请求和响应消息复制到另一个队列。 最后,我想实现的是,外部消费者可以通过听一个队列来查看所有流量,我们称之为“日志队列”。 复制传入消息是可以的,我只需要使用扇出交换,或者使用与RPC调用使用的路由密钥相同的路由密钥将日志队列绑定到使用过的交换。 但我无法找到通过直接回复功能“扇出”发送的消息的方法。 到目前为止,我了解到响应消息以amqp的形式

  • 这种需求类似于通过公开的REST服务API(Spring Boot)处理来自死信队列的消息。以便一旦调用REST服务,就会从DL队列中消耗一条消息,并将再次发布到主队列中进行处理。@RabbitListener(queues=“queue_name”)立即使用消息,这在场景中是不需要的。该消息只需由REST服务API使用。有什么建议或解决办法吗?

  • 死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理

  • 对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71​ 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在