当前位置: 首页 > 知识库问答 >
问题:

Runnable的ExecutorService,处理批量ArrayList未完成处理

酆晔
2023-03-14

编辑:谢谢你,马克,对于那些有类似问题的人,我的问题是我首先创建了runnable类的线程实例,然后将线程提交给executorservice
它帮助我弄清楚,实际上,当我使用ExecutorService时,如果存在未捕获的异常;它不会通知您,它将取消该过程,而不会通知您。这就是为什么我得到了不完整的处理。

我有一个对象的ArrayList,我想对其进行多线程批量处理,但要限制在给定时间运行的线程数。我发现ExecutorService可以处理这个问题。但在测试它是否正在处理每个记录时,它似乎只处理我传递给它的对象的一小部分。

编辑:我已经删除了它的多线程部分,并且像正常一样处理对象而不使用执行器服务,在小批量(只有710)上,它工作正常;线程是否有可能完成得太快并且处理不正确?这意味着通常一次处理300k-800k记录;这就是为什么我想多线程它。

public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
    int cores = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(cores);
    int batchSize = Settings.LOGIC_BATCH_SIZE;//100
    int batches = (int) Math.ceil((double) records.size() / (double) batchSize);

    ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
    LogicProcessor newHandler = null;
    for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
        if (records.size() < batchSize) {
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
        } else {
            int bound = (startIndex + batchSize);
            if (bound > records.size()) {
                bound = records.size();
            }
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
        }
        Thread newThread = new Thread(newHandler);
        Future<?> f = executor.submit(newThread);
        threads.add(f);
    }
    executor.shutdown();
    int completedThreads = 0;
    while (!executor.isTerminated()) {//monitors threads and waits until completion
        completedThreads = 0;
        for (Future<?> f : threads) {
            if (f.isDone()) {
                completedThreads++;
            }
        }
        //currentProgress = completedThreads;
    }

    for (ContainerRecord record : records) {//checks if each record has been processed
        System.out.println(record.getContainer() + ":" + record.isTouched());
    }
}

这是LogicProcessor类,它启动其线程实例

    private List<? extends ContainerRecord> archive;
private GUI mainGUI;

public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
    this.mainGUI = mainGUI;
    this.archive = records;
}

@Override
public void run() {
    handleLogic();
}

private void handleLogic() {
    Iterator iterator = archive.iterator();
    while (iterator.hasNext()) {
        ContainerRecord record = (ContainerRecord) iterator.next();
        record.touch();//sets a boolean in the object to validate if it has been processed yet.
    }
}

输出:在处理的710条记录(对象)中,有691条从未处理/触摸过,只有19条。

这是怎么回事?我尝试了很多方法,甚至制作了一个类LogicProcessor的数组,并将实例保留在数组中,以避免任何类型的GC删除实例。我不确定它为什么不处理这些记录。

共有1个答案

习胤运
2023-03-14

我现在没有电脑来运行测试,但从查看您的代码来看,因此我的答案基于个人经验,不能将其视为代码审查,因为缺乏代码清晰度是错误的根源:)

>

  • 不要将新线程提交到executor服务中。执行器的全部目的是向用户隐藏带有线程的单词。相反,您的逻辑处理器应该根据是否要返回值来实现可运行/可调用接口。

    再次检查按批次划分的逻辑。如果您使用guava,它已经有了分区逻辑的实现。请参见本教程。我承认这更多是个人喜好,您的代码可能也不错,我还没有深入检查。

    关闭方法和未来处理可能会简化。

    调用shutdown方法会导致executor服务停止接受新的任务来执行,但它不会立即关闭服务,而是会等待它已经拥有的所有任务都将被执行。通常,这样的线程池是在应用程序生命周期的开始创建的,并且只要应用程序运行,线程池就一直处于活动状态。创建池非常昂贵,因为它分配线程。

    如果希望保持池的打开状态,但确保所有任务都已完成,则可以使用循环来迭代未来。

    所以我真的看不出有什么理由两者兼用。如果分配池只是为了提交一系列任务,那么调用shutdown就足够了。否则,您可以使用循环并将池视为全局对象,并在其他地方调用关闭,正如我上面解释的那样。

  •  类似资料:
    • 主要内容:命令行参数,set命令,使用数字值,局部与全局变量,使用环境变量批处理文件中有两种类型的变量。 其中一个参数是在调用批处理文件时可以传递的参数,另一个是通过命令完成的。 命令行参数 批处理脚本支持命令行参数的概念,其中参数可以在被调用时传递给批处理文件。参数可以通过变量,,等从批处理文件中调用。 以下示例显示了一个批处理文件,它接受3个命令行参数,并将它们回显到命令行屏幕。 如果上面的批处理脚本存储在一个名为的文件中,我们将运行该批处理 - 以下是批处理文件执

    • 主要内容:使用Statement对象进行批处理,使用PrepareStatement对象进行批处理批量处理允许将相关的SQL语句分组到批处理中,并通过对数据库的一次调用来提交它们,一次执行完成与数据库之间的交互。 一次向数据库发送多个SQL语句时,可以减少通信开销,从而提高性能。 不需要JDBC驱动程序来支持此功能。应该使用方法来确定目标数据库是否支持批量更新处理。如果JDBC驱动程序支持此功能,该方法将返回。 ,和的方法用于将单个语句添加到批处理。 用于执行组成批量的所有语句。 返回一个整数

    • 我的数据库中有大约1000万个blob格式的文件,我需要转换并以pdf格式保存它们。每个文件大小约为0.5-10mb,组合文件大小约为20 TB。我正在尝试使用spring批处理实现该功能。然而,我的问题是,当我运行批处理时,服务器内存是否可以容纳那么多的数据?我正在尝试使用基于块的处理和线程池任务执行器。请建议运行作业的最佳方法是否可以在更短的时间内处理如此多的数据

    • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

    • 我最近开始使用java配置方式编写spring批处理程序,并使用spring批处理和starter包。我使用了分区的步骤和任务执行器来完成我的工作,我面临的问题是,一旦作业完成,批处理过程就不会停止,它一直在我的eclipse和Linux盒子中运行。我手动找到并终止作业。你能帮个忙吗。当我在没有分区步骤的情况下以单线程的方式运行作业时,这工作很好。 我的作业配置: