当前位置: 首页 > 知识库问答 >
问题:

Apache camel:路由中的多线程

慕容超
2023-03-14

String postProcessor=“file:

from(processFiles).threads(10).routeid(“someid”)
.to(“bean:someBean”);

从(postProcessor).routeid(“PostProcress”).到(“bean:PostProcessorBean”);

解决方案已经到位。但目前需要更多的时间。因此,我们尝试在camel路由级别使用线程,以便可以同时处理多个文件。现在有了这个,我们可以最小化时间,但不能进行后处理(即步骤2)

共有1个答案

东方新霁
2023-03-14

使用聚合器模式:

from("file:src/main/resources/data/parallel-file-processing?noop=true")
    .threads(10)
    .process(new PreProcessor())
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionFromBatchConsumer()
    .process(new PostProcessor());

preprocessor实现processor接口,在10个独立的线程中执行耗时的工作。

预处理的结果用aggrege聚合,使用取自Camel聚合器文档的ArrayListAggregatorStrategy:

//simply combines Exchange body values into an ArrayList<Object>
public class ArrayListAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        List<Object> list = null;
        if (oldExchange == null) {
            list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list = oldExchange.getIn().getBody(List.class);
            list.add(newBody);
            return oldExchange;
        }
    }
}
private static class PostProcessor implements Processor {
    @SuppressWarnings("unchecked")
    @Override
    public void process(final Exchange exchange) throws Exception {
        final Object body = exchange.getIn().getBody();
        final List<GenericFile<File>> list = (List<GenericFile<File>>) body;
        for (final GenericFile<File> genericFile : list) {
            LOG.info("file = " + genericFile.getAbsoluteFilePath());
        }
    }
}
from("file:src/main/resources/data/parallel-file-processing?noop=true")
    .threads(10)
    .process(new PreProcessor())
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionFromBatchConsumer()
    .parallelProcessing()  // <- for parallel processing
    .process(new PostProcessor());
 类似资料:
  • 我需要多个嵌套路由 我用的是react-router-dom的v4 我有我的 我需要组件渲染成这样 Home组件包含Page1、Page2和Page3组件共有的标题组件,但不存在于Login和About中。 我的js代码是这样读的 我希望登录组件只显示在 /login当我请求 /page1、 /page2、 /page3时,它们应该分别包含主页组件和该页面的内容。 取而代之的是呈现的登录组件,在该

  • 问题内容: 有没有办法在单个函数调用上做到这一点? 就像是: 我知道这是一个语法混乱,但是只是为了给我一个我想实现的目标一个思路,一系列路由就很棒了! 有人知道怎么做吗? 问题答案: 我在寻找相同功能时遇到了这个问题。 @Jonathan Ong在上面的评论中提到,不建议将数组用于路径,但已在Express 4中对其进行了明确描述,并且它在Express 3.x中有效。这是尝试的示例: 从对象内部

  • C1B无线云路由配置零门槛,无线组网快速扩容,钉钉免密一键连网安全便捷。 安装方式 桌面平放 配置说明 设备通电和连网 配置:连接WiFi、打开钉钉 绑定团队,可设置上网人员范围,选中的人员才可连网 设置网络名称 配置完成后,WiFi将重启,员工会收到连网工作通知 扩容添加更多C1B 无线组网,灵活简便 智能办公网络管理 智能办公网络 如何连网 员工连网 通过钉钉授权免密一键连网,无需输入密码,也

  • 如果我将路由折叠起来,这样看起来就像: 工作很好。我嵌套的原因是因为我将在“dashboard”下有多个子项,并且希望它们都在URL中以为前缀。

  • 问题内容: 我有一个提供文件列表的应用程序。 应用程序必须响应以下路由: 这是没有指定路径的路由,例如目录。我在想类似的东西显然不起作用。 如何定义一个响应所有示例的路线? 问题答案: 使用(注意尾随星号)。 完整的例子