当前位置: 首页 > 知识库问答 >
问题:

排序并行流时遇到顺序错误

查淮晨
2023-03-14

我有一个记录课程:

public class Record implements Comparable<Record>
{
   private String myCategory1;
   private int    myCategory2;
   private String myCategory3;
   private String myCategory4;
   private int    myValue1;
   private double myValue2;

   public Record(String category1, int category2, String category3, String category4,
      int value1, double value2)
   {
      myCategory1 = category1;
      myCategory2 = category2;
      myCategory3 = category3;
      myCategory4 = category4;
      myValue1 = value1;
      myValue2 = value2;
   }

   // Getters here
}

我创建了一个包含很多记录的大列表。只有第二个和第五个值,即i/10000和i,稍后分别由getter使用。

List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
    list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}

请注意,前10000条记录的类别2为0,接下来的10000条记录的类别1等,而值1按顺序为0-114999。

我创建了一个既并行又排序的流。

Stream<Record> stream = list.stream()
   .parallel()
   .sorted(
       //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
   )
   //.parallel()
;

我有一个ForkJoinPool,它维护8个线程,这是我电脑上的内核数。

ForkJoinPool pool = new ForkJoinPool(8);

我使用这里描述的技巧将流处理任务提交给我自己的ForkJoinPool,而不是常见的ForkJoinPool

List<Record> output = pool.submit(() ->
    stream.collect(Collectors.toList()
)).get();

我希望并行排序操作将尊重流的相遇顺序,并且它将是一个稳定的排序,因为由数组列表返回的拆分器是有序的。

然而,按顺序打印出结果列表的元素的简单代码表明情况并非如此。

for (Record record : output)
{
     System.out.println(record.getValue1());
}

输出,压缩:

0
1
2
3
...
69996
69997
69998
69999
71875  // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000  // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062  // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999

输出的size()为115000,所有元素似乎都在那里,只是顺序略有不同。

所以我写了一些检查代码来查看排序是否稳定。如果它是稳定的,那么所有value e1值都应该保持顺序。此代码验证订单,打印任何差异。

int prev = -1;
boolean verified = true;
for (Record record : output)
{
    int curr = record.getValue1();
    if (prev != -1)
    {
        if (prev + 1 != curr)
        {
            System.out.println("Warning: " + prev + " followed by " + curr + "!");
            verified = false;
        }
    }
    prev = curr;
}
System.out.println("Verified: " + verified);

输出:

Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false

如果我执行以下任一操作,这种情况将持续存在:

