当前位置: 首页 > 工具软件 > Streams > 使用案例 >

Java8新特性之Streams和Parallel Streams

令狐良骏
2023-12-01

一、Streams(流)

java.util.Stream 表示能应用在一组元素上一次执行的操作序列。Stream 操作分为中间操作或者最终操作两种,最终操作返回一特定类型的计算结果,而中间操作返回Stream本身,这样你就可以将多个操作依次串起来。Stream 的创建需要指定一个数据源,比如java.util.Collection 的子类,List 或者 Set, Map 不支持。Stream 的操作可以串行执行或者并行执行。

首先看看Stream是怎么用,首先创建实例代码的用到的数据stringList:

public class StreamTest {
	private List<String> stringList = new ArrayList<>();
	public List<String> list() {
    	stringList.add("ddd2");
    	stringList.add("aaa2");
      	stringList.add("bbb1");
     	stringList.add("aaa1");
     	stringList.add("bbb3");
        stringList.add("ccc");
       	stringList.add("bbb2");
        stringList.add("ddd1");
        return stringList;
    }
}

Java 8扩展了集合类,可以通过 Collection.stream() 或者 Collection.parallelStream() 来创建一个Stream。下面几节将详细解释常用的Stream操作:

1、Filter(过滤)

过滤通过一个predicate接口来过滤并只保留符合条件的元素,该操作属于中间操作,所以我们可以在过滤后的结果来应用其他Stream操作(比如forEach)。forEach需要一个函数来对过滤后的元素依次执行。forEach是一个最终操作,所以我们不能在forEach之后来执行其他Stream操作。

        //创建streamTest对象
        StreamTest streamTest = new StreamTest();
        //给list赋值
        List<String> list = streamTest.list();
        System.out.println(list.parallelStream());
        //测试filter过滤和foreach
        list
                .stream()
                .filter(s -> s.startsWith("a"))
                .forEach(System.out::println);
aaa2
aaa1

forEach 是为 Lambda 而设计的,保持了最紧凑的风格。而且 Lambda 表达式本身是可以重用的,非常方便。

2、Sorted(排序)

排序是一个 中间操作,返回的是排序好后的 Stream。如果你不指定一个自定义的 Comparator 则会使用默认排序。

        //测试sort排序,sort函数不带参数默认排序也是从小到大
        list.
                stream().
                sorted((s1, s2) -> s1.compareTo(s2)).
                filter(s -> s.startsWith("a")).
                forEach(System.out::println);
aaa1
aaa2

需要注意的是,排序只创建了一个排列好后的Stream,而不会影响原有的数据源,排序之后原数据stringCollection是不会被修改的:

    System.out.println(list);// [ddd2, aaa2, bbb1, aaa1, bbb3, ccc, bbb2, ddd1]

3、Map(映射)

中间操作 map 会将元素根据指定的 Function 接口来依次将元素转成另外的对象。

下面的示例展示了将字符串转换为大写字符串。你也可以通过map来将对象转换成其他类型,map返回的Stream类型是根据你map传递进去的函数的返回值决定的。

        //测试map,将元素根据指定的Function接口来依次转换成另外的对象
        list.
                stream().
                map(String::toUpperCase).
                sorted((a, b) -> b.compareTo(a)).
                forEach(System.out::println);
DDD2
DDD1
CCC
BBB3
BBB2
BBB1
AAA2
AAA1

4、Match(匹配)

Stream提供了多种匹配操作,允许检测指定的Predicate是否匹配整个Stream。所有的匹配操作都是 最终操作 ,并返回一个 boolean 类型的值。

        boolean anyStartWithA = list.stream().anyMatch(s -> s.startsWith("a"));
        System.out.println(anyStartWithA);

        boolean allStartWithA = list.stream().allMatch(s -> s.startsWith("a"));
        System.out.println(allStartWithA);

        boolean noneStartWithZ = list.stream().noneMatch(s -> s.startsWith("z"));
        System.out.println(noneStartWithZ);
