我有一个文件列表和一个分析这些文件的分析器列表。文件数量可以很大(200,000),分析器数量可以很大(1000)。所以操作总数可能非常大(200,000,000)。现在,我需要应用多线程来加快速度。我采用了这种方法:
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
for (Analyzer analyzer : listOfAnalyzers){
executor.execute(() -> {
boolean exists = file.exists();
if(exists){
analyzer.analyze(file);
}
});
}
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
但这种方法的问题是,它占用了太多的内存,我想有更好的方法可以做到这一点。我还是java和多线程的初学者。
一种想法是采用fork/join算法,将项目(文件)分组成批,以便单独处理。
我的建议如下:
>
以下伪代码演示了可能对您有所帮助的算法:
public static class CustomRecursiveTask extends RecursiveTask<Integer {
private final Analyzer[] analyzers;
private final int threshold;
private final File[] files;
private final int start;
private final int end;
public CustomRecursiveTask(Analyzer[] analyzers,
final int threshold,
File[] files,
int start,
int end) {
this.analyzers = analyzers;
this.threshold = threshold;
this.files = files;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
final int filesProcessed = end - start;
if (filesProcessed < threshold) {
return processSequentially();
} else {
final int middle = (start + end) / 2;
final int analyzersCount = analyzers.length;
final ForkJoinTask<Integer> left =
new CustomRecursiveTask(analyzers, threshold, files, start, middle);
final ForkJoinTask<Integer> right =
new CustomRecursiveTask(analyzers, threshold, files, middle + 1, end);
left.fork();
right.fork();
return left.join() + right.join();
}
}
private Integer processSequentially() {
for (int i = start; i < end; i++) {
File file = files[i];
for(Analyzer analyzer : analyzers) { analyzer.analyze(file) };
}
return 1;
}
}
用法如下所示:
public static void main(String[] args) {
final Analyzer[] analyzers = new Analyzer[]{};
final File[] files = new File[] {};
final int threshold = files.length / 5;
ForkJoinPool.commonPool().execute(
new CustomRecursiveTask(
analyzers,
threshold,
files,
0,
files.length
)
);
}
请注意,根据约束,您可以操作任务的构造函数参数,以便算法根据文件量进行调整。
您可以根据文件量指定不同的阈值
。
final int threshold;
if(files.length > 100_000) {
threshold = files.length / 4;
} else {
threshold = files.length / 8;
}
您还可以根据输入量在< code>ForkJoinPool中指定工作线程的数量。
测量、调整、修改,你最终会解决问题。
希望这有帮助。
更新:
如果对结果分析不感兴趣,可以用< code>RecursiveTask替换< code>RecursiveAction。伪代码增加了自动装箱的开销。
2亿个任务将驻留在哪里?我希望不是在内存中,除非您计划以分布式方式实现您的解决方案。同时,您需要实例化一个不累积大量队列的执行器服务
。创建服务时与“调用方运行策略”(请参见此处)一起使用。如果您尝试在队列已满时将另一个任务放入队列,您最终将自己执行它,这可能是您想要的。
OTOH,现在我更认真地看了你的问题,为什么不同时分析单个文件呢?那么队列永远不会大于分析器的数量。坦白地说,这就是我要做的,因为我想要一个可读的日志,在我加载文件时,每个文件都有一条消息,而且顺序正确。
我很抱歉没有更有帮助:
analysts.stream(). map(分析师-
基本上,为一个文件创建一堆期货,然后在继续前进之前等待所有这些。
我们有一个Spring+JPA web应用程序。我们使用两个tomcat服务器,它们运行两个应用程序并使用相同的数据库。 我们的应用程序requirmemnt之一是预形成cron调度任务。 谢了!
关于spring batch tasklet与Task-Executor的步骤,我遇到了一个奇怪的问题。配置是正常和简单的,只是一个tasklet(不是面向块的),如下所示: someBean是一个实例实现的Tasklet接口。stange的问题是,当我启动作业时,execute方法调用了两次: 实际上,创建了两个线程,并执行了两次该逻辑。如果将task-executor更改为普通的(org.sp
我有一个数据库结果,每一个调用创建500条记录500条,然后下一个500条,然后下一个 我需要运行一个记录每个不同线程执行特定任务的程序 我举的例子如下 ExecutorService executor=Executors.newFixedThreadPool(10); 我的问题是,在完成当前executer服务之前,it需要获得接下来的500个用户并尝试开始处理,我需要停止该操作,直到处理了前5
我想写一个Gradle任务,在我所有的子项目中共享。此任务在调用它的子项目中查找所有其他类型为“GenerateMavenPom”的任务,并执行这些任务。 通过这样做,我的子项目可以定义他们想要的任何Maven发布,我可以使用“gradle GenerateMavenPomFiles”等单个任务执行gradle来创建pom.xml,而不需要知道每个子项目中的单个发布类型。为什么?因为Maven插件
我无法找到必要的信息,无论是在文档中还是在这里已经存在的问题中,这就是为什么我自己创建了一个(我还不能在类似的帖子下提问)。 我需要知道的是Spring任务执行器和调度器之间的关系。我当前的配置如下所示: 我不确定的是它是如何工作的。“谁”运行我的任务?是调度器,因为任务是和他一起安排的吗?或者调度器只是创建它们,放在队列中,由执行者运行它们? 如果没有,运行的是scheduler,我必须在特定类
我正在使用sping-boot,我有这样一个用例,我想将列表的每个元素提交给执行器服务(线程池大小=4)。但是在每个必须处理的元素之间,我想要1秒钟的延迟。 Thread.sleep(1000)不工作,因为执行程序一睡觉就启动另一个线程。 编辑:这是我的process()方法,我在最后尝试使用sleep,但没有成功。