>

  • 用ThreadPoolExecutor替换ForkJoinPool。

    ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    

    通过直接处理Stream来使用常见的ForkJoinPool

    List<Record> output = stream.collect(Collectors.toList());
    

    在我调用排序后调用parallel()

    Stream<Record> stream = list.stream().sorted().parallel();
    

    调用并行流()而不是stream()。并行()

    Stream<Record> stream = list.parallelStream().sorted();
    

    使用比较器进行排序。请注意,此排序标准与我为可比较接口定义的“自然”顺序不同,尽管从一开始就已经按顺序排列的结果开始,但结果仍然应该是相同的。

    Stream<Record> stream = list.stream().parallel().sorted(
        (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
    );
    

    如果我不在流上执行以下操作之一,我只能获得这个以保持遭遇顺序:

    • 不要调用parallel()

    有趣的是,没有排序的parallel()保留了顺序。

    在上述两种情况下,输出为:

    Verified: true
    

    我的Java版本是1.8.0_05。这种异常也发生在Ideone上,它似乎运行Java 8u25。

    使现代化

    在撰写本文时,我已经将JDK升级到最新版本1.8.045,问题没有改变。

    问题

    结果列表中的记录顺序(输出)是否由于排序不稳定、未保留相遇顺序或其他原因而无序?

    当我创建并行流并对其排序时,如何确保保留相遇顺序?


  • 共有1个答案

    柴磊
    2023-03-14

    它看起来像数组。parallelSort在某些情况下不稳定。很好的发现。流并行排序是根据<代码>数组实现的。parallelSort,因此它也会影响流。下面是一个简化示例:

    public class StableSortBug {
        static final int SIZE = 50_000;
    
        static class Record implements Comparable<Record> {
            final int sortVal;
            final int seqNum;
    
            Record(int i1, int i2) { sortVal = i1; seqNum = i2; }
    
            @Override
            public int compareTo(Record other) {
                return Integer.compare(this.sortVal, other.sortVal);
            }
        }
    
        static Record[] genArray() {
            Record[] array = new Record[SIZE];
            Arrays.setAll(array, i -> new Record(i / 10_000, i));
            return array;
        }
    
        static boolean verify(Record[] array) {
            return IntStream.range(1, array.length)
                            .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
        }
    
        public static void main(String[] args) {
            Record[] array = genArray();
            System.out.println(verify(array));
            Arrays.sort(array);
            System.out.println(verify(array));
            Arrays.parallelSort(array);
            System.out.println(verify(array));
        }
    }
    

    在我的机器(2核x 2线程)上,这将打印以下内容:

    true
    true
    false
    

    当然,它应该打印true三次。这是在当前的JDK 9开发版本上。鉴于您已经尝试过的,如果它发生在迄今为止的所有JDK 8版本中,我不会感到惊讶。奇怪的是,减小大小或除数会改变行为。20,000的大小和10,000的除数是稳定的,50,000的大小和1,000的除数也是稳定的。似乎问题与比较相等与并行拆分大小的足够大的值运行有关。

    OpenJDK问题JDK-8076446涵盖了这个bug。

     类似资料:
    • 返回的迭代器是否保证按该顺序提供值 、、? 我知道和保证集合的值顺序正确。此外,我并不是在问如何从迭代器生成流。

    • 我正在实现一个分页器(在Java),它应该允许并行访问。 我有以下测试用例(测试在Groovy中,带有Spock): 此testcase失败,出现以下错误: 拆分器具有 当我不使用并行时,代码可以工作。所以我不理解: 如果设置了,流框架是否应该保证顺序,并且应该在使用并行生成的块时对结果进行排序?如果是,为什么不在我的情况下排序? 还是我的实现中有错误,必须按照给定的顺序拆分?(当前我在打开页面的

    • 问题内容: Javadoc 表示(重点是我): 该操作的行为明确地是不确定的。 对于并行流管道,此操作不能保证遵守流的遇到顺序 ,因为这样做会牺牲并行性的好处。对于任何给定的元素,可以在库选择的任何时间和线程中执行操作。如果操作访问共享状态,则它负责提供所需的同步。 Java 9 Early Access Javadoc中提供了相同的文本。 第一句话(“明确地不确定”)表明(但未明确说明)此方法未

    • 的Javadoc表示(强调是我的): 此操作的行为显式不确定。对于并行流管道,此操作不能保证尊重流的相遇顺序,因为这样做会牺牲并行性的好处。对于任何给定的元素,操作可以在库选择的任何时间和线程中执行。如果操作访问共享状态,则它负责提供所需的同步。 同样的文本也出现在Java9早期访问Javadoc中。 如果forEach不保留遭遇顺序,则会引入bug。在报告针对NetBeans的bug之前,我想知

    • 受这个问题的启发,我开始研究有序流与无序流、并行流与顺序流以及终端操作,它们考虑的是相遇顺序,而终端操作则不考虑相遇顺序。 在链接问题的一个答案中,显示了一个类似于此的代码: 名单确实不同。列表甚至从一次运行更改到另一次运行,表明结果实际上是不确定的。 因此,我创建了另一个示例: 我希望看到类似的结果,因为流既并行又无序(可能是冗余的,因为它已经并行了)。但是,结果列表是有序的,即它等于源列表。

    • 是否有任何保证在顺序和有序流上的操作是按遇到顺序处理的? 我是说,如果我有这样的代码: 是否可以保证它将按照生成范围的遇到顺序执行myFunction()调用? 我找到了Stream类的JavaDocs草案,它明确地说明了以下内容: 对于顺序流管道,如果管道源具有已定义的遇到顺序,则所有操作都按照管道源的遇到顺序执行。 但是它没有提到顺序流,这个例子是针对并行流的(我的理解是,顺序流和并行流都是正