true
false
true

5、Count(计数)

计数是一个 最终操作,返回Stream中元素的个数,返回值类型是 long

		long startWithB = list.stream().filter(s -> s.startsWith("b")).count();
    	System.out.println(startWithB);
3

6、Reduce(规约)

这是一个 最终操作 ,允许通过指定的函数来将stream中的多个元素规约为一个元素,规约后的结果是通过Optional 接口表示的:

	Optional<String> reduce = list.stream().sorted().reduce((a, b) -> a + "#" + b);
	System.out.println(reduce);
Optional[aaa1#aaa2#bbb1#bbb2#bbb3#ccc#ddd1#ddd2]

译者注: 这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相当于Integer sum = integers.reduce(0, (a, b) -> a+b);也有没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional。

// 字符串连接,concat = "ABCD"
        String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
        System.out.println(concat);
        // 求最小值,minValue = -3.0
        double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
        System.out.println(minValue);
        // 求和,sumValue = 10, 有起始值
        int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
        System.out.println(sumValue);
        // 求和,sumValue = 10, 无起始值
        sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
        System.out.println(sumValue);
ABCD
-3.0
10
10
ace

上面代码例如第一个示例的 reduce(),第一个参数(空白字符)即为起始值,第二个参数(String::concat)为 BinaryOperator。这类有起始值的 reduce() 都返回具体的对象。而对于第四个示例没有起始值的 reduce(),由于可能没有足够的元素,返回的是 Optional,请留意这个区别。更多内容查看: IBM:Java 8 中的 Streams API 详解

二、Parallel Streams(并行流)

前面提到过Stream有串行和并行两种,串行Stream上的操作是在一个线程中依次完成,而并行Stream则是在多个线程上同时执行。

下面的例子展示了是如何通过并行Stream来提升性能:

首先我们创建一个没有重复元素的大表:

    private int max = 1000000;
    private List<String> values = new ArrayList<>(max);

    public List<String> setValues() {
        for (int i = 0; i < max; i++) {
            UUID uuid = UUID.randomUUID();
            this.values.add(uuid.toString());
        }
        return this.values;
    }

我们分别用串行和并行两种方式对其进行排序,最后看看所用时间的对比。

1、Sequential Sort(串行排序)

//Sequential Sort(串行排序)
long t0 = System.nanoTime();
long count = streamTest.setValues().stream().sorted().count();
System.out.println(count);
long t1 = System.nanoTime();
long millis = TimeUnit.NANOSECONDS.toMillis(t1-t0);
System.out.println(String.format("串行排序所用的时间--sequential sort took: %d ms",millis));
1000000
串行排序所用的时间--sequential sort took: 2778 ms

2、Parallel Sort(并行排序)

//Parallel Sort(并行排序)
streamTest.values.clear();
long t2 = System.nanoTime();
long count2 = streamTest.setValues().parallelStream().sorted().count();
System.out.println(count2);
long t3 = System.nanoTime();
long millis2 = TimeUnit.NANOSECONDS.toMillis(t3 - t2);
System.out.println(String.format("并行排序所用的时间--parallel sort took: %d ms", millis2));

1000000
并行排序所用的时间--parallel sort took: 1453 ms

上面两个代码几乎是一样的,但是并行版的快了 50% 左右,唯一需要做的改动就是将 stream() 改为parallelStream()

应该在什么时候使用Parallel Streams

  1. 确保要执行的任务对线程环境没有依赖
  2. 任务消耗时间长/数据量大到不用思考是否要用parallel
  3. 结果没有顺序要求
  4. 只是对数据进行操作不需要考虑死锁、变量共享等多线程问题

滥用Parallel Streams也可能导致一些问题,可以看一下这篇文章不要滥用parallel stream


完整的测试运行代码:

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 测试stream
 * @author XieRW
 * @email 1429382875@qq.com
 */
public class StreamTest {

    private List<String> stringList = new ArrayList<>();

    private int max = 1000000;
    private List<String> values = new ArrayList<>(max);

    public List<String> setValues() {
        for (int i = 0; i < max; i++) {
            UUID uuid = UUID.randomUUID();
            this.values.add(uuid.toString());
        }
        return this.values;
    }
    public List<String> list() {
        stringList.add("ddd2");
        stringList.add("aaa2");
        stringList.add("bbb1");
        stringList.add("aaa1");
        stringList.add("bbb3");
        stringList.add("ccc");
        stringList.add("bbb2");
        stringList.add("ddd1");
        return stringList;
    }

    public static void main(String[] args) {
        //创建streamTest对象
        StreamTest streamTest = new StreamTest();
        //给list赋值
        List<String> list = streamTest.list();
        System.out.println(list.parallelStream());
        //测试filter过滤和foreach
        list
                .stream()
                .filter(s -> s.startsWith("a"))
                .forEach(System.out::println);

        System.out.println("filter==========分界线==========sort");

        //测试sort排序,sort函数不带参数默认排序也是从小到大
        list.
                stream().
                sorted((s1, s2) -> s1.compareTo(s2)).
                filter(s -> s.startsWith("a")).
                forEach(System.out::println);
        //sort不会改变list原本的顺序,filter也是
        System.out.println(list);

        System.out.println("sort==========分界线==========map");

        //测试map,将元素根据指定的Function接口来依次转换成另外的对象
        list.
                stream().
                map(String::toUpperCase).
                sorted((a, b) -> b.compareTo(a)).
                forEach(System.out::println);

        System.out.println("map==========分界线==========Match");

        //Match匹配
        boolean anyStartWithA = list.stream().anyMatch(s -> s.startsWith("a"));
        System.out.println(anyStartWithA);

        boolean allStartWithA = list.stream().allMatch(s -> s.startsWith("a"));
        System.out.println(allStartWithA);

        boolean noneStartWithZ = list.stream().noneMatch(s -> s.startsWith("z"));
        System.out.println(noneStartWithZ);

        System.out.println("Match==========分界线==========count");

        //count计数
        long startWithB = list.stream().filter(s -> s.startsWith("b")).count();
        System.out.println(startWithB);

        //Reduce规约,允许通过指定的函数来将stream中的多个元素规约为一个元素,规约后的结果是通过Optional 接口表示的
        Optional<String> reduce = list.stream().sorted().reduce((a, b) -> a + "#" + b);
        System.out.println(reduce);

        System.out.println("count==========分界线==========Stream");

        //Stream接口
        // 字符串连接,concat = "ABCD"
        String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
        System.out.println(concat);
        // 求最小值,minValue = -3.0
        double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
        System.out.println(minValue);
        // 求和,sumValue = 10, 有起始值
        int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
        System.out.println(sumValue);
        // 求和,sumValue = 10, 无起始值
        sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
        System.out.println(sumValue);
        // 过滤,字符串连接,concat = "ace"
        concat = Stream.of("a", "B", "c", "D", "e", "F").
                filter(x -> x.compareTo("Z") > 0).
                reduce("", String::concat);
        System.out.println(concat);

        System.out.println("Stream==========分界线==========Sequential Sort(串行排序)");

        //Sequential Sort(串行排序)
        long t0 = System.nanoTime();
        long count = streamTest.setValues().stream().sorted().count();
        System.out.println(count);
        long t1 = System.nanoTime();
        long millis = TimeUnit.NANOSECONDS.toMillis(t1-t0);
        System.out.println(String.format("串行排序所用的时间--sequential sort took: %d ms",millis));

        //Parallel Sort(并行排序)
        streamTest.values.clear();
        long t2 = System.nanoTime();
        long count2 = streamTest.setValues().parallelStream().sorted().count();
        System.out.println(count2);
        long t3 = System.nanoTime();
        long millis2 = TimeUnit.NANOSECONDS.toMillis(t3 - t2);
        System.out.println(String.format("并行排序所用的时间--parallel sort took: %d ms", millis2));
    }
}

 类似资料: