我有一个程序处理大量文件,其中每个文件都需要做两件事:首先,读取并处理一部分文件,然后MyFileData
存储结果。第一部分可以并行化,第二部分不能并行化。
顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待…
我做了以下
class MyCallable implements Callable<MyFileData> {
MyCallable(File file) {
this.file = file;
}
public MyFileData call() {
return someSlowOperation(file);
}
private final File file;
}
for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());
这很有帮助。但是,我想改善两点:
在sequentialOperation
获取一个固定的顺序,而不是处理任何结果,请首先执行。我该如何更改?
有成千上万的文件要处理,启动成千上万的磁盘请求可能会导致磁盘损坏。通过使用Executors.newFixedThreadPool(10)
我限制了这个数字,但是我正在寻找更好的东西。理想情况下,它应该是自调整的,以使其在不同的计算机上最佳工作(例如,在RAID和/或NCQ可用时发出更多请求,等等)。我认为这可能不是基于发现硬件配置,但是应该可以 以某种方式 测量处理速度并基于它进行优化。任何想法?
serialOperation将以固定的顺序执行,而不是先处理任何可用的结果。我该如何更改?
这正是CompletionService的工作:它并行处理任务,并在完成任务时将其返回,而与提交顺序无关。
简化(未经测试)的示例:
int NUM_THREADS = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);
for (File f : files) futures.add(completionService.submit(new MyCallable(f)));
for(int i = 0; i < futures.size(); i++) {
Future<MyFileData> next = completionService.take();
sequentialOperation(next.get());
}
有成千上万的文件要处理,启动成千上万的磁盘请求可能会导致磁盘损坏。通过使用Executors.newFixedThreadPool(10),我限制了这个数字,但是我正在寻找更好的东西。
我不确定那一个。我想这取决于您有多少个磁盘,但是我会认为磁盘访问部分不应拆分成太多线程(每个磁盘一个线程可能是明智的):如果有多个线程同时访问一个磁盘,它将花费比阅读更多的时间。
我有一个处理大量文件的程序,其中每个文件需要做两件事:首先,读取并处理文件的一部分,然后存储生成的MyFileData。第一部分可以并行,第二部分不能并行。 按顺序做每件事都非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待。。。 我做了以下事情 这很有帮助。然而,我想改进两件事: > 以固定顺序执行,而不是首先处理任何可用的结果。如何更改它? 有数千个文件需要处理,启
null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB
MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。
我通过Julia使用GLPK,我需要反复优化同一个GLPK。Prob。每次优化之间的变化是变量的某些组合固定为0 简单的放入伪代码 当我运行这个程序时,看起来CPU1就像一个调度器,保持在9-11%的范围内,CPU3和CPU4上的负载在0和100%之间交替,尽管从来没有同时发生过。。。CPU2上的负载保持在0% 这可能需要一点时间,我想使用所有的核心 然而,使用Julia的并行功能有点麻烦,尤其是
问题内容: 我可以一次下载一个文件: 我可以这样尝试: 有没有不使用或作弊的并行化方法? 鉴于我现在必须诉诸“作弊”,是否是下载数据的正确方法? 使用上述方法时,它使用的是多线程而不是多核的,是否正常?有没有办法使它成为多核而不是多线程? 问题答案: 您可以使用线程池并行下载文件: 您还可以使用以下命令在一个线程中一次下载多个文件: 这里定义在哪里。
我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b