当前位置: 首页 > 知识库问答 >
问题:

Apache Camel Aggegation策略

郑宇
2023-03-14
 <camelContext id="camel" 
 xmlns="http://camel.apache.org/schema/spring">
    <camel:dataFormats>
        <camel:jaxb contextPath="org.example.departments" id="jaxb"/>
    </camel:dataFormats>
    <route id="simple-route">
        <!-- <camel:to id="unmarshallDeps" uri="bean:departmentProcessor"></camel:to> -->
        <from id="_from1" uri="cxf:bean:departmentsEndpoint?dataFormat=PAYLOAD&amp;loggingFeatureEnabled=true"/>
        <camel:unmarshal id="_unmarshal1" ref="jaxb"/>
        <camel:setHeader headerName="departments" id="_setHeader1">
            <camel:method method="getDepartmentRoute" ref="depRouter"/>
        </camel:setHeader>
        <camel:recipientList id="_recipientList1">
            <camel:header>departments</camel:header>
        </camel:recipientList>
        <camel:log id="_log1" message="Body in original cxf after aggregation ******** ${body} and exchange id is ${exchangeId}"/>
    </route>
    <camel:route id="_route1">
        <camel:from id="_from2" uri="direct:finance"/>
        <camel:to id="_to1" uri="mySqlComponent:select id,Location,Head,email,create_date from finance"/>
        <camel:to id="_to2" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route2">
        <camel:from id="_from3" uri="direct:sales"/>
        <camel:to id="_to3" uri="mySqlComponent:select id,Location,Head,email,create_date from sales"/>
        <camel:to id="_to4" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route3">
        <camel:from id="_from4" uri="direct:hr"/>
        <camel:to id="_to5" uri="mySqlComponent:select id,Location,Head,email,create_date from hr"/>
        <camel:to id="_to6" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route4">
        <camel:from id="_from5" uri="seda:departmentAggregator"/>
        <camel:aggregate completionSize="2" id="_aggregate1" strategyRef="myAggregator">
            <camel:correlationExpression>
                <camel:constant>Constant</camel:constant>
            </camel:correlationExpression>
            <camel:log message="Aggregated Body : ${body} and exchange id is ${exchangeId}"></camel:log>
            <!-- <camel:marshal id="_marshal1" ref="jaxb"/> -->
            <!-- <camel:setBody id="_setBody1">
                <camel:simple>${body}</camel:simple>
            </camel:setBody> -->
        </camel:aggregate>
        <camel:log id="_log4" message="Body outside aggregate  : ${body} and exchange id is ${exchangeId}"/>
    </camel:route>
</camelContext>

现在,我注意到,在聚合内部打印的body实际上是一个聚合的body,包含了所有的员工,但是在聚合外部打印body时,它打印的是最新的交换,而不是聚合的交换。下面是我的聚合策略:

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
{

    ArrayList<Map<String, Object>> depList = newExchange.getIn().getBody(ArrayList.class);
    int depCount = depList.size();

    System.out.println("Department Count is : " + depCount);

    if (oldExchange == null) {

        for (int i = 0; i < depCount; i++) {

            Map<String, Object> row = (Map) depList.get(i);
            Department newDepartment = new Department();
            newDepartment.setLocation((String) row.get("Location"));
            newDepartment.setHead((String) row.get("Head"));
            newDepartment.setEmail((String) row.get("email"));
            departments.getDepartment().add(newDepartment);
        }
        newExchange.getIn().setBody(departments);
        return newExchange;
    } else {
        System.out.println("New Exchange: Department Count is  : " + depCount);
        departments = oldExchange.getIn().getBody(Departments.class);
        System.out.println("Aggregate Department count : " + departments.getDepartment().size());

        for (int j = 0; j < depCount; j++) {

            Map<String, Object> row = (Map) newExchange.getIn().getBody(ArrayList.class).get(j);
            Department newDepartment = new Department();
            newDepartment.setLocation((String) row.get("Location"));
            newDepartment.setHead((String) row.get("Head"));
            newDepartment.setEmail((String) row.get("email"));
            departments.getDepartment().add(newDepartment);

        }
        newExchange.getIn().setBody(departments);
        oldExchange.getIn().setBody(departments);
    }

    // System.out.println("exchange is out capable ? : " +
    // newExchange.getPattern().isOutCapable());
    return oldExchange;
}

