原则35:理解 PLINQ 并行算法的实现

优质
小牛编辑
140浏览
2023-12-01

在这个原则我希望我能说并行编程现在和在你循环添加 AsParall() 一样简单。虽然不是,但是 PLINQ 使得更容易利用多核并且程序仍然是正确的。创建多和程序绝不是微不足道,但 PLINQ 使得它更简单。

你已经理解访问数据必须是同步的。你仍需要衡量下声明在 ParallEnumerable 的并行和串行版本的方法的影响。LINQ 涉及的一些方法很容易就可以并行执行。其他要强制串行访问序列的每个元素——或者,至少,需要完整的序列(比如 Sort )。让我们通过一些使用 PLINQ 的例子学习是在怎么工作的,并且那些地方还存在缺陷。这节的所有例子和讨论都是对 Object 使用 LINQ 。标签甚至可以叫做 “ Enumerable ” 而不是“ Queryable ”。 PLINQ 不会帮助你 SQL 的 LINQ ,或者是实体框架的算法的并行执行。这不是一个真正的限制特性,因为这些的实现利用并行数据库引擎来执行并行查询。

这个例子是很简单的查询,使用语法方法调用对前150个计算 n! :

var nums = data.Where(m => m < 150). Select(n => Factorial(n));

你在查询第一个函数调用增加 AsParallel() 使得查询是并行的:

var numsParallel = data.AsParallel().Where(m => m < 150).Select(n => Factorial(n));

当然,你可以使用查询语法做类似的工作。

var nums = from n in data where n < 150 select Factorial(n);

并行的版本依赖于 data 序列增加的 AsParallel() :

var numsParallel = from n in data.AsParallel() where n < 150 select Factorial(n);

这个结果是调用方法的版本是一样的。

第一个例子很简单,但通过你调用的方法 PLINQ.AsParallel() 选择并行执行任何查询表达式说了几个重要的概念。一旦你调用 AsParallel() ,在多核就是使用 多线程 Thread.AsParallel() 分成子序列操作进行并且返回 IParallelEnumerable() 而不是 IEnumerable() 。 PLINQ 就是 IParallelEnumerable 实现的扩展方法集。它们大多数都和扩展 IEnumerable 的 Enumerable 类的扩展方法有一样的签名。IParallelEnumerable 只是简单替换 Enumerable 的参数和返回值。这样做的优势是 PLINQ 遵循所有 LINQ 遵循的模式。这使得 PLINQ 非常容易学习。你对 LINQ 的任何掌握,通常,都可以应用于 PLINQ 。

当然,这也不是那么简单。初始查询很容就能应用 PLINQ 。它没有任何共享数据。结果的次序没有任何影响。这就是为什么代码运行获得的加速直接和机器的核心数成正比。为了帮助 PLINQ 获得最好的性能,IParallelEnumerable 有很多控制并行任务类库函数。

每个并行查询都是从分区步骤开始。 PLINQ 需要对输入元素进行分区并分配它们一些任务去执行查询。分区是 PLINQ 最重要的一方面,所以理解 PLINQ 不同的方法, PLINQ 决定使用哪个方法,和每个方法都是如何工作的是非常重要的。首先,分区不能花太多时间。如果 PLINQ 类库花费太多时间在分区上,那么留给处理数据的时间就太少了。 PLINQ 会根据输入源和创建的查询类型决定使用不同的分区算法。最简单分区算法是范围分区。范围分区会根据任务数量划分输入序列并将每部分分配给一个任务。例如,1000个元素的序列运行在四核的机器上会被分为每250的四部分。范围分区只有当查询数据源支持索引序列和报告了序列的长度才会被使用。也就是说范围分区被限制在像 List<T> 数组,和其他支持 IList<T> 的序列的查询数据源。当查询数据源支持这些操作范围分区经常被使用。

