当前位置: 首页 > 知识库问答 >
问题:

为什么溪流不能减少不平行?/stream已被操作或关闭

郝玄天
2023-03-14

我偶然发现了一个相当烦人的问题:我有一个程序,它有很多数据源,可以流式传输相同类型的元素,而我想要“映射”程序中每个可用的元素(元素顺序不重要)。

由于元素顺序对我来说并不重要,所以我尝试使用.Parallel():Streamoft=StreamofStreamoft.Parallel().reduce(stream.empty(),stream::Concat);来并行化reduce操作,但这会触发java.lang.IllegalStateException:stream已经被操作或关闭

要亲自体验它,只需通过注释/取消注释.parallel()来使用以下main(java 1.8U20)即可

public static void main(String[] args) {
    // GIVEN
    List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
    for (int j = 0; j < 10; j++) {
        IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
                .limit(10);
        Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
                intStreamOf10Ints.spliterator(), true);
        listOfStreamOfInts.add(genericStreamOf10Ints);
    }
    Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
            .stream();
    // WHEN
    Stream<Integer> streamOfInts = streamOfStreamOfInts
            // ////////////////
            // PROBLEM
            //    |
            //    V
            .parallel()
            .reduce(Stream.empty(), Stream::concat);

    // THEN
    System.out.println(streamOfInts.map(String::valueOf).collect(
            joining(", ")));
}

有人能解释一下这种限制吗? <罢工> /找到一种更好的处理流的并行缩减的方法

在@smutje和@louiswasserman注释之后,.flatmap(function.identity())似乎是一个更好的选项,可以容忍.parallel()

共有1个答案

金何平
2023-03-14

您正在使用的reduce的形式采用一个标识值和一个关联组合函数。但stream.empty()不是值;它有状态。流不是像数组或集合那样的数据结构;它们是通过可能并行的聚合操作推动数据的载体,并且它们具有某种状态(例如流是否已被消耗)想想这是如何运作的;你要建立一棵树,在那里,相同的“空”流出现在多个叶子中。当您尝试使用这个有状态的not-an-identity两次时(这不会顺序发生,而是并行发生),第二次尝试并遍历该空流时,将非常正确地看到它已经被使用了。

所以问题是,您只是错误地使用了reduce方法。问题不在于并行性;这只是因为并行性暴露了潜在的问题。

其次,即使这“工作”的方式你认为它应该,你只会得到并行构建树,代表的是扁平的溪流;当你去做连接时,那是一个顺序流管道。哎呀。

第三,即使这种方法按照您认为的方式“工作”,您也会通过建立串联流而增加大量的元素访问开销,而且您不会得到您所寻求的并行性的好处。

简单的答案就是把溪流弄平:

String joined = streamOfStreams.parallel()
                               .flatMap(s -> s)
                               .collect(joining(", "));
 类似资料:
  • 问题内容: 我正在尝试使用Stream API生成Order实例。我有一个创建订单的工厂函数,并且使用DoubleStream初始化订单金额。 如果我使用文字(1.0)初始化Order实例,则可以正常工作。当我使用doubleStream创建随机数量时,将引发异常。 问题答案: 答案在(重点是我的)的javadoc中: 流仅应操作一次(调用中间流或终端流操作) 。例如,这排除了“分叉”流,其中相同

  • 方法是 现在我试着像这样改成stream,但我不知道这种改是否正确: 如何避免出现例外,解决问题? 我应该以某种方式重构代码吗?

  • 使用java8将对象的某个字段值与“_”连接起来。代码中的最后一行抛出一个“”。

  • 如果我使用literal(1.0)初始化Order实例,那么这很好。当我使用doubleStream创建随机数量时,会引发异常。

  • 我有一个java。util。流动包含键值对的流,如: 现在,我想合并所有具有相同密钥的条目: 数据已经排序,因此只需合并连续的数据集。 现在,我正在寻找一种方法来转换上述流的内容,而不将所有数据集加载到内存中。 我更喜欢得到一个java.util.stream.Stream,结果是一个不同的对象类型包含一个值列表,而不是一个单独的值。 我唯一的方法是一个自定义迭代器,它执行合并,但是转换为迭代器并