共有1个答案

萧阳波
2023-03-14

聚合策略通常是无缝嵌入到EIPS中的东西;你不必手动调用,骆驼为你做。

recipientList正是接受聚合策略作为参数的EIPs之一。参见Camel Doc中的示例

因此,实际上不需要“seda:departmentaggregator”路由(以及“direct:[department]”路由中对它的所有调用)。相反,它的内容应该+/-包含在recipientList定义中。

 类似资料:
  • 通过定义隐藏操作菜单、配置回调地址的策略,并将策略分配给对应项目、域、全局,从而实现自定义控制前端显示菜单、显示页面等。 策略分配 策略分配即为策略设置应用范围,在应用范围内策略才会生效。 策略定义 用于自定义设置隐藏虚拟机、镜像菜单功能以及配置第三方回调地址。

  • WAF策略用于为Web应用提供集中式保护,使其免受常见攻击和漏洞的侵害。 WAF(Web Application Firewall)用于为Web应用提供集中式保护,使其免受常见攻击和漏洞的侵害。WAF可以有效识别Web业务流量的恶意特征,在对流量进行清洗和过滤后,将正常、安全的流量返回给服务器,避免网站服务器被恶意入侵导致服务器性能异常等问题,保障网站的业务安全和数据安全。 目前仅只读对接AWS、

  • 开始第一个策略 新建策略 编写Hello OpenQuant策略 导入第三方库 常用的策略事件 订单类型

  • 中文图形界面 由于 Linux 的控制台不能方便的显示中文,所以最实用的方案是“英文控制台+中文图形界面” 为了能够正常处理中文,需要使用locale-gen生成中文 locale,在/etc/locale.gen文件中添加如下内容: zh_CN.UTF-8 UTF-8 zh_CN.GB18030 GB18030 zh_CN.GBK GBK zh_CN GB2312 然后在 gdm 启动菜

  • OnStrategyStart – 在策略启动时调用,在第一笔行情到达之前 OnStrategyStop – 在策略结束时调用,在最后一笔行情之后 OnBarOpen – 在Bar行情最前沿调用(如,在日线数据开盘时买入) OnBar – 在所有行情的后沿调用(如,在日线数据收盘时买入) OnPositionOpened – 当一个新的交易开仓确认后调用 OnPositionChanged – 当

  • Solution:解决方案 Project:项目 一个解决方案下可以有多个项目,但是只有一个启动项 双击cs文件可以打开编辑代码 新建策略 在菜单栏File->New->Solution,新建一个解决方案 选择新建SmartQuant Instrument Strategy Solution模式的解决方案 Solution的类型 说明 SmartQuant Instrument Strategy

  • 主要内容:介绍,实现,Strategy.java,OperationAdd.java,OperationSubtract.java,OperationMultiply.java,Context.java,StrategyPatternDemo.java在策略模式(Strategy Pattern)中,一个类的行为或其算法可以在运行时更改。这种类型的设计模式属于行为型模式。 在策略模式中,我们创建表示各种策略的对象和一个行为随着策略对象改变而改变的 context 对象。策略对象改变 contex

  • Ansible Playbooks 的集成测试 很多时候, 人们问, “我怎样才能最好的将 Ansible playbooks 和测试结合在一起?” 这有很多选择. Ansible 的设计实际上是一个”fail-fast”有序系统, 因此它可以很容易地嵌入到 Ansible playbooks. 在这一章节, 我们将讨论基础设施的集成测试及合适的测试等级. Note 这是一个关于测试你部署应用程序