parallel 和 parallelStream 都是用于增加并行计算的能力的方法,但二者略有不同。
parallel 是 Java8 中的一个方法,可用于对一个 Stream 进行并行计算,该方法使得 Stream 中的元素在多个线程中被处理。例如:
List<String> list = new ArrayList<>();
list.parallelStream().forEach(System.out::println);
parallelStream 是 Stream 接口的一个方法,它返回一个并行流,也可以用于对一个 Stream 进行并行计算。例如:
List<String> list = new ArrayList<>();
list.stream().parallel().forEach(System.out::println);
两者的区别在于,parallel 是在一个已经存在的串行 Stream 上调用的方法,而 parallelStream 是返回一个新的并行流。因此,parallelStream 在一些情况下可能更加方便,因为它可以避免在多个方法之间传递 Stream 对象。但是,在某些情况下,parallel 可能更加灵活,因为它可以在一个方法中轻松地从串行流转换为并行流。
parallel 和 parallelStream 都是 Java8 中的新特性,用于实现并行计算。
parallel 是用于普通集合类的方法,可将串行流转换为并行流,实现多线程计算。
parallelStream 是用于 Stream 类的方法,可以直接创建一个并行流,实现多线程计算。
区别:
使用方式不同,parallel 是对普通集合进行并行化处理,需要先创建串行流,再调用 parallel 方法;而 parallelStream 则是直接创建并行流。 2.适用范围不同,parallel适用于集合类的处理,而 parallelStream适用于 Stream 流的处理。
使用的线程池不同,parallel 使用的是 ForkJoinPool 中的公共线程池,而 parallelStream 可以指定线程池大小来实现更细粒度的控制。
需要注意的是,并行化处理也并不是万能的,虽然并行化可以提高处理效率,但是如果数据量过小或者计算开销较小,反而会增加并行化的开销,导致处理速度变慢。因此,在使用并行化处理时需要根据具体情况进行评估和调整。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class ParallelStreamDemo {
public static void main(String[] args) {
//size = 57
List<String> words = Arrays.asList("apple", "testest", "cherry", "date", "elderberry", "testest",
"cherry", "date", "elderberry", "testest", "cherry", "date", "elderberry",
"testest", "cherry", "date", "elderberry", "testest", "cherry",
"date", "elderberry", "testest", "cherry", "date", "elderberry",
"testest", "cherry", "date", "elderberry", "testest", "cherry", "date",
"elderberry", "testest", "cherry", "date", "elderberry", "testest",
"cherry", "date", "elderberry", "testest", "cherry", "date", "elderberry",
"testest", "cherry", "date", "elderberry", "testest", "cherry", "date", "elderberry",
"testest", "cherry", "date", "elderberry");
//串行 耗时:28715ms
// List<String> list = words(words);
//线程池 耗时:1056ms
List<String> list2 = wordsExecutors(words);
//StreamParallel 耗时:2563ms
// List<String> list3 = wordsStreamParallel(words);
//Parallel 耗时:2592ms
// List<String> list4 = wordsParallel(words);
}
/**
* 模拟复杂的业务逻辑耗时500ms
* @param item
* @return
*/
public static String toStr(String item){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
String str = item;
if(item.equals("testest")){
str = item.toUpperCase();
}
return str;
}
/**
* parallelStream 并行流
* @param list
* @return
*/
public static List<String> wordsParallel(List<String> list){
long startTime = System.currentTimeMillis();
// 并行操作
List<String> length5WordsParallel = list.parallelStream()
//过滤数据
.filter(Objects::nonNull)
//代码业务逻辑
.map(ParallelStreamDemo::toStr)
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
System.out.println("并行操作结果耗时:" + (endTime-startTime) + "内容:" + length5WordsParallel);
return length5WordsParallel;
}
/**
* 串行
* @param list
* @return
*/
public static List<String> words(List<String> list){
long startTime = System.currentTimeMillis();
//串行操作
List<String> length5Words = list.stream()
//过滤数据
.filter(Objects::nonNull)
//代码业务逻辑
.map(ParallelStreamDemo::toStr)
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
System.out.println("串行操作结果耗时:" + (endTime-startTime) + "内容:" + length5Words);
return length5Words;
}
/**
* parallel 并行流
* @param list
* @return
*/
public static List<String> wordsStreamParallel(List<String> list){
long startTime = System.currentTimeMillis();
//串行操作
List<String> length5Words = list.stream().parallel()
//过滤数据
.filter(Objects::nonNull)
//代码业务逻辑
.map(ParallelStreamDemo::toStr)
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
System.out.println("串行操作结果耗时:" + (endTime-startTime) + "内容:" + length5Words);
return length5Words;
}
//需要注意的是,并行操作并不一定比串行操作更快,这要取决于具体的情况和数据规模。在某些情况下,使用并行操作可能会增加额外的开销并降低性能。因此,在实际应用中,我们应该根据具体情况权衡使用串行操作还是并行操作。
public static List<String> wordsExecutors(List<String> list){
ExecutorService executor = Executors.newFixedThreadPool(30);
long startTime = System.currentTimeMillis();
//结果集
ArrayList<String> arrayList = new ArrayList<>();
// 并行操作
List<Future<String>> futures = new ArrayList<>();
for (String item : list) {
Future<String> future = executor.submit(() -> {
return toStr(item);
});
futures.add(future);
}
for (Future<String> future : futures) {
try {
arrayList.add(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
long endTime = System.currentTimeMillis();
System.out.println("线程池操作结果耗时:" + (endTime-startTime)+"内容:" + arrayList);
return arrayList;
}
}