当前位置: 首页 > 知识库问答 >
问题:

只使用一个线程的Java并行流?

谷梁弘深
2023-03-14

我正在使用最新的Java8 lambdas和并行流处理数据。我的代码如下:

ForkJoinPool forkJoinPool = new ForkJoinPool(10);
List<String> files = Arrays.asList(new String[]{"1.txt"}); 
List<String> result = forkJoinPool.submit(() ->
    files.stream().parallel()
        .flatMap(x -> stage1(x)) //at this stage we add more elements to the stream
        .map(x -> stage2(x))
        .map(x -> stage3(x))
        .collect(Collectors.toList())
).get();

流以一个元素开始,但在第二阶段会添加更多的元素。我的假设是这个流应该并行运行,但在这种情况下只使用一个工作线程。

如果我从2个元素开始(即我在初始列表中添加第二个元素),那么就会产生2个线程来处理流,依此类推...如果我没有显式地将流提交给ForkJoinPool,也会发生这种情况。

问题是:它是记录在案的行为还是可能在实现中改变?有没有什么方法可以控制这种行为,允许更多的线程,而不管初始列表是什么?

共有1个答案

从景曜
2023-03-14

您所观察的是特定于实现的行为,而不是指定的行为。

当前的JDK8实现查看最外层流的spliterator,并将其用作分割并行工作负载的基础。由于该示例在原始源流中只有单个元素,因此无法对其进行拆分,流以单线程方式运行。这对于flatmap返回零个、一个或几个元素的常见(但绝不是唯一)情况很好,但对于返回大量元素的情况,它们都是按顺序处理的。实际上,flatmap函数返回的流被强制进入顺序模式。请参阅referencePipeline.java的第270行。

要做的“显而易见”的事情是使这个流平行,或者至少不强迫它是顺序的。这可能会也可能不会改善事情。最有可能的是,它会改善一些事情,但会使其他事情变得更糟。这里当然需要一个更好的政策,但我不确定它会是什么样子。

还要注意,通过向并行流提交运行管道的任务,强制并行流在您选择的fork-join池中运行的技术也是特定于实现的行为。它在JDK8中是这样工作的,但将来可能会改变。

 类似资料:
  • #include <stdio.h> #include <pthread.h> int a = 0; int b = 0; void *thread1_func(void *p_arg) { while (1) { a++; sleep(1); } } void *thread2_fu

  • 以下是问题陈述: 编写一个java程序,使用线程计算前25个素数,并计算前50个斐波那契数。将计算斐波那契数的线程的优先级设置为8,将另一个设置为5。在计算了30个斐波那契数之后,让这个线程进入睡眠状态,开始计算素数。计算完25个素数后,继续斐波那契数计算。 我的代码: 我本以为当斐波那契线停止时,其余的素数会被打印出来,但那没有发生,这背后的原因可能是什么?

  • 我试着运行一个程序,使用线程显示带有数字的乘法、除法、加法和减法表。 但是我希望数字被乘以或相加等。由用户选择。 也就是说,程序应该在用户为每个操作选择一个数字后运行,然后显示结果。

  • 我用的是Netty camel-Netty:jar:2 . 10 . 0 . red hat-60024。下面是我对Netty监听器的配置 荨麻:tcp://10.1.33.204:9001?textline=true 在这里,我看到基于调试日志,Netty只创建一个工作线程,所以传入的消息被阻塞,直到现有的消息被处理。 喜欢: 2014-08-23 12:36:48,394|DEBUG|w I/

  • 在我的应用程序中,在程序的整个生命周期中,有n个操作必须依次发生。我决定为每个操作创建一个线程,让它们执行一次run方法,然后等待所有其他线程都执行相同的操作,等待轮到它,然后再次执行,依此类推,而不是创建实现这些操作的方法并在while(true)循环中按顺序调用它们。。。 为了实现这个机制,我创建了一个名为StatusHolder的类,它有一个名为threadTurn的字段(表示应该执行哪个线