当前位置: 首页 > 知识库问答 >
问题:

CPU受限任务的并行化与IO受限任务的并行化

解翰采
2023-03-14

我正试图找到一种很好的并行化代码的方法,对大数据集进行处理,然后将结果数据导入RavenDb。

数据处理是CPU绑定和数据库导入IO绑定。

我正在寻找一种在环境上并行处理的解决方案。ProcessorCount线程数。然后应将生成的数据导入到与上述进程并行的x(假设10)池化线程上的RavenDb中。

这里的主要问题是我希望在导入完成的数据时继续处理,以便在等待导入完成时继续处理下一个数据集。

另一个问题是每个批次的内存需要在成功导入后丢弃,因为私有工作内存很容易到达

下面的代码是我目前掌握的代码。请注意,它没有满足上述并行化要求。

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem生成可枚举的数据项,这些数据项被划分到批处理数据集中。GetDataItem将产生约2000000个项目,每个项目平均处理时间约为0.3ms。

该项目在最新的. NET 4.5 RC上运行。x64平台。

使现代化

我当前的代码(如上所示)将获取项目并将其分批分区。每个批在八个线程上并行处理(i7上的Environment.ProcessorCount)。处理速度慢,cpu受限,内存密集。当单个批处理完成时,将启动一个任务,将结果数据异步导入RavenDb。批量导入作业本身是同步的,看起来像:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

这种方法存在一些问题:

>

  • 每次批处理完成时,都会启动一个任务来运行导入作业。我想限制并行运行的任务数量(例如最大10个)。此外,即使许多任务已经启动,它们似乎永远不会并行运行。

    内存分配是一个巨大的问题。处理/导入批次后,它似乎仍保留在内存中。

    我正在寻找解决上述问题的方法。理想情况下,我想要:

    • 每个逻辑处理器一个线程,处理大量数据
    • 大约十个并行线程等待完成的批导入RavenDb
    • 将内存分配保持在最小值,这意味着在导入任务完成后取消批分配
    • 不在其中一个线程上运行导入作业以进行批处理。已完成批次的导入应与正在处理的下一批并行运行

    解决方案

    var batchSize = 10000;
    var bc = new BlockingCollection<List<Data>>();
    var importTask = Task.Run(() =>
    {
        bc.GetConsumingEnumerable()
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(batch =>
            {
                using (var session = Store.OpenSession())
                {
                    foreach (var i in batch) session.Store(i);
                    session.SaveChanges();
                }
            });
    });
    var processTask = Task.Run(() =>
    {
        datasupplier.GetDataItems()
            .Partition(batchSize)
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .ForAll(batch =>
            {
                bc.Add(batch.Select(i => new Data()
                {
                    ...
                }).ToList());
            });
    });
    
    processTask.Wait();
    bc.CompleteAdding();
    importTask.Wait();
    
  • 共有3个答案

    柯曜文
    2023-03-14

    我最近构建了类似的东西,我在Parallel.Foreach中使用了Queue类vs List。我发现太多的线程实际上减慢了速度,这是一个最佳点。

    夏谦
    2023-03-14

    对于每个批次,您都要开始一项任务。这意味着您的循环完成得非常快。它会留下(批次数)任务,而这些任务不是您想要的。您需要(CPU数量)。

    解决方案:不要为每个批次启动新任务。for循环已经并行。

    为了回应您的评论,这里有一个改进的版本:

    //this runs in parallel
    var processedBatches = datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .Select(x => ProcessCpuBound(x));
    
    foreach (var batch in processedBatches) {
     PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially
    }
    
    柴声
    2023-03-14

    总体而言,您的任务听起来像是生产者-消费者工作流。批处理程序是生产者,RavenDB数据“导入”是生产者输出的消费者。

    考虑使用BlockingCollection

    批处理程序生产者可以全速运行,并且始终与处理先前完成的批的db导入器任务并行运行。如果您担心批处理程序可能远远领先于数据库导入程序(b/c数据库导入比处理每个批要长得多),您可以设置阻塞集合的上限,以便生产者在添加超过该限制时阻塞,从而给消费者一个追上的机会。

    不过,你的一些评论令人担忧。启动任务实例以异步执行db导入到批处理并没有什么特别的错误。任务!=线创建新任务实例的开销与创建新线程的开销不同。

    不要过于精确地控制线程。即使您指定您想要的存储桶数量与您拥有的核心数量完全相同,您也不能独占使用这些核心。来自其他进程的数百个其他线程仍将安排在您的时间片之间。使用任务指定逻辑工作单元,并让TPL管理线程池。避免因错误的控制感而受挫;

    在你的评论中,你指出你的任务似乎没有异步运行(你是如何确定的?)并且在每批完成后内存似乎不会释放。我建议放弃所有内容,直到你能先弄清楚这两个问题的原因。你是不是忘记在某个地方调用Dispose()了?你是否保留了一个引用,该引用不必要地保持了一整棵对象树的活动?你测量的是正确的事情吗?并行任务是否由阻塞数据库或网络I/O序列化?在这两个问题解决之前,你的并行计划是什么并不重要。

     类似资料:
    • 本文向大家介绍数据并行与任务并行,包括了数据并行与任务并行的使用技巧和注意事项,需要的朋友参考一下 数据并行 数据并行意味着在每个多个计算核心上并发执行同一任务。 让我们举个例子,对大小为N的数组的内容求和。对于单核系统,一个线程将简单地对元素[0]求和。。。[N-1]。但是,对于双核系统,在核0上运行的线程A可以对元素[0]求和。。。[N / 2-1],而在核心1上运行的线程B可以求和元素[N

    • 问题内容: 我有以下使用类的课程。所以我想做的是,在运行cp1实例处理方法的同时,我要并行运行。 但是,我要按顺序cp1,所以我要它运行并完成,如果cp2没有完成或失败,那就很好。如果确实失败,我想加入结果。该示例中未返回任何内容,但我想返回结果。 为此,应该使用TaskExecutor吗?还是线程? 我只希望cp2与cp1并行运行。或者,如果我添加更多内容,例如说cp3,我希望它也可以与cp1并

    • 问题内容: 我有一些我想在JS中做的资源密集型任务。对于这个问题,让我们假设它们是一些繁重的计算,而不是系统访问。现在,我想同时运行任务A,B和C,并在完成后执行一些功能D。 该异步库为此提供了一个很好的脚手架: 如果我正在做的只是计算,那么它将仍然同步运行(除非库将任务本身放在不同的线程上,我希望情况并非如此)。我如何使它实际上是平行的?异步代码通常不阻止调用者的事情是什么(使用NodeJS时)

    • 假设我有几个任务要在Java中并行运行。每个任务要么返回成功,要么返回失败。每个任务都有一个相关的截止日期。如果任务未在截止日期前完成,它将被中断(所有任务都可中断)并返回失败。 如果其中一个任务失败(即返回失败),我们将中断所有仍在运行的其他任务。 我们应该等到所有任务都完成,最后如果所有任务都返回成功,则返回成功;如果至少有一个任务返回失败,则返回失败。 你将如何实施它?我将使用util。同时

    • 问题内容: 我正在编写一个新的Jenkins管道,并具有一组最终要并行运行的步骤。但是,在开发此管道时,我想强制其顺序运行。我没有看到任何指定并行步骤使用的线程数或类似方法的方法。这是到目前为止的基本代码: 我希望能够依次运行这些Shell脚本而无需更改很多代码。 问题答案: 而不是您可以这样使用:

    • 问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如