当前位置: 首页 > 知识库问答 >
问题:

当只给出一个列表作为输入时,parallelStream() 创建了多少个线程?[重复]

轩辕佑运
2023-03-14

我正在处理一批100个对象,将它们分成10个分区,每个分区被发送到一个单独的线程进行并行处理。这是当前的代码:

1. var itemsToSave = new ConcurrentLinkedQueue<ItemToSave>();
2
3. Lists.partition(originalList, 10)
4.  .parallelStream()
5.  .forEach(partitionedList -> process(partitionedList, itemsToSave));

我的理解是

line1: creates a thread safe list to add the individual items once they are processed

line3: will return a number of lists, each of them with 10 entries from originalList

line4 will spawn a new thread for each of the lists created on line3

line5: for each list in its own thread, start the process that is supposed to be parallel.

如果我的理解有误,请纠正我,但无论如何,此代码仍按预期工作:并行化之前处理75秒,并行化后20秒。看起来不错。

但是过了一会儿,我注意到根据Kibana,这个过程现在实际上是~1ms或ven零。这是因为新代码处理项目的速度如此之快,以至于数据库没有时间达到100个要处理的项目,因此应该是100个的批次现在不到10个。

从这个意义上说,前面显示的代码的第3行将返回一个单一的列表,毕竟来自少于10个项目的列表中的10个项目的分区将是一个单一的列表。

然后,这个列表被发送到< code>parallelStream()。我的问题是:< code>parallelStream()是否还会产生一个新的线程来处理一个列表?还是只有在输入多个列表时才会产生线程?因为(对我来说)打开一个新线程来处理一批项目没有多大意义...这可以按顺序发生,并减少生成线程等的开销。

那么:当只给定一个列表作为输入时,parallelStream()会创建多少个线程呢?

抱歉问了这么长的问题,但我觉得必须解释一下我的想法

共有2个答案

史旺
2023-03-14

我想你正在使用列表.分区,这是番石榴的一部分

顺便说一下,并行流使用ForkJoinPool.commonPool()来获取处理所需的线程。

下面的代码行只是为您提供了多少线程可供您处理。

System.out.println(Runtime.getRuntime().availableProcessors()); 

现在,让我们举一个类似于您的案例的例子:

    List<List<String>> list =new ArrayList<List<String>>();
    List<String> w1=new ArrayList<String>();

    w1.add("A");w1.add("B");w1.add("C");w1.add("D");w1.add("E");w1.add("F");w1.add("G");w1.add("H");w1.add("I");w1.add("J"); list.add(w1);

    list.parallelStream().
    forEach(x-> System.out.println(Thread.currentThread().getName())); // this gives output as `main`, no other threads got used.

但是现在,假设您向并行流提供了要处理的项目数量:

IntStream.range(0, 10).boxed().parallel().forEach(x-> System.out.println(Thread.currentThread().getName()));

上面的输出类似于:

main
main
main
main
ForkJoinPool.commonPool-worker-2
main
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2

因此,决定线程数实际上取决于并行流的输入。

尹俊雅
2023-03-14

当只给一个列表作为输入时,会创建多少个线程⃣elStream()

Fork-Join框架是在并行流的外壳下使用的。它将任务(您输入到流中的所有元素)分成更小的子任务(可以进一步拆分),然后按照创建的顺序连接执行任务的结果(保证在并行处理期间会保留有序流中元素的遇到顺序,您可以在duce收集等的行为中观察到这一点)。

在执行并行流时,将分配来自所谓的 Common 池的线程(该池被延迟初始化,其最大大小取决于 CPU 内核的实际数量)。

将占用多少线程取决于输入的大小,取决于拆分器的实现方式(即如何将任务划分为子任务),公共池中的线程数(取决于实际的机器)。同样重要的是,每个项目的处理速度(即取决于你的逻辑),线程会在完成上一个任务时通过窃取工作来尽力而为。

并行流()的 Javadoc 表示“返回一个可能的并行流”。甚至不能保证您的任务会占用多个线程(如果元素数量很小 - 一个线程可能会更快地处理它而不会拆分和连接结果)。

您不应该关心线程的实际数量,因为您无法html" target="_blank">控制它。唯一的问题是,您需要决定并行流是否可以并行执行流。

 类似资料:
  • 我正在处理一批100个对象,将它们分成10个分区,每个分区都被发送到一个单独的线程进行并行处理。这是当前的代码: 我的理解是 如果我的理解有误,请纠正我,但这段代码仍按预期工作:并行化前75秒处理,并行化后20秒。看起来不错。 但过了一段时间,我注意到,根据Kibana的说法,这个过程实际上需要1ms或甚至零。这是因为新代码处理项目的速度非常快,以至于数据库没有时间处理100个项目,因此应该是10

  • 我编写了代码示例: 每100毫秒提交一个新任务(总任务量-20)。每个任务持续时间-0.5秒。因此,可以并行执行5个任务,最佳执行时间为:20*100 500=2.5秒,池应创建5个线程 但我的实验显示为9.6秒。我打开jsvisualvm查看池创建了多少线程,我看到只创建了一个线程: 请更正我的线程池配置不正确的地方。

  • 所以我不确定是0还是2。 如果有人知道请告诉我。

  • 在JDK8中,当我使用ParallelStream时会生成多少线程?例如,在代码中: 如果这个列表有100000个项目,那么会产生多少个线程? 另外,是每个线程都得到相同数量的项目来工作,还是随机分配?

  • 问题内容: 我有一个简单的线程是这样的: 编辑:开始运行的附加代码 它是主要活动的内部类。但是, 此线程 不是在主 活动 上 运行,而是在 另一个 在 主 活动上 运行的线程 内部 运行 。 无论如何,此示例与此处 完全相同 ,但是由于某种原因,它给了我java.lang.RuntimeException:每个线程只能创建一个Looper。 我没有创建任何其他循环程序,至少在任何地方都没有。 问题