当前位置: 首页 > 知识库问答 >
问题:

用于多文件处理的Camel

唐和洽
2023-03-14

我是骆驼的新手。我将有一个文件处理与骆驼,但我还没有找到一个现成的解决方案,我的情况。我必须一起处理多个文件,以防它们存在。这些文件上载到特定文件夹时会有一些延迟(例如:我们有两个文件a.csv和b.csv,a.csv上载的时间比b.csv晚10秒,反之亦然)。此外,如果一个文件缺席超过特定的时间,我只需要处理一个文件。有谁能帮我选一个图案吗?据我所知,我可以使用camel过滤器来确保我们已经有了这两个文件a.csv和b.csv,然后才开始处理,但这并不能解决我的问题。

共有1个答案

艾望
2023-03-14

这是聚合器EIP。

from("file:inputFolder")
        .aggregate(constant(true), AggregationStrategies.groupedExchange())
        .completionSize(2) //Wait for two files
        .completionTimeout(60000) //Or process single file, if completionSize was not fulfilled within one minute
            .to("log:do_something") //Here you can access List<Exchange> from message body

若要对消息进行分组,可以使用相关expression。对于您的示例(按_前的文件名前缀分组消息),可能如下所示:

private final Expression CORRELATION_EXPRESSION = new Expression() {
    @Override
    public <T> T evaluate(Exchange exchange, Class<T> type) {
        final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
        final String correlationExpression = fileName.substring(0, fileName.indexOf('_'));
        return exchange.getContext().getTypeConverter().convertTo(
                type,
                correlationExpression
        );
    }
};

并将其传递给聚合器:

from("file:inputDirectory")
    .aggregate(CORRELATION_EXPRESSION, AggregationStrategies.groupedExchange())
    ...
 类似资料:
  • 我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b

  • 我正在尝试运行批处理文件, 它将转到祖父母文件夹,并对其所有子存储库目录执行git pull。出于某种原因,这是行不通的。我怎样才能让它正常工作? 参考以下内容: 如何git拉多个repos上的窗口? 使用Windows 10

  • MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。

  • 我需要创建3个单独的文件。 我的批处理作业应该从Mongo读取,然后解析信息并找到“业务”列(3种业务类型:retAIL、HPP、SAX),然后为它们各自的业务创建一个文件。该文件应该创建任何一个retAIL formattedDate;HPP formattedDate;SAX formattedDate作为文件名和在txt文件中的DB中找到的信息。此外,我需要将.资源(new FileSyst

  • 问题内容: 我在python中遇到以下问题。 我需要并行执行一些计算,这些计算的结果需要按顺序写入文件中。因此,我创建了一个函数,该函数接收和文件句柄,进行计算并将结果打印到文件中: 但是脚本运行后文件最终为空。我试图将worker()函数更改为: 并将文件名作为参数传递。然后它按我的预期工作。当我尝试按顺序执行相同的操作而不进行多处理时,它也可以正常工作。 为什么它在第一个版本中不起作用?我看不