当前位置: 首页 > 面试题库 >

在并行流上顺序调用使所有先前的操作顺序

卫浩瀚
2023-03-14
问题内容

我有大量数据,并且想要调用缓慢但干净的方法,而不是调用带有第一个结果的副作用的快速方法。我对中间结果不感兴趣,所以我不想收集它们。

明显的解决方案是创建并行流,进行慢速调用,再次使流顺序进行,然后进行快速调用。问题是,所有代码都在单个线程中执行,没有实际的并行性。

示例代码

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}

如果我删除sequential,代码将按预期执行,但是很明显,非并行操作将在多个线程中调用。

您能推荐一些有关这种行为的参考,或者某种避免临时收集的方法吗?


问题答案:

在最初的Stream
API设计中,将流从切换parallel()sequential()工作,但引起了许多问题,最终实现被更改,因此它只是打开和关闭整个管道的并行标志。当前文档确实含糊不清,但是在Java-9中进行了改进:

根据在其上调用终端操作的流的模式,顺序或并行执行流管道。可以使用该BaseStream.isParallel()方法确定流的顺序或并行模式,并可以使用BaseStream.sequential()BaseStream.parallel()操作修改流的模式。最新的顺序或并行模式设置适用于整个流管道的执行。

对于您的问题,您可以将所有内容收集到中间层List并启动新的顺序管道:

new Random().ints(100).boxed()
        .parallel()
        .map(this::slowOperation)
        .collect(Collectors.toList())
        // Start new stream here
        .stream()
        .map(Function.identity())//some fast operation, but must be in single thread
        .collect(Collectors.toSet());


 类似资料:
  • 因为在过滤2之后,我们还得再找到一个元素来分层极限(2),操作,那么为什么输出不像我解释的那样呢?

  • 问题内容: 我使用jQuery。而且我不想在我的应用程序上进行并行AJAX调用,每个调用都必须等待上一个调用之后才能开始。如何执行呢?有帮手吗? 更新 如果我想知道XMLHttpRequest或jQuery.post的任何同步版本。但是顺序!=同步,我想要一个异步和顺序解决方案。 问题答案: 有比使用同步ajax调用更好的方法。jQuery ajax返回一个延迟,因此您可以使用管道链接来确保每个a

  • 是否有任何保证在顺序和有序流上的操作是按遇到顺序处理的? 我是说,如果我有这样的代码: 是否可以保证它将按照生成范围的遇到顺序执行myFunction()调用? 我找到了Stream类的JavaDocs草案,它明确地说明了以下内容: 对于顺序流管道,如果管道源具有已定义的遇到顺序,则所有操作都按照管道源的遇到顺序执行。 但是它没有提到顺序流,这个例子是针对并行流的(我的理解是,顺序流和并行流都是正

  • 返回的迭代器是否保证按该顺序提供值 、、? 我知道和保证集合的值顺序正确。此外,我并不是在问如何从迭代器生成流。

  • 2.8 操作顺序 当表达式中出现了多个运算符的时候,计算顺序取决于优先级规则。一个完整的优先级说明是十分复杂的,出于让您尽快入门的目的,先列出以下几点: 乘除法运算优先于加减法运算。因此2*3-1得到5,而不是4。2/3-1得到-1,而不是1(记住在整型除法中2/3结果是0)。 如果运算符有相同的优先级,它们会按照从左往右的顺序计算。因此表达式minute*100/60中,乘法运算最先进行,得到5

  • 正如你所看到的,我所做的是:从Kafka那里获取消息,将其转换成一个意在新目的地的对象,然后将其发送到目的地,然后确认偏移量以标记消息为已消费和已处理。以与从Kafka消费的消息相同的顺序确认偏移量是非常关键的,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用来确保这一点。 为了简单起见,我们假设方法是一个标识转换。 方法需要通过网络执行某些操作,例如调