当前位置: 首页 > 知识库问答 >
问题:

具有大量任务的执行器服务

戴化
2023-03-14

我有一个文件列表和一个分析这些文件的分析器列表。文件数量可以很大(200,000),分析器数量可以很大(1000)。所以操作总数可能非常大(200,000,000)。现在,我需要应用多线程来加快速度。我采用了这种方法:

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
  for (Analyzer analyzer : listOfAnalyzers){
    executor.execute(() -> {
      boolean exists = file.exists();
      if(exists){
        analyzer.analyze(file);
      }
    });
  }
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

但这种方法的问题是,它占用了太多的内存,我想有更好的方法可以做到这一点。我还是java和多线程的初学者。

共有2个答案

赵星华
2023-03-14
匿名用户

一种想法是采用fork/join算法,将项目(文件)分组成批,以便单独处理。

我的建议如下:

>

  • 首先,过滤掉所有不存在的文件——它们不必要地占用资源。
  • 以下伪代码演示了可能对您有所帮助的算法:

    public static class CustomRecursiveTask extends RecursiveTask<Integer {
    
    private final Analyzer[] analyzers;
    
    private final int threshold;
    
    private final File[] files;
    
    private final int start;
    
    private final int end;
    
    public CustomRecursiveTask(Analyzer[] analyzers,
                               final int threshold,
                               File[] files,
                               int start,
                               int end) {
        this.analyzers = analyzers;
        this.threshold = threshold;
        this.files = files;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        final int filesProcessed = end - start;
        if (filesProcessed < threshold) {
            return processSequentially();
        } else {
            final int middle = (start + end) / 2;
            final int analyzersCount = analyzers.length;
    
            final ForkJoinTask<Integer> left =
                    new CustomRecursiveTask(analyzers, threshold, files, start, middle);
            final ForkJoinTask<Integer> right =
                    new CustomRecursiveTask(analyzers, threshold, files, middle + 1, end);
            left.fork();
            right.fork();
    
            return left.join() + right.join();
        }
    }
    
    private Integer processSequentially() {
        for (int i = start; i < end; i++) {
            File file = files[i];   
            for(Analyzer analyzer : analyzers) { analyzer.analyze(file) };
        }
    
        return 1;
    }
    }
    

    用法如下所示:

     public static void main(String[] args) {
        final Analyzer[] analyzers = new Analyzer[]{};
        final File[] files = new File[] {};
    
        final int threshold = files.length / 5;
    
        ForkJoinPool.commonPool().execute(
                new CustomRecursiveTask(
                        analyzers,
                        threshold,
                        files,
                        0,
                        files.length
                )
        );
    }
    

    请注意,根据约束,您可以操作任务的构造函数参数,以便算法根据文件量进行调整。

    您可以根据文件量指定不同的阈值

    final int threshold;
    if(files.length > 100_000) {
       threshold = files.length / 4;
    } else {
       threshold = files.length / 8;
    }
    

    您还可以根据输入量在< code>ForkJoinPool中指定工作线程的数量。

    测量、调整、修改,你最终会解决问题。

    希望这有帮助。

    更新:

    如果对结果分析不感兴趣,可以用< code>RecursiveTask替换< code>RecursiveAction。伪代码增加了自动装箱的开销。

  • 白浩气
    2023-03-14

    2亿个任务将驻留在哪里?我希望不是在内存中,除非您计划以分布式方式实现您的解决方案。同时,您需要实例化一个不累积大量队列的执行器服务。创建服务时与“调用方运行策略”(请参见此处)一起使用。如果您尝试在队列已满时将另一个任务放入队列,您最终将自己执行它,这可能是您想要的。

    OTOH,现在我更认真地看了你的问题,为什么不同时分析单个文件呢?那么队列永远不会大于分析器的数量。坦白地说,这就是我要做的,因为我想要一个可读的日志,在我加载文件时,每个文件都有一条消息,而且顺序正确。

    我很抱歉没有更有帮助:

    analysts.stream(). map(分析师-

    基本上,为一个文件创建一堆期货,然后在继续前进之前等待所有这些。

     类似资料:
    • 我们有一个Spring+JPA web应用程序。我们使用两个tomcat服务器,它们运行两个应用程序并使用相同的数据库。 我们的应用程序requirmemnt之一是预形成cron调度任务。 谢了!

    • 关于spring batch tasklet与Task-Executor的步骤,我遇到了一个奇怪的问题。配置是正常和简单的,只是一个tasklet(不是面向块的),如下所示: someBean是一个实例实现的Tasklet接口。stange的问题是,当我启动作业时,execute方法调用了两次: 实际上,创建了两个线程,并执行了两次该逻辑。如果将task-executor更改为普通的(org.sp

    • 我有一个数据库结果,每一个调用创建500条记录500条,然后下一个500条,然后下一个 我需要运行一个记录每个不同线程执行特定任务的程序 我举的例子如下 ExecutorService executor=Executors.newFixedThreadPool(10); 我的问题是,在完成当前executer服务之前,it需要获得接下来的500个用户并尝试开始处理,我需要停止该操作,直到处理了前5

    • 我想写一个Gradle任务,在我所有的子项目中共享。此任务在调用它的子项目中查找所有其他类型为“GenerateMavenPom”的任务,并执行这些任务。 通过这样做,我的子项目可以定义他们想要的任何Maven发布,我可以使用“gradle GenerateMavenPomFiles”等单个任务执行gradle来创建pom.xml,而不需要知道每个子项目中的单个发布类型。为什么?因为Maven插件

    • 我无法找到必要的信息,无论是在文档中还是在这里已经存在的问题中,这就是为什么我自己创建了一个(我还不能在类似的帖子下提问)。 我需要知道的是Spring任务执行器和调度器之间的关系。我当前的配置如下所示: 我不确定的是它是如何工作的。“谁”运行我的任务?是调度器,因为任务是和他一起安排的吗?或者调度器只是创建它们,放在队列中,由执行者运行它们? 如果没有,运行的是scheduler,我必须在特定类

    • 我正在使用sping-boot,我有这样一个用例,我想将列表的每个元素提交给执行器服务(线程池大小=4)。但是在每个必须处理的元素之间,我想要1秒钟的延迟。 Thread.sleep(1000)不工作,因为执行程序一睡觉就启动另一个线程。 编辑:这是我的process()方法,我在最后尝试使用sleep,但没有成功。