当前位置: 首页 > 面试题库 >

Apache Camel-拆分和聚合-旧Exchange始终为null

徐兴昌
2023-03-14
问题内容

我看到这个问题已经被问过很多次了,但是没有一个帖子有帮助或最终的解决方案。我正在拆分一条消息,然后使用Aggregator2对其进行汇总。该代码引发异常,因为oldExchange始终为null。因此,为了测试,我设计了一个小代码。

我读了一个订单,xml文件,看起来像这样

<Orders xmlns="http://some/schema/Order">
    <Order>
            <orderNum>1</orderNum>
    </Order>
    <Order>
            <orderNum>2</orderNum>
    </Order>
    <Order>
            <orderNum>3</orderNum>
    </Order>
    <Order>
            <orderNum>5</orderNum>
    </Order>
    <Order>
            <orderNum>6</orderNum>
    </Order>

我的骆驼上下文看起来像这样

<camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split>
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
<camel:to uri="direct:aggegateQueries"/>  
</camel:split>

</camel:route>

<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call : \n ${body}"></camel:log>  
</camel:route>

 <camel:route>
<camel:from uri="direct:aggegateQueries"/>
<camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
<camel:correlationExpression>
<camel:xpath>//te:Order</camel:xpath>
</camel:correlationExpression>
<camel:to uri="file:src/data/catask/output?fileName=output.xml"/>

</camel:aggregate>
</camel:route>

我的汇总策略课程如下所示

   public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
            if (oldExchange == null) { 
            System.out.println("Returning new exchange"); 
                return newExchange; 
            }

            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class); 
            oldExchange.getIn().setBody(oldBody + "+" + newBody); 
            return oldExchange; 
        }

问题在于,将汇总结果保存在output.xml文件中时,它仅包含从Orders.xml中读取的最后一条记录。

<Order xmlns="http://some/schema/Order">
            <orderNum>6</orderNum>
    </Order>

我对其进行了进一步研究,发现这是因为在第一次调用oldExchange之后应该具有一些值,但事实证明它始终为null。我认为这是因为它是从单个文件读取所有内容并将其拆分,因此只有交换。

> 有什么建议吗?

更新1:
克劳斯,我只能使用Splitter解决此问题。我做到了,并且能够成功加入所有消息。但是我仍在寻找使用Aggregator2的方法。这是我仅使用Splitter的方法。

camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split strategyRef="aggrTask"> 
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
 < 
</camel:split>

</camel:route>

<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call : \n ${body}"></camel:log>  
</camel:route>

问题答案:

我想我已经弄清楚了如何使用Aggregator来聚合消息。我添加了一个名为id的headerName并将其用作相关ID。

<camel:route>
  <camel:from uri="file:src/data/catask/test?noop=true"/>
  <camel:log message="${body}"></camel:log>
  <camel:split>
    <camel:xpath>//te:Orders/*</camel:xpath>
    <camel:to uri="direct:addHeaders"/>
    <camel:to uri="direct:aggegateQueries"/>
  </camel:split>
</camel:route>

<camel:route>
  <camel:from uri="direct:addHeaders"/>
  <camel:setHeader headerName="id">
    <camel:constant>order</camel:constant>
  </camel:setHeader>
</camel:route>

<camel:route>
  <camel:from uri="direct:aggegateQueries"/>
  <camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
    <camel:correlationExpression>
      <simple>header.id</simple>
    </camel:correlationExpression>
    <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
    <camel:log message="MERGED:: /n ${body}"></camel:log>
  </camel:aggregate>
</camel:route>

这汇总了我的消息。但是我仍然不确定,尽管使用了正确的XPATH,但为什么骆驼认为它是不同类型的消息?

在骆驼论坛上复制CLAUS的解释:
“看起来像您的相关表达式,它是每条消息的新组,例如,每个xpath结果是不同的。如果要拆分和合并相同的消息,请参阅此eip http:// camel。
apache.org/composed-message-processor.html
并仅使用分割器查看示例。这样做更加容易。

我使用Xpath评估器工具测试了Xpath表达式,并打印出了相关表达式的结果,并且所有带有// Order的消息都是相同的。前

Group 1: 
<Order>  
  <orderNum>1</orderNum>  
</Order>

Group 2: 
<Order>  
  <orderNum>2</orderNum>  
</Order>


 类似资料:
  • 我想分割交换消息体(它是MyCustomClass对象的列表),处理它们(一个接一个),然后将所有的交换聚合在一起。拆分可以,一个一个处理也可以,但是我想不出怎么把它们聚合起来。 我不需要复杂的聚合,只需要收集分离的交换列表,并在最终的处理器中处理它们。

  • 我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:

  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

  • 我正在尝试通过拖放向导 IO 指标监视某些指标。我想获取在特定时间段内下载的文件数,并且我想自己聚合该指标。例如,假设在 10.00 到 10.15 之间,下载了 60 个文件。我希望指标在此时间段和 10.15 之后为 60,它必须返回零。但是,在 10.15 之后,指标始终返回 60。有没有办法避免其自动聚合?

  • 我试图在使用RESTendpoint的骆驼路由中构建一个分割/聚合模式。它需要一个包含请求详细信息列表的请求对象。我想并行处理请求详细信息,然后将聚合结果返回给调用方。我希望这是一个同步调用。 这是我的路线中的代码。 我希望调用的结果是聚合调用(我的响应对象)的输出。但我实际上得到的是REST调用返回的请求对象?? 当我放入更多的日志语句时,我可以看到Split调用正在触发多个线程,这很好。我可以

  • 我们在Camel中定义了一个具有拆分和聚合功能的路由,但无法在聚合器之后将异常传播回拆分。这导致即使我们遇到异常,拆分也会运行 下面是不工作的代码 上述代码中的处理器(myProcessor)如下: 但是,当我从路由中移除聚合时,Split能够在异常情况下停止路由。