我偶然发现了一个相当烦人的问题:我有一个程序,它有很多数据源,可以流式传输相同类型的元素,而我想要“映射”程序中每个可用的元素(元素顺序不重要)。
由于元素顺序对我来说并不重要,所以我尝试使用.Parallel()
:Streamoft=StreamofStreamoft.Parallel().reduce(stream.empty(),stream::Concat);
来并行化reduce操作,但这会触发java.lang.IllegalStateException:stream已经被操作或关闭
要亲自体验它,只需通过注释/取消注释.parallel()
来使用以下main(java 1.8U20)即可
public static void main(String[] args) {
// GIVEN
List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
for (int j = 0; j < 10; j++) {
IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
.limit(10);
Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
intStreamOf10Ints.spliterator(), true);
listOfStreamOfInts.add(genericStreamOf10Ints);
}
Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
.stream();
// WHEN
Stream<Integer> streamOfInts = streamOfStreamOfInts
// ////////////////
// PROBLEM
// |
// V
.parallel()
.reduce(Stream.empty(), Stream::concat);
// THEN
System.out.println(streamOfInts.map(String::valueOf).collect(
joining(", ")));
}
有人能解释一下这种限制吗? <罢工> /找到一种更好的处理流的并行缩减的方法
在@smutje和@louiswasserman注释之后,.flatmap(function.identity())
似乎是一个更好的选项,可以容忍.parallel()
流
您正在使用的reduce
的形式采用一个标识值和一个关联组合函数。但stream.empty()
不是值;它有状态。流不是像数组或集合那样的数据结构;它们是通过可能并行的聚合操作推动数据的载体,并且它们具有某种状态(例如流是否已被消耗)想想这是如何运作的;你要建立一棵树,在那里,相同的“空”流出现在多个叶子中。当您尝试使用这个有状态的not-an-identity两次时(这不会顺序发生,而是并行发生),第二次尝试并遍历该空流时,将非常正确地看到它已经被使用了。
所以问题是,您只是错误地使用了reduce
方法。问题不在于并行性;这只是因为并行性暴露了潜在的问题。
其次,即使这“工作”的方式你认为它应该,你只会得到并行构建树,代表的是扁平的溪流;当你去做连接时,那是一个顺序流管道。哎呀。
第三,即使这种方法按照您认为的方式“工作”,您也会通过建立串联流而增加大量的元素访问开销,而且您不会得到您所寻求的并行性的好处。
简单的答案就是把溪流弄平:
String joined = streamOfStreams.parallel()
.flatMap(s -> s)
.collect(joining(", "));
问题内容: 我正在尝试使用Stream API生成Order实例。我有一个创建订单的工厂函数,并且使用DoubleStream初始化订单金额。 如果我使用文字(1.0)初始化Order实例,则可以正常工作。当我使用doubleStream创建随机数量时,将引发异常。 问题答案: 答案在(重点是我的)的javadoc中: 流仅应操作一次(调用中间流或终端流操作) 。例如,这排除了“分叉”流,其中相同
方法是 现在我试着像这样改成stream,但我不知道这种改是否正确: 如何避免出现例外,解决问题? 我应该以某种方式重构代码吗?
使用java8将对象的某个字段值与“_”连接起来。代码中的最后一行抛出一个“”。
如果我使用literal(1.0)初始化Order实例,那么这很好。当我使用doubleStream创建随机数量时,会引发异常。
我有一个java。util。流动包含键值对的流,如: 现在,我想合并所有具有相同密钥的条目: 数据已经排序,因此只需合并连续的数据集。 现在,我正在寻找一种方法来转换上述流的内容,而不将所有数据集加载到内存中。 我更喜欢得到一个java.util.stream.Stream,结果是一个不同的对象类型包含一个值列表,而不是一个单独的值。 我唯一的方法是一个自定义迭代器,它执行合并,但是转换为迭代器并