我是骆驼的新手。我将有一个文件处理与骆驼,但我还没有找到一个现成的解决方案,我的情况。我必须一起处理多个文件,以防它们存在。这些文件上载到特定文件夹时会有一些延迟(例如:我们有两个文件a.csv和b.csv,a.csv上载的时间比b.csv晚10秒,反之亦然)。此外,如果一个文件缺席超过特定的时间,我只需要处理一个文件。有谁能帮我选一个图案吗?据我所知,我可以使用camel过滤器来确保我们已经有了这两个文件a.csv和b.csv,然后才开始处理,但这并不能解决我的问题。
这是聚合器EIP。
from("file:inputFolder")
.aggregate(constant(true), AggregationStrategies.groupedExchange())
.completionSize(2) //Wait for two files
.completionTimeout(60000) //Or process single file, if completionSize was not fulfilled within one minute
.to("log:do_something") //Here you can access List<Exchange> from message body
若要对消息进行分组,可以使用相关expression
。对于您的示例(按_
前的文件名前缀分组消息),可能如下所示:
private final Expression CORRELATION_EXPRESSION = new Expression() {
@Override
public <T> T evaluate(Exchange exchange, Class<T> type) {
final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
final String correlationExpression = fileName.substring(0, fileName.indexOf('_'));
return exchange.getContext().getTypeConverter().convertTo(
type,
correlationExpression
);
}
};
并将其传递给聚合器
:
from("file:inputDirectory")
.aggregate(CORRELATION_EXPRESSION, AggregationStrategies.groupedExchange())
...
我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b
我正在尝试运行批处理文件, 它将转到祖父母文件夹,并对其所有子存储库目录执行git pull。出于某种原因,这是行不通的。我怎样才能让它正常工作? 参考以下内容: 如何git拉多个repos上的窗口? 使用Windows 10
MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。
我需要创建3个单独的文件。 我的批处理作业应该从Mongo读取,然后解析信息并找到“业务”列(3种业务类型:retAIL、HPP、SAX),然后为它们各自的业务创建一个文件。该文件应该创建任何一个retAIL formattedDate;HPP formattedDate;SAX formattedDate作为文件名和在txt文件中的DB中找到的信息。此外,我需要将.资源(new FileSyst
问题内容: 我在python中遇到以下问题。 我需要并行执行一些计算,这些计算的结果需要按顺序写入文件中。因此,我创建了一个函数,该函数接收和文件句柄,进行计算并将结果打印到文件中: 但是脚本运行后文件最终为空。我试图将worker()函数更改为: 并将文件名作为参数传递。然后它按我的预期工作。当我尝试按顺序执行相同的操作而不进行多处理时,它也可以正常工作。 为什么它在第一个版本中不起作用?我看不