当前位置: 首页 > 知识库问答 >
问题:

级联并行流

宣瀚
2023-03-14
int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                   Arrays.stream(input2).distinct()).sorted().toArray();

我想加快任务的速度,所以我考虑让流并行。通常,这只是意味着我可以在流构造和终端操作之间的任何地方插入.parallel(),结果将是相同的。IntStream.Concat的JavaDoc表示,如果任何输入流是并行的,结果流将是并行的。因此,我认为使parallear()input1流或input2流或级联流生成相同的结果。

实际上我错了:如果我将.parallel()添加到结果流中,那么输入流似乎仍然是连续的。此外,我可以将输入流(其中任何一个或两个)标记为.paralle(),然后将结果流转换为.sequential(),但输入保持并行。所以实际上有8种可能性:input1、input2和级联流可以并行,也可以不并行:

int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();

我对所有版本进行了不同输入大小的基准测试(在Core i5 4xCPU上使用JDK 8U45 64bit,Win7),每个情况下都得到了不同的结果:

Benchmark           (n)  Mode  Cnt       Score       Error  Units
ConcatTest.SSS      100  avgt   20       7.094 ±     0.069  us/op
ConcatTest.SSS    10000  avgt   20    1542.820 ±    22.194  us/op
ConcatTest.SSS  1000000  avgt   20  350173.723 ±  7140.406  us/op
ConcatTest.SSP      100  avgt   20       6.176 ±     0.043  us/op
ConcatTest.SSP    10000  avgt   20     907.855 ±     8.448  us/op
ConcatTest.SSP  1000000  avgt   20  264193.679 ±  6744.169  us/op
ConcatTest.SPS      100  avgt   20      16.548 ±     0.175  us/op
ConcatTest.SPS    10000  avgt   20    1831.569 ±    13.582  us/op
ConcatTest.SPS  1000000  avgt   20  500736.204 ± 37932.197  us/op
ConcatTest.SPP      100  avgt   20      23.871 ±     0.285  us/op
ConcatTest.SPP    10000  avgt   20    1141.273 ±     9.310  us/op
ConcatTest.SPP  1000000  avgt   20  400582.847 ± 27330.492  us/op
ConcatTest.PSS      100  avgt   20       7.162 ±     0.241  us/op
ConcatTest.PSS    10000  avgt   20    1593.332 ±     7.961  us/op
ConcatTest.PSS  1000000  avgt   20  383920.286 ±  6650.890  us/op
ConcatTest.PSP      100  avgt   20       9.877 ±     0.382  us/op
ConcatTest.PSP    10000  avgt   20     883.639 ±    13.596  us/op
ConcatTest.PSP  1000000  avgt   20  257921.422 ±  7649.434  us/op
ConcatTest.PPS      100  avgt   20      16.412 ±     0.129  us/op
ConcatTest.PPS    10000  avgt   20    1816.782 ±    10.875  us/op
ConcatTest.PPS  1000000  avgt   20  476311.713 ± 19154.558  us/op
ConcatTest.PPP      100  avgt   20      23.078 ±     0.622  us/op
ConcatTest.PPP    10000  avgt   20    1128.889 ±     7.964  us/op
ConcatTest.PPP  1000000  avgt   20  393699.222 ± 56397.445  us/op

所以我有以下几个问题:

  1. 是否有关于如何更好地使用与级联流的并行化的官方指南?测试所有可能的组合并不总是可行的(尤其是当连接两个以上的流时),所以有一些“经验法则”是很好的。
  2. 似乎表明,如果我将直接从collection/array创建的流连接起来(在连接之前不执行中间操作),那么结果就不太依赖于parallel()的位置。这是真的吗?
  3. 除了级联之外,是否还有其他情况,结果取决于流管道在哪一点上并行化?

共有1个答案

花阳辉
2023-03-14

规范精确地描述了您得到的结果--考虑到与其他操作不同,我们讨论的不是单个管道,而是三个不同的,它们的属性独立于其他操作。

规范说:“如果输入流中的任何一个是并行的,结果流是[…]并行的。”这就是你得到的;如果任何一个输入流是并行的,那么结果流也是并行的(但是之后可以将其转换为顺序的)。但是,将结果流更改为并行流或序列流不会改变输入流的性质,也不会将并行流和序列流输入到concat中。

关于性能影响,请参阅文档中的“流操作和管道”段落:

有状态操作可能需要在产生结果之前处理整个输入。例如,在看到流的所有元素之前,无法通过对流进行排序产生任何结果。因此,在并行计算下,一些包含有状态中间操作的流水线可能需要对数据进行多次传递,或者可能需要缓冲重要数据。包含排他的无状态中间操作的管道可以在单次传递中处理,无论是顺序的还是并行的,数据缓冲最小。

您已经选择了两个命名的有状态操作并将它们组合在一起。因此,结果流的.sorted()操作需要在开始排序之前缓冲整个内容,这意味着完成了distinct操作。distinct操作显然很难并行化,因为线程必须对已经看到的值进行同步。

所以要回答第一个问题,这不是关于concat,而是关于distinct不能从并行执行中受益。

 类似资料:
  • 问题内容: 这是我的JPA结构: 电影(看级联类型): 评分: RatingId: 当我致电时,我得到了。如果删除级联,则合并调用不会引发错误。哪里可能有问题? 我认为这个问题与复合主键有关。在具有相同一对多关系但没有复合ID的另一个实体上执行时没有错误。 问题答案: StackOverflow是由循环关系引起的。为了避免出现异常,我在多对多表中将键标记为。

  • 问题内容: 有没有一种方法可以将两个主键合并为一个,然后级联更新所有受影响的关系?这是场景: 客户(idCustomer int PK,公司varchar(50)等) CustomerContacts(idCustomerContact int PK,idCustomer int FK,名称varchar(50)等) CustomerNotes(idCustomerNote int PK,idCu

  • 映射器支持可配置的概念 cascade 行为对 relationship() 构造。这是指相对于特定对象在“父”对象上执行的操作 Session 应传播到该关系引用的项(例如“子”对象),并受 relationship.cascade 选择权。 层叠的默认行为仅限于所谓的层叠 保存更新 和 合并 设置。级联的典型“可选”设置是添加 删除 和 删除孤儿 选项;这些设置适用于相关对象,这些对象仅在附加

  • 在这个项目中,我有两个实体A和B,它们都与实体C有OneToOne关系。A和B引用了一些C。 到目前为止,使用CascadeType一切都很好。实体B中字段c上的所有内容。我们现在要删除实体B而不删除实体C。因此,我们将实体B中的级联更改为 现在,删除行为符合预期,但当持久化实体B时,实体C没有持久化级联,相反,我们得到了一个组织。springframework。道。InvalidDataAcce

  • 中间件函数是可以在应用程序的请求 - 响应周期中访问context object和下一个中间件函数的函数。 这些函数用于修改任务的请求和响应对象,例如解析请求主体,添加响应头等。Koa更进一步,产生'downstream' ,然后将控制回流'upstream' 。 此效果称为cascading 。 以下是一个中间件功能的简单示例。 var koa = require('koa'); var app

  • 我没有那么多的冬眠。所以,当反转和级联进入画面时,陷入一个点。我知道,inverse告诉hibernate拥有实体,该实体负责更新它们的关系,Cascade会告诉hibernate首先保存实体,然后保存其从属实体。 我只是想知道是否必须在同一个实体中声明级联="all"和反转="true"。我们可以通过其他实体中的反转和级联属性将一个声明为拥有实体吗?请说明这一点? 谢谢。