我有一个处理大量文件的程序,其中每个文件需要做两件事:首先,读取并处理文件的一部分,然后存储生成的MyFileData。第一部分可以并行,第二部分不能并行。
按顺序做每件事都非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待。。。
我做了以下事情
class MyCallable implements Callable<MyFileData> {
MyCallable(File file) {
this.file = file;
}
public MyFileData call() {
return someSlowOperation(file);
}
private final File file;
}
for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());
这很有帮助。然而,我想改进两件事:
>
顺序操作
以固定顺序执行,而不是首先处理任何可用的结果。如何更改它?
有数千个文件需要处理,启动数千个磁盘请求可能会导致磁盘垃圾。通过使用执行器。newFixedThreadPool(10)我限制了这个数字,但我正在寻找更好的。理想情况下,它应该是自调整的,以便在不同的计算机上最佳工作(例如,当RAID和/或NCQ可用时,发出更多请求,等等)。我不认为这可以基于找出硬件配置,但测量处理速度并基于它进行优化应该是可能的。知道吗?
顺序操作按固定顺序执行,而不是先处理任何可用的结果。我怎样才能改变它?
假设:每个<代码>一些操作(文件) 调用将花费可变的时间,因此,您希望在收到MyFileData后立即处理它,但不要与另一个顺序操作同时处理。
您可以通过设置生产者/消费者队列来实现这一点。
生产者是您在示例中执行的可调用对象,带有添加的位,您将结果添加到等待处理的工作队列中。
Consumer是sequentialOperation()调用-它在自己的线程中运行,并且只有一个线程。这个线程所做的就是占据队列的头,并对其进行处理,重复执行,直到程序结束。
这样,您可以最大限度地利用机器上的所有资源。
带有一些示例代码的相关帖子:使用队列的生产者/消费者线程
编辑:我想你可能需要一个快速的样本,因为它对任何以前从未做过的人来说都非常不透明
public class Main {
private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10);
private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
private final LinkedBlockingQueue<MyData> queue = new LinkedBlockingQueue();//or some other impl
abstract class Producer implements Runnable{
private final File file;
Producer(File file) {
this.file = file;
}
public void run() {
MyData result = someLongAssOperation(file);
queue.offer(result);
}
public abstract void someLongAssOperation(File file);
}
abstract class Consumer implements Runnable {
public void run() {
while (true) {
sequentialOperation(queue.take());
}
}
public abstract void sequentialOperation(MyData data);
}
private void start() {
consumerExecutor.submit(new Consumer(){
//implement sequentialOperation here
});
for (File f : files) {
producerExecutor.submit(new Producer(file) {
//implement the someLongAssOperation()
});
}
}
public static void main(String[] args) {
new Main().start();
}
}
顺序操作按固定顺序执行,而不是先处理任何可用的结果。我怎样才能改变它?
这正是CompletionService所做的:它并行处理任务,并在任务完成时返回它们,而不管提交顺序如何。
简化(未测试)示例:
int NUM_THREADS = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);
for (File f : files) futures.add(completionService.submit(new MyCallable(f)));
for(int i = 0; i < futures.size(); i++) {
Future<MyFileData> next = completionService.take();
sequentialOperation(next.get());
}
有成千上万的文件要处理,启动成千上万的磁盘请求可能会导致磁盘垃圾。通过使用Executors.newFixedThreadPool(10),我限制了这个数字,但是我正在寻找更好的东西。
我不是100%确定那个。我想这取决于你有多少磁盘,但我认为磁盘访问部分不应该分成太多线程(每个磁盘一个线程可能是明智的):如果多个线程同时访问一个磁盘,它将花费更多的时间寻找而不是阅读。
问题内容: 我有一个程序处理大量文件,其中每个文件都需要做两件事:首先,读取并处理一部分文件,然后存储结果。第一部分可以并行化,第二部分不能并行化。 顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待… 我做了以下 这很有帮助。但是,我想改善两点: 在获取一个固定的顺序,而不是处理任何结果,请首先执行。我该如何更改? 有成千上万的文件要处理,启动成千上万
null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB
MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。
我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目
我想创建一个应用程序,每天从一个文件夹中读取一次多个xml文件,然后读取它们并提取数据并构建一个新的xml文件,我想知道哪个选项更适合这种情况: 使用Spring批处理读取并处理所有文件,然后写入新文件
我通过Julia使用GLPK,我需要反复优化同一个GLPK。Prob。每次优化之间的变化是变量的某些组合固定为0 简单的放入伪代码 当我运行这个程序时,看起来CPU1就像一个调度器,保持在9-11%的范围内,CPU3和CPU4上的负载在0和100%之间交替,尽管从来没有同时发生过。。。CPU2上的负载保持在0% 这可能需要一点时间,我想使用所有的核心 然而,使用Julia的并行功能有点麻烦,尤其是