当前位置: 首页 > 工具软件 > Parallel > 使用案例 >

并行流parallel 和 parallelStream

王高超
2023-12-01

parallel 和 parallelStream 都是用于增加并行计算的能力的方法,但二者略有不同。

parallel 是 Java8 中的一个方法,可用于对一个 Stream 进行并行计算,该方法使得 Stream 中的元素在多个线程中被处理。例如:

List<String> list = new ArrayList<>();
list.parallelStream().forEach(System.out::println);

parallelStream 是 Stream 接口的一个方法,它返回一个并行流,也可以用于对一个 Stream 进行并行计算。例如:

List<String> list = new ArrayList<>();
list.stream().parallel().forEach(System.out::println);

两者的区别在于,parallel 是在一个已经存在的串行 Stream 上调用的方法,而 parallelStream 是返回一个新的并行流。因此,parallelStream 在一些情况下可能更加方便,因为它可以避免在多个方法之间传递 Stream 对象。但是,在某些情况下,parallel 可能更加灵活,因为它可以在一个方法中轻松地从串行流转换为并行流。

parallel 和 parallelStream 都是 Java8 中的新特性,用于实现并行计算。

parallel 是用于普通集合类的方法,可将串行流转换为并行流,实现多线程计算。

parallelStream 是用于 Stream 类的方法,可以直接创建一个并行流,实现多线程计算。

区别:

使用方式不同,parallel 是对普通集合进行并行化处理,需要先创建串行流,再调用 parallel 方法;而 parallelStream 则是直接创建并行流。 2.适用范围不同,parallel适用于集合类的处理,而 parallelStream适用于 Stream 流的处理。
使用的线程池不同,parallel 使用的是 ForkJoinPool 中的公共线程池,而 parallelStream 可以指定线程池大小来实现更细粒度的控制。
需要注意的是,并行化处理也并不是万能的,虽然并行化可以提高处理效率,但是如果数据量过小或者计算开销较小,反而会增加并行化的开销,导致处理速度变慢。因此,在使用并行化处理时需要根据具体情况进行评估和调整。

并发的三种方式对比

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        //size = 57
        List<String> words = Arrays.asList("apple", "testest", "cherry", "date", "elderberry", "testest",
                "cherry", "date", "elderberry", "testest", "cherry", "date", "elderberry",
                "testest", "cherry", "date", "elderberry", "testest", "cherry",
                "date", "elderberry", "testest", "cherry", "date", "elderberry",
                "testest", "cherry", "date", "elderberry", "testest", "cherry", "date",
                "elderberry", "testest", "cherry", "date", "elderberry", "testest",
                "cherry", "date", "elderberry", "testest", "cherry", "date", "elderberry",
                "testest", "cherry", "date", "elderberry", "testest", "cherry", "date", "elderberry",
                "testest", "cherry", "date", "elderberry");

        //串行 耗时:28715ms
//        List<String> list = words(words);
        //线程池 耗时:1056ms
        List<String> list2 = wordsExecutors(words);
        //StreamParallel 耗时:2563ms
//        List<String> list3 = wordsStreamParallel(words);
        //Parallel 耗时:2592ms
//        List<String> list4 = wordsParallel(words);

    }

    /**
     * 模拟复杂的业务逻辑耗时500ms
     * @param item
     * @return
     */
    public static String toStr(String item){
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String str = item;
        if(item.equals("testest")){
            str = item.toUpperCase();
        }
        return str;
    }

    /**
     * parallelStream 并行流
     * @param list
     * @return
     */
    public static List<String> wordsParallel(List<String> list){
        long startTime = System.currentTimeMillis();
        // 并行操作
        List<String> length5WordsParallel = list.parallelStream()
                //过滤数据
                .filter(Objects::nonNull)
                //代码业务逻辑
                .map(ParallelStreamDemo::toStr)
                .collect(Collectors.toList());
        long endTime = System.currentTimeMillis();
        System.out.println("并行操作结果耗时:" + (endTime-startTime) + "内容:" + length5WordsParallel);
        return length5WordsParallel;
    }

    /**
     * 串行
     * @param list
     * @return
     */
    public static List<String> words(List<String> list){
        long startTime = System.currentTimeMillis();
        //串行操作
        List<String> length5Words = list.stream()
                //过滤数据
                .filter(Objects::nonNull)
                //代码业务逻辑
                .map(ParallelStreamDemo::toStr)
                .collect(Collectors.toList());
        long endTime = System.currentTimeMillis();
        System.out.println("串行操作结果耗时:" + (endTime-startTime) + "内容:" + length5Words);
        return length5Words;
    }

    /**
     * parallel 并行流
     * @param list
     * @return
     */
    public static List<String> wordsStreamParallel(List<String> list){
        long startTime = System.currentTimeMillis();
        //串行操作
        List<String> length5Words = list.stream().parallel()
                //过滤数据
                .filter(Objects::nonNull)
                //代码业务逻辑
                .map(ParallelStreamDemo::toStr)
                .collect(Collectors.toList());
        long endTime = System.currentTimeMillis();
        System.out.println("串行操作结果耗时:" + (endTime-startTime) + "内容:" + length5Words);
        return length5Words;
    }

    //需要注意的是,并行操作并不一定比串行操作更快,这要取决于具体的情况和数据规模。在某些情况下,使用并行操作可能会增加额外的开销并降低性能。因此,在实际应用中,我们应该根据具体情况权衡使用串行操作还是并行操作。
    public static List<String> wordsExecutors(List<String> list){
        ExecutorService executor = Executors.newFixedThreadPool(30);
        long startTime = System.currentTimeMillis();
        //结果集
        ArrayList<String> arrayList = new ArrayList<>();
        // 并行操作
        List<Future<String>> futures = new ArrayList<>();
        for (String item : list) {
            Future<String> future = executor.submit(() -> {
                return toStr(item);
            });
            futures.add(future);
        }

        for (Future<String> future : futures) {
            try {
                arrayList.add(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        long endTime = System.currentTimeMillis();

        System.out.println("线程池操作结果耗时:" + (endTime-startTime)+"内容:" + arrayList);
        return arrayList;
    }

}

 类似资料: