当前位置: 首页 > 知识库问答 >
问题:

Apache Camel-具有死信通道功能的事务错误处理程序

益泰平
2023-03-14

我在开发一个消息路由器的过程中,我来到了处理意外的点,并把一些错误处理到位。

在我的代码中,我能够区分何时由于错误消息而发生异常,这是我唯一想将消息发送到死信终端的情况。在所有其他情况下,我认为异常是由架构体系问题引起的(例如数据库/JMS终端变得不可用),在这种情况下,我希望消息回滚到路由开始。

查看驼峰文档,唯一一个支持死信的错误处理程序是死信通道,但问题是这个错误处理程序没有被处理。

那么,有没有一种方法可以实现我想要的东西,有多简单或难呢?我可以看到,您可以将camel context配置为使用自定义错误处理程序生成器,并考虑在我自己的生成器中尝试在TransactionErrorHandlerDeadLetterChannel之间进行组合,但我不确定这是否是一种方法。开箱即用的构建器似乎有着相当复杂的逻辑。

另一个选项是扩展TransactionErrorHandlerBuilder,并从DeadLetterChannelBuilder中获取代码,该代码验证终结点并创建故障处理器,但仍然不确定。如果有那么简单的话,骆驼队的人会把它包括在框架中。我的用例必须是处理企业关键应用程序的任何人的用例。

提前感谢您的建议。任何暗示都将不胜感激。

更新我试图扩展TransactionErrorHandlerBuilder,如上所述,但由于它所创建的故障处理器从未使用过,因此无法工作。

因为我真的需要这个功能,我尝试了一种变通方法,让我的代码在消息头中添加一个**“dead.letter=true”,并将原始消息放回exchange中,如下所示:

@Override
public void process(Exchange exchange) {
    Message incomingMessage = exchange.getIn();
    try {
        // Do some work here
    } catch (MyCustomException e) {    
        incomingMessage.setHeader("dead.letter", "true");
        exchange.setIn(incomingMessage);
    } catch (Exception e) {
        exchange.setException(e);
    }
}

然后在我的路线定义中,我添加了:

from(ep).routeId(createRouteId(system))
        .autoStartup(false).transacted()
        .threads(route.getThreads())
        .filter(body().isNotNull())
        .process((Processor) routeBean)
        .choice()
        .when(header("dead.letter").isNotNull())
        .to(mq1:ERROR.QUEUE);

我以为这会解决我的问题。然而我的错误。队列没有得到任何东西(日志中也没有任何内容),事务已提交,我的消息已丢失。

请帮帮我,我的点子快用完了。

共有1个答案

堵乐
2023-03-14

我遇到了同样的问题。我不知道为什么TransactionErrorHandlerBuilder不能配置为向死信通道发送消息。我被它的方法setDeadLetterUri()搞糊涂了,它并没有像我预期的那样工作。经过长时间的研究,我找到了相当简单的解决办法。

它在事务中处理消息,如果抛出一些异常,事务将回滚,原始消息将发送到死信通道。请注意,您应该明确允许使用原始消息。

@Override
public void configure() {
    
    getContext().setAllowUseOriginalMessage(true); 

    from("jms:queue:{{foo.bar}}")
            .doTry()
                .to("direct:processMsgInTransaction")
            .doCatch(Exception.class)
                .process(exchange -> exchange.setMessage(exchange.getUnitOfWork().getOriginalInMessage()))
                .to("jms:queue:{{foo.bar.dead.letter}}")
            .endDoTry();

    from("direct:processMsgInTransaction")
            .errorHandler(new TransactionErrorHandlerBuilder() 
                .onExceptionOccurred(exchange -> log.error("Something went wrong")))
            //.transacted() //uncomment for using default TransactionErrorHandler
            .process(someProcessor)
            .process(someOtherProcessor);
            
}
 类似资料:
  • 我目前在Spring集成中处理JMS事务时遇到困难。我正在创建的集成流程如下所示: JMS队列A- 我希望在JMS队列B和JMS队列C上保证消息的传递。然而,为了使传递稍微困难一些,我希望将导致错误的消息存储在单独的JMQ队列上,并在队列a上确认消息。 但是,如果我对此进行测试并在队列C上设置消息之前抛出错误(让我们假设队列B首先完成,队列C其次完成),事务将确认队列A并在队列B和错误队列上提交消

  • 场景可能是:我的期望可能是批量10个数据点,我想对{failed 5,pass 5}或其他什么给出响应。 我的逻辑是将批处理拆分为数据元素并进行验证 成功的验证将发送给aggreagtor, 失败的验证将抛出错误并通过错误通道拾取。 收件人列表路由器将错误通道作为输入通道,并连接2个过滤器,目的是过滤某些类型的错误直接发送响应(与用户输入无关的信息-服务器错误等),某些类型的客户端错误将转到聚合器

  • 在happy path场景中,我有一个spring批处理工作,但现在我将重点放在错误处理上。 但是,在另一个测试中,我想证明一个不可预见的数据库错误会导致作业失败。为此,我创建了一个触发器,该触发器会导致对要插入的表的插入失败。 这似乎起作用了,在writer执行之后,在事务提交期间抛出异常,并且我得到以下日志消息: 这似乎也是预期的行为。问题是,这并不能阻止工作。该步骤退出到SimplyRetr

  • 本文向大家介绍浅析Python中MySQLdb的事务处理功能,包括了浅析Python中MySQLdb的事务处理功能的使用技巧和注意事项,需要的朋友参考一下 前言 任何应用都离不开数据,所以在学习python的时候,当然也要学习一个如何用python操作数据库了。MySQLdb就是python对mysql数据库操作的模块。今天写了个工具,目的是把csv中的数据插入到数据库中去。其中有一部分,是需要分

  • 问题内容: 我了解绑定的一般理论(重要的函数调用站点,隐式,显式绑定等)以及解决React中此绑定问题的方法,因此它始终指向我想要的对象(在构造函数,箭头函数等),但我正在努力获取内部机制。 看一下这两段代码: 与 我所知道的是: 在这两个版本中,我们都成功地使用goToStore方法成功,因为方法内部自动(由React绑定)到组件实例 因此,第一个成功了, 第二个失败,因为es6中的类方法未绑定

  • 我有骆驼路线。 如果异常发生在任何组播路由中,如materaialsTest,则异常会被捕获在组播代码中,但异常不会被发送到死信通道。根据文档,设置shareUnitOfWork应该可以实现这一点,但它没有,设置StoponException也没有。我是不是漏掉了什么? 当materialsEnrichment bean抛出异常时,第二条路由将消息发送到死信通道。这与组播路由通过“direct:m