我正试图找到一种很好的并行化代码的方法,对大数据集进行处理,然后将结果数据导入RavenDb。
数据处理是CPU绑定和数据库导入IO绑定。
我正在寻找一种在环境上并行处理的解决方案。ProcessorCount线程数。然后应将生成的数据导入到与上述进程并行的x(假设10)池化线程上的RavenDb中。
这里的主要问题是我希望在导入完成的数据时继续处理,以便在等待导入完成时继续处理下一个数据集。
另一个问题是每个批次的内存需要在成功导入后丢弃,因为私有工作内存很容易到达
下面的代码是我目前掌握的代码。请注意,它没有满足上述并行化要求。
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
Task.Run(() =>
{
...
}
}
GetDataItem生成可枚举的数据项,这些数据项被划分到批处理数据集中。GetDataItem将产生约2000000个项目,每个项目平均处理时间约为0.3ms。
该项目在最新的. NET 4.5 RC上运行。x64平台。
使现代化
我当前的代码(如上所示)将获取项目并将其分批分区。每个批在八个线程上并行处理(i7上的Environment.ProcessorCount)。处理速度慢,cpu受限,内存密集。当单个批处理完成时,将启动一个任务,将结果数据异步导入RavenDb。批量导入作业本身是同步的,看起来像:
using (var session = Store.OpenSession())
{
foreach (var data in batch)
{
session.Store(data);
}
session.SaveChanges();
}
这种方法存在一些问题:
>
每次批处理完成时,都会启动一个任务来运行导入作业。我想限制并行运行的任务数量(例如最大10个)。此外,即使许多任务已经启动,它们似乎永远不会并行运行。
内存分配是一个巨大的问题。处理/导入批次后,它似乎仍保留在内存中。
我正在寻找解决上述问题的方法。理想情况下,我想要:
解决方案
var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
bc.GetConsumingEnumerable()
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(batch =>
{
using (var session = Store.OpenSession())
{
foreach (var i in batch) session.Store(i);
session.SaveChanges();
}
});
});
var processTask = Task.Run(() =>
{
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
bc.Add(batch.Select(i => new Data()
{
...
}).ToList());
});
});
processTask.Wait();
bc.CompleteAdding();
importTask.Wait();
我最近构建了类似的东西,我在Parallel.Foreach中使用了Queue类vs List。我发现太多的线程实际上减慢了速度,这是一个最佳点。
对于每个批次,您都要开始一项任务。这意味着您的循环完成得非常快。它会留下(批次数)任务,而这些任务不是您想要的。您需要(CPU数量)。
解决方案:不要为每个批次启动新任务。for循环已经并行。
为了回应您的评论,这里有一个改进的版本:
//this runs in parallel
var processedBatches = datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.Select(x => ProcessCpuBound(x));
foreach (var batch in processedBatches) {
PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially
}
总体而言,您的任务听起来像是生产者-消费者工作流。批处理程序是生产者,RavenDB数据“导入”是生产者输出的消费者。
考虑使用BlockingCollection
批处理程序生产者可以全速运行,并且始终与处理先前完成的批的db导入器任务并行运行。如果您担心批处理程序可能远远领先于数据库导入程序(b/c数据库导入比处理每个批要长得多),您可以设置阻塞集合的上限,以便生产者在添加超过该限制时阻塞,从而给消费者一个追上的机会。
不过,你的一些评论令人担忧。启动任务实例以异步执行db导入到批处理并没有什么特别的错误。任务!=线创建新任务实例的开销与创建新线程的开销不同。
不要过于精确地控制线程。即使您指定您想要的存储桶数量与您拥有的核心数量完全相同,您也不能独占使用这些核心。来自其他进程的数百个其他线程仍将安排在您的时间片之间。使用任务指定逻辑工作单元,并让TPL管理线程池。避免因错误的控制感而受挫;
在你的评论中,你指出你的任务似乎没有异步运行(你是如何确定的?)并且在每批完成后内存似乎不会释放。我建议放弃所有内容,直到你能先弄清楚这两个问题的原因。你是不是忘记在某个地方调用Dispose()了?你是否保留了一个引用,该引用不必要地保持了一整棵对象树的活动?你测量的是正确的事情吗?并行任务是否由阻塞数据库或网络I/O序列化?在这两个问题解决之前,你的并行计划是什么并不重要。
本文向大家介绍数据并行与任务并行,包括了数据并行与任务并行的使用技巧和注意事项,需要的朋友参考一下 数据并行 数据并行意味着在每个多个计算核心上并发执行同一任务。 让我们举个例子,对大小为N的数组的内容求和。对于单核系统,一个线程将简单地对元素[0]求和。。。[N-1]。但是,对于双核系统,在核0上运行的线程A可以对元素[0]求和。。。[N / 2-1],而在核心1上运行的线程B可以求和元素[N
问题内容: 我有以下使用类的课程。所以我想做的是,在运行cp1实例处理方法的同时,我要并行运行。 但是,我要按顺序cp1,所以我要它运行并完成,如果cp2没有完成或失败,那就很好。如果确实失败,我想加入结果。该示例中未返回任何内容,但我想返回结果。 为此,应该使用TaskExecutor吗?还是线程? 我只希望cp2与cp1并行运行。或者,如果我添加更多内容,例如说cp3,我希望它也可以与cp1并
问题内容: 我有一些我想在JS中做的资源密集型任务。对于这个问题,让我们假设它们是一些繁重的计算,而不是系统访问。现在,我想同时运行任务A,B和C,并在完成后执行一些功能D。 该异步库为此提供了一个很好的脚手架: 如果我正在做的只是计算,那么它将仍然同步运行(除非库将任务本身放在不同的线程上,我希望情况并非如此)。我如何使它实际上是平行的?异步代码通常不阻止调用者的事情是什么(使用NodeJS时)
假设我有几个任务要在Java中并行运行。每个任务要么返回成功,要么返回失败。每个任务都有一个相关的截止日期。如果任务未在截止日期前完成,它将被中断(所有任务都可中断)并返回失败。 如果其中一个任务失败(即返回失败),我们将中断所有仍在运行的其他任务。 我们应该等到所有任务都完成,最后如果所有任务都返回成功,则返回成功;如果至少有一个任务返回失败,则返回失败。 你将如何实施它?我将使用util。同时
问题内容: 我正在编写一个新的Jenkins管道,并具有一组最终要并行运行的步骤。但是,在开发此管道时,我想强制其顺序运行。我没有看到任何指定并行步骤使用的线程数或类似方法的方法。这是到目前为止的基本代码: 我希望能够依次运行这些Shell脚本而无需更改很多代码。 问题答案: 而不是您可以这样使用:
问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如