我正在处理一批100个对象,将它们分成10个分区,每个分区都被发送到一个单独的线程进行并行处理。这是当前的代码:
1. var itemsToSave = new ConcurrentLinkedQueue<ItemToSave>();
2
3. Lists.partition(originalList, 10)
4. .parallelStream()
5. .forEach(partitionedList -> process(partitionedList, itemsToSave));
我的理解是
line1: creates a thread safe list to add the individual items once they are processed
line3: will return a number of lists, each of them with 10 entries from originalList
line4 will spawn a new thread for each of the lists created on line3
line5: for each list in its own thread, start the process that is supposed to be parallel.
如果我的理解有误,请纠正我,但这段代码仍按预期工作:并行化前75秒处理,并行化后20秒。看起来不错。
但过了一段时间,我注意到,根据Kibana的说法,这个过程实际上需要1ms或甚至零。这是因为新代码处理项目的速度非常快,以至于数据库没有时间处理100个项目,因此应该是100个的批次现在少于10个。
从这个意义上说,前面显示的代码的第3行将返回一个单个列表,毕竟从少于10个项目的列表中包含10个项目的分区将是一个单个列表。
然后,这个列表被发送到parallelStream()。这里我的问题是:parallelStream()
是否仍然生成一个新线程来处理一个列表?或者它只在输入多个列表时生成线程吗?因为(对我来说)打开一个新线程来处理一批项目没有多大意义……这可能会按顺序发生,并减少生成线程等的开销。
所以:当只给一个列表作为输入时,并行流()创建了多少线程?
很抱歉提了这么长的问题,但我觉得必须解释一下我的想法
我想你用的是Lists.partition
是番石榴的一部分
顺便说一下,parallelstream
使用了ForkJoinPool。commonPool()
以获取处理所需的线程。
下面的代码行只是告诉您有多少线程可供您处理。
System.out.println(Runtime.getRuntime().availableProcessors());
现在,让我们举一个与您的案例类似的例子:
List<List<String>> list =new ArrayList<List<String>>();
List<String> w1=new ArrayList<String>();
w1.add("A");w1.add("B");w1.add("C");w1.add("D");w1.add("E");w1.add("F");w1.add("G");w1.add("H");w1.add("I");w1.add("J"); list.add(w1);
list.parallelStream().
forEach(x-> System.out.println(Thread.currentThread().getName())); // this gives output as `main`, no other threads got used.
但是现在,让我们假设您为要处理的并行流提供了项数:
IntStream.range(0, 10).boxed().parallel().forEach(x-> System.out.println(Thread.currentThread().getName()));
上述输出类似于:
main
main
main
main
ForkJoinPool.commonPool-worker-2
main
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
因此,决定线程数实际上取决于
并行流
的输入
当只有一个列表作为输入时,创建了多少线程?
在并行流的框架下,使用了Fork-Join框架。它将任务(输入到流中的全部元素)分成更小的子任务(可以进一步拆分),然后按照创建顺序连接已执行任务的结果(这保证了在并行处理过程中,有序流中元素的相遇顺序将保持不变,您可以在减少、收集
等行为中观察到)。
在执行并行流时,来自所谓的调用公共池(延迟初始化,其最大大小取决于CPU内核的实际数量)的线程将被分配任务。
占用多少线程取决于输入的大小、拆分器的实现方式(即任务如何划分为子任务)、公共池中有多少线程(取决于实际机器)。同样重要的是,处理每个项目的速度有多快(即取决于您的逻辑),线程在完成前一个任务时会尽量通过窃取工作来做到最好。
Javadoc说“返回一个可能并行的流”。甚至不能保证会有多个线程占用您的任务(如果元素数量很小,那么一个线程可能会更快地处理它,而不需要分割和连接结果)。
您不应该关心线程的实际数量,因为您无法控制是否。唯一的问题是,您需要决定并行流是否可以并行执行流。
我正在处理一批100个对象,将它们分成10个分区,每个分区被发送到一个单独的线程进行并行处理。这是当前的代码: 我的理解是 如果我的理解有误,请纠正我,但无论如何,此代码仍按预期工作:并行化之前处理75秒,并行化后20秒。看起来不错。 但是过了一会儿,我注意到根据Kibana,这个过程现在实际上是或ven零。这是因为新代码处理项目的速度如此之快,以至于数据库没有时间达到100个要处理的项目,因此应
所以我不确定是0还是2。 如果有人知道请告诉我。
我编写了代码示例: 每100毫秒提交一个新任务(总任务量-20)。每个任务持续时间-0.5秒。因此,可以并行执行5个任务,最佳执行时间为:20*100 500=2.5秒,池应创建5个线程 但我的实验显示为9.6秒。我打开jsvisualvm查看池创建了多少线程,我看到只创建了一个线程: 请更正我的线程池配置不正确的地方。
我正在用RxJava实现一个事件总线(RxBus)。 RxBus。Java语言 从RecyclerView的viewHolder发布事件 从片段订阅事件 问题是:当我点击或时,只被调用一次(这是正确的),但是被多次调用(甚至不是常量时间)。请参阅下面的日志。 我怎样才能让它只打一次电话? 编辑:我发现如果这次被调用N次,下次当我单击该项目时将被调用N 1次。似乎可观察对象正在向订阅者发出历史记录中
在JDK8中,当我使用ParallelStream时会生成多少线程?例如,在代码中: 如果这个列表有100000个项目,那么会产生多少个线程? 另外,是每个线程都得到相同数量的项目来工作,还是随机分配?
问题内容: 我有一个简单的线程是这样的: 编辑:开始运行的附加代码 它是主要活动的内部类。但是, 此线程 不是在主 活动 上 运行,而是在 另一个 在 主 活动上 运行的线程 内部 运行 。 无论如何,此示例与此处 完全相同 ,但是由于某种原因,它给了我java.lang.RuntimeException:每个线程只能创建一个Looper。 我没有创建任何其他循环程序,至少在任何地方都没有。 问题