当前位置: 首页 > 知识库问答 >
问题:

优化多个文件的并行处理

谢学名
2023-03-14

我有一个处理大量文件的程序,其中每个文件需要做两件事:首先,读取并处理文件的一部分,然后存储生成的MyFileData。第一部分可以并行,第二部分不能并行。

按顺序做每件事都非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待。。。

我做了以下事情

class MyCallable implements Callable<MyFileData> {
    MyCallable(File file) {
        this.file = file;
    }
    public MyFileData call() {
        return someSlowOperation(file);
    }
    private final File file;
}

for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());

这很有帮助。然而,我想改进两件事:

>

  • 顺序操作以固定顺序执行,而不是首先处理任何可用的结果。如何更改它?

    有数千个文件需要处理,启动数千个磁盘请求可能会导致磁盘垃圾。通过使用执行器。newFixedThreadPool(10)我限制了这个数字,但我正在寻找更好的。理想情况下,它应该是自调整的,以便在不同的计算机上最佳工作(例如,当RAID和/或NCQ可用时,发出更多请求,等等)。我不认为这可以基于找出硬件配置,但测量处理速度并基于它进行优化应该是可能的。知道吗?

  • 共有2个答案

    俞新翰
    2023-03-14

    顺序操作按固定顺序执行,而不是先处理任何可用的结果。我怎样才能改变它?

    假设:每个<代码>一些操作(文件) 调用将花费可变的时间,因此,您希望在收到MyFileData后立即处理它,但不要与另一个顺序操作同时处理。

    您可以通过设置生产者/消费者队列来实现这一点。

    生产者是您在示例中执行的可调用对象,带有添加的位,您将结果添加到等待处理的工作队列中。

    Consumer是sequentialOperation()调用-它在自己的线程中运行,并且只有一个线程。这个线程所做的就是占据队列的头,并对其进行处理,重复执行,直到程序结束。

    这样,您可以最大限度地利用机器上的所有资源。

    带有一些示例代码的相关帖子:使用队列的生产者/消费者线程

    编辑:我想你可能需要一个快速的样本,因为它对任何以前从未做过的人来说都非常不透明

    public class Main {
    
        private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10);
        private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
        private final LinkedBlockingQueue<MyData> queue = new LinkedBlockingQueue();//or some other impl
    
        abstract class Producer implements Runnable{
            private final File file;
            Producer(File file) {
                this.file = file;
            }
    
            public void run() {
                MyData result = someLongAssOperation(file);
                queue.offer(result);
            }
    
            public abstract void someLongAssOperation(File file);
        }
    
        abstract class Consumer implements Runnable {
            public void run() {
                while (true) {
                    sequentialOperation(queue.take());  
                }
            }
    
            public abstract void sequentialOperation(MyData data);
        } 
    
        private void start() {
            consumerExecutor.submit(new Consumer(){
                //implement sequentialOperation here
            });
    
            for (File f : files) {
                producerExecutor.submit(new Producer(file) {
                    //implement the someLongAssOperation()
                });
            }
    
        }
    
        public static void main(String[] args) {
            new Main().start();     
        } 
    
    }
    
    司空劲
    2023-03-14

    顺序操作按固定顺序执行,而不是先处理任何可用的结果。我怎样才能改变它?

    这正是CompletionService所做的:它并行处理任务,并在任务完成时返回它们,而不管提交顺序如何。

    简化(未测试)示例:

    int NUM_THREADS = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);
    
    for (File f : files) futures.add(completionService.submit(new MyCallable(f)));
    
    for(int i = 0; i < futures.size(); i++) {
        Future<MyFileData> next = completionService.take();
        sequentialOperation(next.get());
    }
    

    有成千上万的文件要处理,启动成千上万的磁盘请求可能会导致磁盘垃圾。通过使用Executors.newFixedThreadPool(10),我限制了这个数字,但是我正在寻找更好的东西。

    我不是100%确定那个。我想这取决于你有多少磁盘,但我认为磁盘访问部分不应该分成太多线程(每个磁盘一个线程可能是明智的):如果多个线程同时访问一个磁盘,它将花费更多的时间寻找而不是阅读。

     类似资料:
    • 问题内容: 我有一个程序处理大量文件,其中每个文件都需要做两件事:首先,读取并处理一部分文件,然后存储结果。第一部分可以并行化,第二部分不能并行化。 顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待… 我做了以下 这很有帮助。但是,我想改善两点: 在获取一个固定的顺序,而不是处理任何结果,请首先执行。我该如何更改? 有成千上万的文件要处理,启动成千上万

    • null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB

    • MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。

    • 我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目

    • 我想创建一个应用程序,每天从一个文件夹中读取一次多个xml文件,然后读取它们并提取数据并构建一个新的xml文件,我想知道哪个选项更适合这种情况: 使用Spring批处理读取并处理所有文件,然后写入新文件

    • 我通过Julia使用GLPK,我需要反复优化同一个GLPK。Prob。每次优化之间的变化是变量的某些组合固定为0 简单的放入伪代码 当我运行这个程序时,看起来CPU1就像一个调度器,保持在9-11%的范围内,CPU3和CPU4上的负载在0和100%之间交替,尽管从来没有同时发生过。。。CPU2上的负载保持在0% 这可能需要一点时间,我想使用所有的核心 然而,使用Julia的并行功能有点麻烦,尤其是