当前位置: 首页 > 面试题库 >

优化许多文件的并行处理

蒙奇
2023-03-14
问题内容

我有一个程序处理大量文件,其中每个文件都需要做两件事:首先,读取并处理一部分文件,然后MyFileData存储结果。第一部分可以并行化,第二部分不能并行化。

顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待…

我做了以下

class MyCallable implements Callable<MyFileData> {
    MyCallable(File file) {
        this.file = file;
    }
    public MyFileData call() {
        return someSlowOperation(file);
    }
    private final File file;
}

for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());

这很有帮助。但是,我想改善两点:

  • sequentialOperation获取一个固定的顺序,而不是处理任何结果,请首先执行。我该如何更改?

  • 有成千上万的文件要处理,启动成千上万的磁盘请求可能会导致磁盘损坏。通过使用Executors.newFixedThreadPool(10)我限制了这个数字,但是我正在寻找更好的东西。理想情况下,它应该是自调整的,以使其在不同的计算机上最佳工作(例如,在RAID和/或NCQ可用时发出更多请求,等等)。我认为这可能不是基于发现硬件配置,但是应该可以 以某种方式 测量处理速度并基于它进行优化。任何想法?


问题答案:

serialOperation将以固定的顺序执行,而不是先处理任何可用的结果。我该如何更改?

这正是CompletionService的工作:它并行处理任务,并在完成任务时将其返回,而与提交顺序无关。

简化(未经测试)的示例:

int NUM_THREADS = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);

for (File f : files) futures.add(completionService.submit(new MyCallable(f)));

for(int i = 0; i < futures.size(); i++) {
    Future<MyFileData> next = completionService.take();
    sequentialOperation(next.get());
}

有成千上万的文件要处理,启动成千上万的磁盘请求可能会导致磁盘损坏。通过使用Executors.newFixedThreadPool(10),我限制了这个数字,但是我正在寻找更好的东西。

我不确定那一个。我想这取决于您有多少个磁盘,但是我会认为磁盘访问部分不应拆分成太多线程(每个磁盘一个线程可能是明智的):如果有多个线程同时访问一个磁盘,它将花费比阅读更多的时间。



 类似资料:
  • 我有一个处理大量文件的程序,其中每个文件需要做两件事:首先,读取并处理文件的一部分,然后存储生成的MyFileData。第一部分可以并行,第二部分不能并行。 按顺序做每件事都非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待。。。 我做了以下事情 这很有帮助。然而,我想改进两件事: > 以固定顺序执行,而不是首先处理任何可用的结果。如何更改它? 有数千个文件需要处理,启

  • null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB

  • MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。

  • 我通过Julia使用GLPK,我需要反复优化同一个GLPK。Prob。每次优化之间的变化是变量的某些组合固定为0 简单的放入伪代码 当我运行这个程序时,看起来CPU1就像一个调度器,保持在9-11%的范围内,CPU3和CPU4上的负载在0和100%之间交替,尽管从来没有同时发生过。。。CPU2上的负载保持在0% 这可能需要一点时间,我想使用所有的核心 然而,使用Julia的并行功能有点麻烦,尤其是

  • 问题内容: 我有一个SQL查询,给出正确的结果,但执行速度太慢。 该查询对以下三个表进行操作: 包含许多客户数据,例如姓名,地址,电话等。为简化表格,我仅使用名称。 包含某些自定义(而非客户)数据。(表是用软件创建的,这就是为什么该表的复数形式是错误的) 将自定义数据与客户相关联。 顾客 自订资料 customercustomdatarels (客户数据和自定义数据之间的关系-具有相应的值) 我想

  • 我有一个用于将实时数据移动到测试环境中的事务数据置乱的过程。该表包含大约一亿行,分布在50个分区中。每月添加一个新分区。随着音量的增加,过程的执行速度比以前慢。 我正在考虑在我的代码中引入某种程度的并行化。这是一个新领域,我想知道是否有任何最佳实践。也许使用dbms_parallel_execute将更新拆分为块? 任何关于如何优化我的代码的建议都非常感谢! 编辑,我的解决方案基于以下反馈:重写部