第二个分区选择是块分区。这个算法给每个任务分配一个输入元素块,并且这会需要更多功夫。内部的块算法会与时俱进,所以我不会深入涉及当前的实现。你会认为块的开始会很小,因为输入源可能很小。这样就可以防止一个任务处理整个小序列。你也会认为随着工作的持续,块的大小会变大。这可以最小化线程的开销和有助于最大化吞吐量。块也有可能根据查询委托和 where 子句过滤的元素花费的时间调整大小。这样的目的是所有的任务能在接近的时间完成,也就是最大化整体的吞吐量。

另外两个分区方案会优化某些查询操作。第一个是带分区。带分区是范围分区的特例,它优化处理序列开始的元素。每个工作线程都跳过前面N项然后处理后面的M项。在处理M项之后,工作线程会接着跳过下N项。带算法很容易理解,假设带宽为1个元素。在四个工作任务的情况下,第一个任务获得项的下标为0,4,8,12等等,第二个任务获得项下标为1,5,9,13等等。整个查询过程,带算法避免任何线程内部的同步实现 TakeWhile() 和 SkipWhile() 。同时,每个工作线程移到下一个元素只需要很简单算术运算。

最后的算法是哈希分区。哈希分区是针对 Join , GroupJoin ,GroupBy ,Distinct ,Except ,Union ,和 Intersect 操作的查询设计的特殊目标的算法。这些耗时的操作,特定的分区算法可以使得这些查询更大的并行。Hash 算法保证所有产生的相同的哈希码被同一个任务处理。这就最小化任务之间的处理这些操作的交流。

除了分区算法, PLINQ 还是用三个不同的算法并行你的代码的任务: Pipelining , Stop & Go 以及 Inverted Enumeration 。 Pipelining 是默认的,所以我首先进行解释。 Pipelining 算法,一个线程处理枚举( foreach 或者 查询序列)。多个线程用来处理处理每个元素的查询。每个请求的新元素,都会在其他线程被处理。 PLINQ 在 Pipelining 模式使用的线程数经常是核心的数量(对于大多基于 CPU 查询)。在我的阶乘的例子,它在我的双核机器上使用两个线程。第一个元素被检索并在一个线程处理。紧接着第二个元素会被请求并在第二个线程处理。然后,当这两个有一个完成,第三个元素就会被请求,并且查询表达式会在这个线程处理。整个序列查询的执行过程中,两个线程都忙于元素的查询。机器愈多核,更多元素会被并行处理。

例如,在16核的机器上,前16项会在16不同线程上立即处理(推测运行在16个不同核心上)。我已经进行简化了,有一个线程处理枚举,这就说明通常 Pipelining 创建(核心数+1)的线程。在大多数情况下,枚举线程会等很长时间,所以创建额外线程是有意义的。

Stop & Go 算法意味着枚举的线程会加入到其他线程运行查询表达式。当你需要立即执行 ToList() 或 ToArray() 和任何时候 PLINQ 排序之前需要所有结果集等查询时,这个算法会使用。下面两个查询都使用 Stop & Go 算法:

var stopAndGoArray = (from n in data.AsParallel() 
    where n < 150 
    select Factorial(n)).ToArray();
var stopAndGoList = (from n in data.AsParallel() 
    where n < 150 
    select Factorial(n)).ToList();

使用 Stop & Go 算法处理会占用更多内存并获得更好的性能。然而,注意我在进行任何查询表达式之前已经构造整个查询。你要构成整个查询而不是处理每部分都使用 Stop & Go 算法然后在将最后的结果组合起来进行其他查询。这往往会引起引起线程开销而降低性能。像一个组合操作一样处理整个查询表达式总是一个更好的选择。

并行任务库使用的最后一个算法是倒计数算法。 Inverted Enumeration 不会产生任何结果集。而是每个查询的结果执行某些行为。在我前面的例子章,我向控制台输出阶乘计算的结果:

var numsParallel = from n in data.AsParallel() 
    where n < 150 
    select Factorial(n);
foreach (var item in numsParallel)
    Console.WriteLine(item);

