当前位置: 首页 > 知识库问答 >
问题:

骆驼分割后,异常不会升级

隆飞宇
2023-03-14

我们在Camel中定义了一个路由,并且必须找出是否在处理器中抛出了异常。当我们只有一个处理器时,Camel会在sendbody()方法中重新抛出异常。如果有前面的拆分/聚合,则不会抛出异常。因此下面示例的结果是

before throwing Exception

after sendBody

如果我省略所有从。拆分到。完成大小(1)输出是

before throwing Exception

Exception thrown

任何想法如何找出,如果一个异常发生后分裂?

private static final String DIRECT_START = "direct:start";

public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            from(DIRECT_START)
            .split(body())
                .aggregate(constant(true), new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        return oldExchange == null ? newExchange : oldExchange;
                    }
                })
                .completionSize(1)
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("before throwing Exception");
                    exchange.setException(new Exception());
                    throw new Exception("my Exception");
                }
            });
        }});        

    context.start();

    ProducerTemplate producer = context.createProducerTemplate();
    try {
        producer.sendBody(DIRECT_START, Integer.valueOf(42));
        System.out.println("after sendBody");
    } catch (Exception e) {
        System.out.println("Exception thrown");
    }

    context.stop();

}

为了在之后检查异常,我们找到了一个解决方案。我们向onException()注册了一个ErrorProcess,它将状态设置为上下文属性。

但这不会中断producer.sendBody(…)。我们有非常长时间运行的处理器,我们必须中断。

所以问题是,我们可以配置Camel在sendbody中抛出异常,还是可以在Exceptionhandler中执行此操作?

共有1个答案

东方夕
2023-03-14

我强烈建议在Camel in Action(第8.3.5节)中有一章介绍拆分器EIP和异常处理。本节说明:

将自定义AggregationStrategy与Splitter一起使用时,重要的是要知道您负责处理异常。如果您不将异常传播回来,Splitter将假定您已经处理了异常,并忽略它。

您使用了拆分()方法,但没有指定聚合器。在Camel留档中,他们指定

默认情况下,拆分器将返回原始输入消息

这意味着离开拆分()方法的交换没有异常,因此不会将异常传播回您的调用代码。从技术上讲,您从处理器抛出的异常在拆分器内部。即使您使用了聚合器,它也与拆分调用无关,并且您还没有用end()显式结束拆分。因此,当您的处理器抛出异常时,拆分器会忽略它,因为您没有提供聚合器来处理和传播异常。

我们可以通过将聚合策略作为参数传递给拆分调用来测试这一点,如下所示:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .log("test") // inside the split/aggregator EIP
.end() // outside the split/aggregator EIP
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println("before throwing Exception");
        throw new Exception("my Exception");
    }
});

您将获得输出:

test
Aggregating
before throwing Exception
Exception thrown

如果您希望处理器位于拆分/聚合器EIP中,如下所示:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    })
.end(); // outside the split/aggregator EIP

您将获得输出:

before throwing Exception
Aggregating
Exception thrown

注意,在拆分/聚合器EIP中,聚合器是如何在引发异常后运行的?这很重要,因为如果没有聚合器传递异常,拆分器将忽略它。为了使其工作,您需要在聚合器中正确传播异常。例如,在您的代码中,如果newExchange包含一个异常,那么它将被忽略,因为您没有传播它。您需要更改聚合器以添加:

if (newExchange.getException() != null) {
    oldExchange.setException(newExchange.getException());
}

注意:如果您在拆分的EIP中有一个调用,并且您将异常设置为正在处理,那么当您调用getException()时,它将不再返回。因此,如果您想处理异常,但仍然通过聚合器传播它们,可以使用exchange.getProperty(exchange.EXCEPTION\u Capture)

您也可以使用。stopOneException(),如下所示:

.split(body()).stopOnException()
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    });

这会导致拆分在出现异常时停止,并传播它。但是,当您将聚合器放置在stopOneException()之后时,它将不再工作。我不完全清楚为什么。我猜这是因为聚合器改变了交换对象。

另外请注意,您不需要将处理器中的异常设置为exchange。当处理器抛出异常时,Camel将为您执行此操作。因此,线<代码>交换。setException(new Exception()) 不是必需的。

tl;是的,您可以从分割内部将异常传播到调用方法。您只需要确保它是通过与拆分相关联的聚合器完成的,或者设置stopOnException()。这取决于您试图通过拆分/聚合/处理实现什么,最好的方法是什么。

 类似资料:
  • 我正在将CSV文件输入集成到一个更大的系统中,我想对其进行设置,以便将CSV中未正确解析的各行(由)发送到医院队列,并将所有其他工作行转换为一个对象并聚合到一个列表中。但是,我很难将

  • 我正在尝试从目录中选取一个文件,拆分一个文件,并将每一个拆分行添加到ActiveMQ中。我在这个过程中遇到了异常处理的问题。假设目录中的文件是一个二进制文件(可执行文件),那么splitter会显示org.apache.camel.runtimeCamelException和java.nio.charset.MalFormedInputException异常。如果出现这种情况,那么我需要捕获这些异

  • 我试图使用Kerberos从ActiveMQ-Camel桥向Kafka (Cloudera)发送消息。 ActiveMQ v5.15.4 骆驼:2.21.1 Kafka客户端:1.1.0 服务器版本:Apache/2.4.6(CentOS) 骆驼.xml剪刀是: 这是日志中的kafka客户端配置: 日志级别:调试 Jaas文件: 出口: 当我发送消息时,我在调试级别收到以下日志,但消息未被发送:

  • 在camel spring ws中,有没有办法将自定义主体(例如自定义bean)和http状态返回为500? 我试过了 然后在processor公共类ExceptionProcessor实现processor{ } 但是即使身体是我想要的,超文本传输协议状态总是200。 你能帮我吗? 更多信息:我正在使用camel 2.20.2版

  • 无法弄清楚这是怎么回事-试图设置一个路由,只看到cxf连接到soap web服务(我不关心实际数据,也不期望数据实际“工作”,但它不断引发一个我不理解的异常: 我想知道我是否正确配置了它。我想这可能是一个丢失的jar,但当我尝试引入其他jar时,导致了依赖冲突 我正在使用maven依赖项“camel-cxf”来加载我所有的jar配置 “原因:org.apache.cxf.bus.extension

  • 希望你有时间回答我的问题。在过去的几天里,我一直在阅读关于Camel的文章,并设法将一切都设置好并运行起来。现在,我碰到了一个棘手的部分:)。基本上,我在运行时使用Java定义一个路由,并将路由配置放在DB中。路线是可行的,信息从一边流向另一边。但是,当异常发生时,我想知道异常发生在哪里(在哪个路由endpoint之间),在DB中存储当前的交换主体(对进一步处理有用的进行中的消息),由用户更新消息