当前位置: 首页 > 知识库问答 >
问题:

骆驼:同一骆驼环境中并行路线之间的同步

牧飞鹏
2023-03-14

我正在研究一个骆驼原型,它在同一骆驼环境中使用两个起点。

第一个路由使用用于“配置”应用程序的消息。消息通过配置服务bean加载到配置存储库中:

    // read configuration files
    from("file:data/config?noop=true&include=.*.xml")
        .startupOrder(1)
        .to("bean:configService?method=loadConfiguration")
        .log("Configuration loaded");   

第二个路由实现了收件人列表eip模式,将不同类型的输入消息传递给许多收件人,这些收件人从同一个配置存储库中按顺序读取:

    // process some source files (using configuration)       
    from("file:data/source?noop=true")
        .startupOrder(2)
        .unmarshal()
        .to("setupProcessor") // set "recipients" header
        .recipientList(header("recipients"))

    // ...

现在出现的问题是如何同步它们,因此如果第一个路由正在处理新数据,则第二个路由“等待”。

我是Apache Camel的新手,对如何处理这样的问题感到非常迷茫,任何建议将不胜感激。

共有3个答案

祁高格
2023-03-14

ApachecamelFile将为正在处理的文件创建一个锁。如果存在锁,则此文件上的任何其他文件进程都不会共享(除非您将consumer.extrolveReadLock设置为false)

源:

http://camel.apache.org/file.html=

乐正峰
2023-03-14

您还可以在第一条路线中放置“完成时”http://camel.apache.org/oncompletion.html,以激活第二条路线。

翁和颂
2023-03-14

结合动态启动和停止路线的可能性,使用聚合

from("file:data/config?noop=true&include=.*.xml")
    .id("route-config")
    .aggregate(constant(true), new MyAggregationStrategy()).completionSize(2).completionTimeout(2000)
    .process(new Processor() {
        @Override
        public void process(final Exchange exchange) throws Exception {
            exchange.getContext().startRoute("route-source");
        }
    });

from("file:data/source?noop=true&idempotent=false")
    .id("route-source")                              // the id is needed so that the route is found by the start and stop processors
    .autoStartup(false)                              // this route is only started at runtime
    .aggregate(constant(true), new MyAggregationStrategy()).completionSize(2).completionTimeout(2000)
    .setHeader("recipients", constant("direct:end")) // this would be done in a separate processor
    .recipientList(header("recipients"))
    .to("seda:shutdown");                            // shutdown asynchronously or the route would be waiting for pending exchanges

from("seda:shutdown")
    .process(new Processor() {
        @Override
        public void process(final Exchange exchange) throws Exception {
            exchange.getContext().stopRoute("route-source");
        }
    });

from("direct:end")
    .log("End");

这样,< code>route-source仅在< code>route-config完成时启动。如果在< code>config目录中发现新文件,则< code>route-config和< code>route-source会重新启动。

 类似资料:
  • 问题内容: 我一直在仔细研究Apache Camel文档,试图对它的两个 最基本 概念(端点和路由)有一个具体的了解,尽管这些术语在文档中各处都使用,但是我找不到真正定义它们的参考。是以及它们的用途。尽管它们的名称听起来很明显,而且我 想 我理解它们的含义,但是现在我已被分配到一项使我深深陷入Apache Camel Land的任务,而了解这些机制的绝对至关重要是。 我的猜测是,“端点”只是一个b

  • 问题内容: 我尝试对Apache骆驼路线进行junit测试。像这样的东西: 构建器类的定义如下 “ myExportRouteProcessor”类仅从JPA存储库中获取一些数据,并将结果放入路由。我想要的是在测试类中触发此路由,以检查整个过程是否正确完成。当前,处理器未启动。我该怎么办? 问题答案: 您可以使用AdviceWithRouteBuilder#replaceFromWith直接替换测

  • 我试着为阿帕奇骆驼路线做一个jUnit测试。类似于这样: builder类的定义如下 myExportRouteProcencer类只是从JPA存储库中获取一些数据,并将结果放入路由。我想要的是在测试类中触发此路由,以检查整个过程是否正确完成。目前,处理器没有被触发。我应该做些什么?

  • 嗨,我在camel中有一个JMS消费者路由,我的要求是在特定事件时停止/暂停该路由(基于某个字段值),然后使用调度器恢复该路由。为此,我创建了两个路由,一个是我的原始jms消费者路由,另一个是调度程序路由,它们恢复jms消费者路由,虽然我能够暂停路由,但第二个路由不恢复暂停的路由,它显示的状态为已启动。 以下是我的两条路线 请帮助我如何实现上述场景。

  • 我有一个Springboot应用程序,其中配置了一些驼峰路线。 我想测试从到的路由。我试过这里提到的不同东西http://camel.apache.org/camel-test.html,但似乎无法让它工作。 我正在尝试这样做: 但是我的ProducerTemplate总是。我尝试了自动连接CamelContext,但遇到一个异常,它无法解析CamelContext。但这可以通过添加到类。但是我的

  • 我是Camel的新手,网上没有类似的问题让我相信我在做一些愚蠢的事情。我正在使用camel 2.12.1组件,并且正在从本地目录解析大型CSV文件,并通过SFTP下载它们。我发现 拆分(body(). Tokenize("\n")).流().散集(). csv() 适用于本地文件(windows 7);我与 列表 对于csv文件中的每一行。但是,当我从sftp组件(连接到linux服务器下载文件)