Object 的 LINQ (非并行)被认为是非常懒的。也就是只有请求了才会产生值。当你处理查询结果时,可以选择并行执行模型。这就是为什么你会需要 Inverted Enumeration 模型:

var nums2 = from n in data.AsParallel() 
    where n < 150 
    select Factorial(n);
nums2.ForAll(item => Console.WriteLine(item));

Inverted Enumeration 比 Stop & Go 方法使用更少内存。同时,它可以在结果集上并行操作。注意你在 ForAll() 查询仍需要使用 AsParallel() 。 ForAll() 比在 Stop & Go 模型占用更少内存。在某些情况,依赖于查询表达式的结果集上的行为的工作量,通常 Inverted Enumeration 会并枚举方法更快。

所有的 LINQ 查询的执行都很懒。你创建的查询,这些查询只有在请求查询结果的项时才执行。对于 Object 的 LINQ 有改进。Object 的 LINQ 在你需求元素时才对每个元素执行查询。 PINQ 进行不同的工作。它们的模型跟 SQL 的 LINQ ,或者实体框架很接近。在这些模型,当你请求第一个元素,整个结果序列都会产生。 PLINQ 跟这些模型很接近,但又不全相同。如果你对 PLINQ 怎样执行查询有误解,你会使用超过必须的资源,这样你实际的并行查询运行在多核机器上比 Object 的 LINQ 更慢。

为了演示一些区别,我针对给出简单的查询。我会给你展示增加的 AsParallel() 怎样改变执行模型。两个模型都是有效的。 LINQ 关注的它的结果,而不是它们是如何产生的。你将看到两个模型都产生一样的结果。如果你算法对查询自己有因为区别就会显现出来。

下面的查询用来演示区别:

var answers = from n in Enumerable.Range(0, 300) 
    where n.SomeTest() 
    select n.SomeProjection();

我会输出显示 SomeTest() 和 SomeProjection() 方法的调用:

public static bool SomeTest(this int inputValue) 
{
    Console.WriteLine("testing element: {0}", inputValue); 
    return inputValue % 10 == 0;
}
public static string SomeProjection(this int input) 
{
    Console.WriteLine("projecting an element: {0}", input); 
    return string.Format("Delivered {0} at {1}",input.ToString(), DateTime.Now.ToLongTimeString());
}

最后,用一个简单的循环,我使用 IEnumerator<string> 成员对结果进行遍历,可以看到不同行为的发生。这就对序列的产生(并行)和枚举(在枚举循环)显示的更清晰。在生成的代码,我更喜欢不同的实现。

var iter = answers.GetEnumerator();
Console.WriteLine("About to start iterating"); 
while (iter.MoveNext()) 
{
    Console.WriteLine("called MoveNext"); 
    Console.WriteLine(iter.Current);
}

使用标准的 Object 的 LINQ 实现,你会看到像下面的输出:

About to start iterating 
testing element: 0 
projecting an element: 0 
called MoveNext 
Delivered 0 at 1:46:08 PM 
testing element: 1 
testing element: 2 
testing element: 3 
testing element: 4 
testing element: 5 
testing element: 6 
testing element: 7 
testing element: 8 
testing element: 9 
testing element: 10 
projecting an element: 10 
called MoveNext 
Delivered 10 at 1:46:08 PM
testing element: 11 
testing element: 12 
testing element: 13 
testing element: 14 
testing element: 15 
testing element: 16 
testing element: 17 
testing element: 18 
testing element: 19 
testing element: 20 
projecting an element: 20 
called MoveNext 
Delivered 20 at 1:46:08 PM 
testing element: 21 
testing element: 22 
testing element: 23 
testing element: 24 
testing element: 25 
testing element: 26 
testing element: 27 
testing element: 28 
testing element: 29 
testing element: 30 
projecting an element: 30
testing element: 10 
projecting an element: 10 
called MoveNext 
Delivered 10 at 1:46:08 PM

