我有一个记录课程:
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
我创建了一个包含很多记录的大列表。只有第二个和第五个值,即i/10000和i,稍后分别由getter使用。
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}
请注意,前10000条记录的类别2为0,接下来的10000条记录的类别1等,而值1按顺序为0-114999。
我创建了一个既并行又排序的流。
Stream<Record> stream = list.stream()
.parallel()
.sorted(
//(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
)
//.parallel()
;
我有一个ForkJoinPool,它维护8个线程,这是我电脑上的内核数。
ForkJoinPool pool = new ForkJoinPool(8);
我使用这里描述的技巧将流处理任务提交给我自己的ForkJoinPool
,而不是常见的ForkJoinPool
。
List<Record> output = pool.submit(() ->
stream.collect(Collectors.toList()
)).get();
我希望并行排序操作将尊重流的相遇顺序,并且它将是一个稳定的排序,因为由数组列表返回的拆分器是有序的。
然而,按顺序打印出结果列表的元素的简单代码表明情况并非如此。
for (Record record : output)
{
System.out.println(record.getValue1());
}
输出,压缩:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
输出的
size()
为115000,所有元素似乎都在那里,只是顺序略有不同。
所以我写了一些检查代码来查看
排序
是否稳定。如果它是稳定的,那么所有value e1
值都应该保持顺序。此代码验证订单,打印任何差异。
int prev = -1;
boolean verified = true;
for (Record record : output)
{
int curr = record.getValue1();
if (prev != -1)
{
if (prev + 1 != curr)
{
System.out.println("Warning: " + prev + " followed by " + curr + "!");
verified = false;
}
}
prev = curr;
}
System.out.println("Verified: " + verified);
输出:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
如果我执行以下任一操作,这种情况将持续存在:
>
用ThreadPoolExecutor替换ForkJoinPool。
ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
通过直接处理
Stream
来使用常见的ForkJoinPool
。
List<Record> output = stream.collect(Collectors.toList());
在我调用排序后调用parallel()
。
Stream<Record> stream = list.stream().sorted().parallel();
调用并行流()
而不是stream()。并行()
。
Stream<Record> stream = list.parallelStream().sorted();
使用比较器
进行排序。请注意,此排序标准与我为可比较
接口定义的“自然”顺序不同,尽管从一开始就已经按顺序排列的结果开始,但结果仍然应该是相同的。
Stream<Record> stream = list.stream().parallel().sorted(
(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
);
如果我不在流上执行以下操作之一,我只能获得这个以保持遭遇顺序:
不要调用parallel()
有趣的是,没有排序的parallel()保留了顺序。
在上述两种情况下,输出为:
Verified: true
我的Java版本是1.8.0_05。这种异常也发生在Ideone上,它似乎运行Java 8u25。
使现代化
在撰写本文时,我已经将JDK升级到最新版本1.8.045,问题没有改变。
问题
结果列表中的记录顺序(输出)是否由于排序不稳定、未保留相遇顺序或其他原因而无序?
当我创建并行流并对其排序时,如何确保保留相遇顺序?
它看起来像数组。parallelSort在某些情况下不稳定。很好的发现。流并行排序是根据<代码>数组实现的。parallelSort,因此它也会影响流。下面是一个简化示例:
public class StableSortBug {
static final int SIZE = 50_000;
static class Record implements Comparable<Record> {
final int sortVal;
final int seqNum;
Record(int i1, int i2) { sortVal = i1; seqNum = i2; }
@Override
public int compareTo(Record other) {
return Integer.compare(this.sortVal, other.sortVal);
}
}
static Record[] genArray() {
Record[] array = new Record[SIZE];
Arrays.setAll(array, i -> new Record(i / 10_000, i));
return array;
}
static boolean verify(Record[] array) {
return IntStream.range(1, array.length)
.allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
}
public static void main(String[] args) {
Record[] array = genArray();
System.out.println(verify(array));
Arrays.sort(array);
System.out.println(verify(array));
Arrays.parallelSort(array);
System.out.println(verify(array));
}
}
在我的机器(2核x 2线程)上,这将打印以下内容:
true
true
false
当然,它应该打印true
三次。这是在当前的JDK 9开发版本上。鉴于您已经尝试过的,如果它发生在迄今为止的所有JDK 8版本中,我不会感到惊讶。奇怪的是,减小大小或除数会改变行为。20,000的大小和10,000的除数是稳定的,50,000的大小和1,000的除数也是稳定的。似乎问题与比较相等与并行拆分大小的足够大的值运行有关。
OpenJDK问题JDK-8076446涵盖了这个bug。
返回的迭代器是否保证按该顺序提供值 、、? 我知道和保证集合的值顺序正确。此外,我并不是在问如何从迭代器生成流。
我正在实现一个分页器(在Java),它应该允许并行访问。 我有以下测试用例(测试在Groovy中,带有Spock): 此testcase失败,出现以下错误: 拆分器具有 当我不使用并行时,代码可以工作。所以我不理解: 如果设置了,流框架是否应该保证顺序,并且应该在使用并行生成的块时对结果进行排序?如果是,为什么不在我的情况下排序? 还是我的实现中有错误,必须按照给定的顺序拆分?(当前我在打开页面的
问题内容: Javadoc 表示(重点是我): 该操作的行为明确地是不确定的。 对于并行流管道,此操作不能保证遵守流的遇到顺序 ,因为这样做会牺牲并行性的好处。对于任何给定的元素,可以在库选择的任何时间和线程中执行操作。如果操作访问共享状态,则它负责提供所需的同步。 Java 9 Early Access Javadoc中提供了相同的文本。 第一句话(“明确地不确定”)表明(但未明确说明)此方法未
的Javadoc表示(强调是我的): 此操作的行为显式不确定。对于并行流管道,此操作不能保证尊重流的相遇顺序,因为这样做会牺牲并行性的好处。对于任何给定的元素,操作可以在库选择的任何时间和线程中执行。如果操作访问共享状态,则它负责提供所需的同步。 同样的文本也出现在Java9早期访问Javadoc中。 如果forEach不保留遭遇顺序,则会引入bug。在报告针对NetBeans的bug之前,我想知
受这个问题的启发,我开始研究有序流与无序流、并行流与顺序流以及终端操作,它们考虑的是相遇顺序,而终端操作则不考虑相遇顺序。 在链接问题的一个答案中,显示了一个类似于此的代码: 名单确实不同。列表甚至从一次运行更改到另一次运行,表明结果实际上是不确定的。 因此,我创建了另一个示例: 我希望看到类似的结果,因为流既并行又无序(可能是冗余的,因为它已经并行了)。但是,结果列表是有序的,即它等于源列表。
是否有任何保证在顺序和有序流上的操作是按遇到顺序处理的? 我是说,如果我有这样的代码: 是否可以保证它将按照生成范围的遇到顺序执行myFunction()调用? 我找到了Stream类的JavaDocs草案,它明确地说明了以下内容: 对于顺序流管道,如果管道源具有已定义的遇到顺序,则所有操作都按照管道源的遇到顺序执行。 但是它没有提到顺序流,这个例子是针对并行流的(我的理解是,顺序流和并行流都是正