当前位置: 首页 > 知识库问答 >
问题:

阿帕奇骆驼——如何同步“窃听”?还是只发一份交换?

公良育
2023-03-14

我有一个apache骆驼路由,它正在交换主体上处理POJO。

请看从1到3标记的行序列。

    from("direct:foo")
        .to("direct:doSomething")         // 1 (POJO on the exchange body)
        .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
        .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
    ;

现在,我需要对< code>hazelcast组件使用< code>put操作,不幸的是,该组件需要将body设置为值-1。

    from("direct:storeInHazelcast")
            .setBody(constant(-1))
            .setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
            .setHeader(HazelcastConstants.OBJECT_ID, constant(LAST_FLIGHT_UPDATE_SEQ))
            .to("hazelcast:map:MyNumber")
    ;

对于标记为 2 的行,我想将交换的副本发送到商店InHazelcast 路由。

首先,我尝试了. Multicast(),但交换体仍然被搞砸了(到-1)。

        // shouldnt this copy the exchange?
        .multicast().to("direct:storeInHazelcast").end()

然后我尝试。wireTap(),它是一种“fire-and-forget”(异步)模式,但我实际上需要它来阻止,并等待它完成。你会制作wireTap块吗?

        // this works but I need it to be sync processing (not async)
        .wireTap("direct:storeInHazelcast").end()

所以我在这里寻找一些提示。据我所知,< code>multicast()应该已经复制了交换,但是我的< code>storeInHazelcast路线中的< code>setBody()似乎破坏了原始交换。

或者,也许有其他方法可以做到这一点。

先谢了。骆驼2.10

共有3个答案

楚天宇
2023-03-14

将其保存在标题中并恢复。

from("direct:foo")
    .to("direct:doSomething")         // 1 (POJO on the exchange body)
    .setHeader("old_body", body())    // save body
    .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
    .setBody(header("old_body"))      // RESTORE the body
    .removeHeader("old_body")         // cleanup header
    .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
;

对于破坏性组件,这是一种相当常见的范式。

南门魁
2023-03-14

无需编写自己的聚合策略即可使用

.enrich("direct:storeInHazelcast", AggregationStrategies.useOriginal())
陆文康
2023-03-14

我想我已经偶然发现了答案,第 2 行可以使用 dsl 中的 enrich(),如下所示:

    .enrich("direct:storeInHazelcast", new KeepOriginalAggregationStrategy())

其中:

public class KeepOriginalAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        return oldExchange;
    }
}

有趣的是,我发现了一个名为< code > UseOriginalAggregationStrategy()的聚合策略,但是我看不到如何从DSL中指定名为< code>Exchange original的参数。

    .enrich("direct:storeInHazelcast",
        new UseOriginalAggregationStrategy(???, false))

在dsl中缺少某种< code>getExchange()方法的情况下,我看不出如何在这里使用这种聚合策略(但是如果有人能提供建议,请提供)。

 类似资料:
  • 我试图使用Apache Camel Quartz2实现一个调度器,它每分钟执行一次路由,并按预期执行一些任务。我使用spring DSL实现与apache camel相关联的路由,如下所示: 根据日志,它不会记录为路由记录的消息,例如Direct:DomainsWithFTPUsers等等。请指导如何实现同样的目标。

  • 遵循官方文件(https://camel.apache.org/manual/component-dsl.html#_using_component_dsl)我创建了以下代码: 但是中的告诉我: 并且中的特性不建议导入相应的库。 有人能给我指出正确的方向吗? 我必须理解的概念才能做到这一点吗?

  • 我正在使用带有Apache骆驼的Spring Boot。我正在从控制器调用路由。一旦路由完成,控制就会返回控制器。我正在VerifyLimitProcess和批准限制处理器中生成响应。如果我没有在路由中提供窃听配置,控制器会按预期检索标头和正文。但如果我在路由中引入窃听,控制器会将标头和正文接收为null。如果有人指出我需要做什么,以便我可以在选择语句中引入两个处理器的窃听配置,即VerifyLi

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • 考虑到apache Camel,我有一个问题:是否可以通过代码来创建全局拦截器,例如AOP?拦截器应该跳过endpoint还是模仿endpoint? 提前致谢

  • 如何使用Apache Camel调用带有空消息体的SOAP Web服务? 例如,路由上的最后一个endpoint将是调用我的代理上采用 0 个参数的方法。 编辑: xml配置示例: 问题是 WS 上的方法“invoke”需要 0 个参数,并且会抛出一个异常,指出正在接收 1 个参数。有没有办法让我指定忽略此收到的输入?