查询知道第一次枚举的 MoveNext() 的调用开始执行。第一次 MoveNext() 的调用查询了足够的元素检索出第一个结果序列上的元素(恰好是查询的第一个元素)。第二个 MoveNext() 处理输入序列的元素知道下一个输出元素的产生。使用 Object 的 LINQ ,每次调用 MoveNext 执行的查询直到下一个输出元素的产生。

要是你将查询改为并行查询,规则就会改变:

var answers = from n in ParallelEnumerable.Range(0, 300) 
    where n.SomeTest() 
    select n.SomeProjection();

这个查询的输出看起来非常的不一样。下面是运行一次的采样(每次运行可能有些不同):

About to start iterating 
testing element: 150 
projecting an element: 150 
testing element: 0 
testing element: 151 
projecting an element: 0 
testing element: 1 
testing element: 2 
testing element: 3 
testing element: 4 
testing element: 5 
testing element: 6 
testing element: 7 
testing element: 8 
testing element: 9 
testing element: 10 
projecting an element: 10 
testing element: 11 
testing element: 12 
testing element: 13 
testing element: 14 
testing element: 15 
testing element: 16 
testing element: 17 
testing element: 18 
testing element: 19 
testing element: 152 
testing element: 153 
testing element: 154 
testing element: 155 
testing element: 156 
testing element: 157 
testing element: 20 
... Lots more here elided ... 
testing element: 286 
testing element: 287 
testing element: 288 
testing element: 289 
testing element: 290 
Delivered 130 at 1:50:39 PM
called MoveNext
Delivered 140 at 1:50:39 PM 
projecting an element: 290 
testing element: 291 
testing element: 292 
testing element: 293 
testing element: 294 
testing element: 295 
testing element: 296 
testing element: 297 
testing element: 298 
testing element: 299 
called MoveNext 
Delivered 150 at 1:50:39 PM 
called MoveNext 
Delivered 160 at 1:50:39 PM 
called MoveNext 
Delivered 170 at 1:50:39 PM 
called MoveNext 
Delivered 180 at 1:50:39 PM 
called MoveNext 
Delivered 190 at 1:50:39 PM 
called MoveNext 
Delivered 200 at 1:50:39 PM 
called MoveNext 
Delivered 210 at 1:50:39 PM 
called MoveNext 
Delivered 220 at 1:50:39 PM 
called MoveNext 
Delivered 230 at 1:50:39 PM 
called MoveNext 
Delivered 240 at 1:50:39 PM 
called MoveNext 
Delivered 250 at 1:50:39 PM 
called MoveNext 
Delivered 260 at 1:50:39 PM 
called MoveNext 
Delivered 270 at 1:50:39 PM 
called MoveNext 
Delivered 280 at 1:50:39 PM
called MoveNext
Delivered 290 at 1:50:39 PM

注意到多大的改变了吧。第一次 MoveNext() 的调用使得 PLINQ 启动所有参与的线程产生结果。这过程产生更少(在这个例子,几乎所有的)结果对象。后续每次 MoveNext() 的调用都是抓取下来的项都是已经产生好的。你不能断定具体的输入元素什么时候会被处理。你只)知道的是当你请求查询的第一个元素是查询就开始执行(在几个线程

上)。

PLINQ 的方法理解查询语法的行为并影响查询的执行。假设你修改查询使用 Skip() 和 Take() 选择第二页的结果:

var answers = (from n in ParallelEnumerable.Range(0, 300) 
    where n.SomeTest() 
    select n.SomeProjection()). 
    Skip(20).Take(20);

这个查询的执行结果和 Object 的 LINQ 的是相同的。这是因为 PLINQ 知道产生20个元素比300个更快。(我已经简化了,但是 PLINQ 的 Skip() 和 Take() 的实现更倾向于一个连续的算法而不是其他算法)。

你可以对查询再修改一点,而且 PLINQ 还是使用并行执行模型产生所有元素。只是增加 orderby 子句:

var answers = (from n in ParallelEnumerable.Range(0, 300) 
    where n.SomeTest() 
    orderby n.ToString().Length 
    select n.SomeProjection()). 
    Skip(20).Take(20);

orderby 的 lambda 参数一定不能被编译器优化的表达式(这就是为什么上面我使用 n.ToString().Length 而不是 n )。现在,查询引擎必须在排序之前产生所有输出序列的元素。一旦元素被排序之后 Skip() 和 Take() 方法才知道哪些元素会被返回。当然在多核机器上使用多线程产生所有输出比顺序进行更快。 PINQ 也知道这点,所以它会启动多个线程来创建输出。

PLINQ 尝试创建你的写的查询的最好实现,产生你需要的结果的花费最少工作和最少时间。有时 PLINQ 查询会和你期望的不一样的方式执行。有时,它会表现的项 Object 的 LINQ ,请求输出序列的下一项才执行代码产生它。有时,它会的行为更像 SQL 的 LINQ 或实体框架即请求第一个就会产生所有。有时它的行为更像两者的混合。你应该确保你不要引入 LINQ 查询的任何副作用。那些在 PLINQ 执行模型是不可靠的。你构建查询是需要确保你考虑大部分的底层技术。这就需要你理解它们的工作是怎样的不同。

并行算法被 Amdahl 法则限制:使用多处理器的程序被限制在程序的连续执行部分。 ParallelEnumerable 的扩展方法对于这个规则毫无例外。很多方法都并行执行,但是有些因为它们的性质会影响并行的程度。显然 OrderBy 和 ThenBy 需要在任务之间进行协调。 Skip , SkipWhile ,Take 和 TakeWhile 会影响并行程度。并行任务运行在不同的核心上完成的顺序可能不同。你可以使用 AsOrdered() 和 AsUnordered() 指示 PLINQ 是结果序列否受次序的影响。

有时你自己的算法会有副作用并不能并行。你可以使用扩展方法 ParallelEnumerable.AsSequential() 将并行序列解释为 IEnumerable 并强制顺序执行。

最后, ParallelEnumerable 包含允许你控制 PLINQ 执行并行查询的方法。你可以使用 WithExecutionMode() 建议并行执行,即使那会选择高开销的算法。默认情况下, PLINQ 会并行构造那些有并行需求的地方。你可以使用 WithDegreeOfParallelism() 建议在你的算法使用的线程数量。通常, PLINQ会分配和当前机器处理器数量一样多的线程。你可以使用 WithMergeOptions() 请求改变 PLINQ 查询过程中控制的的缓存结果。通常,PLINQ 会缓存每个线程的结果直到它们被消费者线程使用。你可以请求不缓存而是理解使用结果。你也可以请求权缓存,这会增加性能的高时延。默认的,自动缓存,很好的权衡时延和性能。缓存只是一个提示,而不是需求。PLINQ 可能忽略你的请求。

我没有给出这些设置的具体指导,因为最优方法高度依赖于你的算法。然而,你有这些设置可以改变,你可以在不同的目标机器上做实验查看是否会有利于你的算法。如果你没有多个不同的机器去实验,我建议你使用默认值。

PLINQ 使得并行计算比之前更简单。因为这些的增加时间会越来越重要;并行计算随着司空见惯的台式机和比较的核心数增加而越来越重要。这还不是很容易。设计糟糕的算法可能在并行上看不到性能的提高。你的任务就是反复查看这些算法并找出哪些是能并行的。尝试实现那些算法的并行版本。测试结果。和性能更好的算法一起工作。意识到一些算法很难实现并行,就让它们串行。

小结:

原创帖占位,未完待续!

工作中还没有怎么用到过 LINQ 和 PLINQ ,从本原则可以知道 PLINQ 可以实现并行,但是依赖于你的算法,所以既要对 PLINQ 有理解,也要能很好的设计你的算法才是最好的设计!

终于翻译完了(这个原则竟然有12页),太累了,坚持,坚持,再坚持!