java.util.Stream
表示能应用在一组元素上一次执行的操作序列。Stream 操作分为中间操作或者最终操作两种,最终操作返回一特定类型的计算结果,而中间操作返回Stream本身,这样你就可以将多个操作依次串起来。Stream 的创建需要指定一个数据源,比如java.util.Collection
的子类,List 或者 Set, Map 不支持。Stream 的操作可以串行执行或者并行执行。
首先看看Stream是怎么用,首先创建实例代码的用到的数据stringList:
public class StreamTest {
private List<String> stringList = new ArrayList<>();
public List<String> list() {
stringList.add("ddd2");
stringList.add("aaa2");
stringList.add("bbb1");
stringList.add("aaa1");
stringList.add("bbb3");
stringList.add("ccc");
stringList.add("bbb2");
stringList.add("ddd1");
return stringList;
}
}
Java 8扩展了集合类,可以通过 Collection.stream() 或者 Collection.parallelStream() 来创建一个Stream。下面几节将详细解释常用的Stream操作:
过滤通过一个predicate接口来过滤并只保留符合条件的元素,该操作属于中间操作,所以我们可以在过滤后的结果来应用其他Stream操作(比如forEach)。forEach需要一个函数来对过滤后的元素依次执行。forEach是一个最终操作,所以我们不能在forEach之后来执行其他Stream操作。
//创建streamTest对象
StreamTest streamTest = new StreamTest();
//给list赋值
List<String> list = streamTest.list();
System.out.println(list.parallelStream());
//测试filter过滤和foreach
list
.stream()
.filter(s -> s.startsWith("a"))
.forEach(System.out::println);
aaa2
aaa1
forEach 是为 Lambda 而设计的,保持了最紧凑的风格。而且 Lambda 表达式本身是可以重用的,非常方便。
排序是一个 中间操作,返回的是排序好后的 Stream。如果你不指定一个自定义的 Comparator 则会使用默认排序。
//测试sort排序,sort函数不带参数默认排序也是从小到大
list.
stream().
sorted((s1, s2) -> s1.compareTo(s2)).
filter(s -> s.startsWith("a")).
forEach(System.out::println);
aaa1
aaa2
需要注意的是,排序只创建了一个排列好后的Stream,而不会影响原有的数据源,排序之后原数据stringCollection是不会被修改的:
System.out.println(list);// [ddd2, aaa2, bbb1, aaa1, bbb3, ccc, bbb2, ddd1]
中间操作 map 会将元素根据指定的 Function 接口来依次将元素转成另外的对象。
下面的示例展示了将字符串转换为大写字符串。你也可以通过map来将对象转换成其他类型,map返回的Stream类型是根据你map传递进去的函数的返回值决定的。
//测试map,将元素根据指定的Function接口来依次转换成另外的对象
list.
stream().
map(String::toUpperCase).
sorted((a, b) -> b.compareTo(a)).
forEach(System.out::println);
DDD2
DDD1
CCC
BBB3
BBB2
BBB1
AAA2
AAA1
Stream提供了多种匹配操作,允许检测指定的Predicate是否匹配整个Stream。所有的匹配操作都是 最终操作 ,并返回一个 boolean 类型的值。
boolean anyStartWithA = list.stream().anyMatch(s -> s.startsWith("a"));
System.out.println(anyStartWithA);
boolean allStartWithA = list.stream().allMatch(s -> s.startsWith("a"));
System.out.println(allStartWithA);
boolean noneStartWithZ = list.stream().noneMatch(s -> s.startsWith("z"));
System.out.println(noneStartWithZ);
true
false
true
计数是一个 最终操作,返回Stream中元素的个数,返回值类型是 long。
long startWithB = list.stream().filter(s -> s.startsWith("b")).count();
System.out.println(startWithB);
3
这是一个 最终操作 ,允许通过指定的函数来将stream中的多个元素规约为一个元素,规约后的结果是通过Optional 接口表示的:
Optional<String> reduce = list.stream().sorted().reduce((a, b) -> a + "#" + b);
System.out.println(reduce);
Optional[aaa1#aaa2#bbb1#bbb2#bbb3#ccc#ddd1#ddd2]
译者注: 这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相当于Integer sum = integers.reduce(0, (a, b) -> a+b);
也有没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional。
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
System.out.println(concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
System.out.println(minValue);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
System.out.println(sumValue);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
System.out.println(sumValue);
ABCD
-3.0
10
10
ace
上面代码例如第一个示例的 reduce(),第一个参数(空白字符)即为起始值,第二个参数(String::concat)为 BinaryOperator。这类有起始值的 reduce() 都返回具体的对象。而对于第四个示例没有起始值的 reduce(),由于可能没有足够的元素,返回的是 Optional,请留意这个区别。更多内容查看: IBM:Java 8 中的 Streams API 详解
前面提到过Stream有串行和并行两种,串行Stream上的操作是在一个线程中依次完成,而并行Stream则是在多个线程上同时执行。
下面的例子展示了是如何通过并行Stream来提升性能:
首先我们创建一个没有重复元素的大表:
private int max = 1000000;
private List<String> values = new ArrayList<>(max);
public List<String> setValues() {
for (int i = 0; i < max; i++) {
UUID uuid = UUID.randomUUID();
this.values.add(uuid.toString());
}
return this.values;
}
我们分别用串行和并行两种方式对其进行排序,最后看看所用时间的对比。
//Sequential Sort(串行排序)
long t0 = System.nanoTime();
long count = streamTest.setValues().stream().sorted().count();
System.out.println(count);
long t1 = System.nanoTime();
long millis = TimeUnit.NANOSECONDS.toMillis(t1-t0);
System.out.println(String.format("串行排序所用的时间--sequential sort took: %d ms",millis));
1000000
串行排序所用的时间--sequential sort took: 2778 ms
//Parallel Sort(并行排序)
streamTest.values.clear();
long t2 = System.nanoTime();
long count2 = streamTest.setValues().parallelStream().sorted().count();
System.out.println(count2);
long t3 = System.nanoTime();
long millis2 = TimeUnit.NANOSECONDS.toMillis(t3 - t2);
System.out.println(String.format("并行排序所用的时间--parallel sort took: %d ms", millis2));
1000000
并行排序所用的时间--parallel sort took: 1453 ms
上面两个代码几乎是一样的,但是并行版的快了 50% 左右,唯一需要做的改动就是将 stream()
改为parallelStream()
。
滥用Parallel Streams也可能导致一些问题,可以看一下这篇文章不要滥用parallel stream
完整的测试运行代码:
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* 测试stream
* @author XieRW
* @email 1429382875@qq.com
*/
public class StreamTest {
private List<String> stringList = new ArrayList<>();
private int max = 1000000;
private List<String> values = new ArrayList<>(max);
public List<String> setValues() {
for (int i = 0; i < max; i++) {
UUID uuid = UUID.randomUUID();
this.values.add(uuid.toString());
}
return this.values;
}
public List<String> list() {
stringList.add("ddd2");
stringList.add("aaa2");
stringList.add("bbb1");
stringList.add("aaa1");
stringList.add("bbb3");
stringList.add("ccc");
stringList.add("bbb2");
stringList.add("ddd1");
return stringList;
}
public static void main(String[] args) {
//创建streamTest对象
StreamTest streamTest = new StreamTest();
//给list赋值
List<String> list = streamTest.list();
System.out.println(list.parallelStream());
//测试filter过滤和foreach
list
.stream()
.filter(s -> s.startsWith("a"))
.forEach(System.out::println);
System.out.println("filter==========分界线==========sort");
//测试sort排序,sort函数不带参数默认排序也是从小到大
list.
stream().
sorted((s1, s2) -> s1.compareTo(s2)).
filter(s -> s.startsWith("a")).
forEach(System.out::println);
//sort不会改变list原本的顺序,filter也是
System.out.println(list);
System.out.println("sort==========分界线==========map");
//测试map,将元素根据指定的Function接口来依次转换成另外的对象
list.
stream().
map(String::toUpperCase).
sorted((a, b) -> b.compareTo(a)).
forEach(System.out::println);
System.out.println("map==========分界线==========Match");
//Match匹配
boolean anyStartWithA = list.stream().anyMatch(s -> s.startsWith("a"));
System.out.println(anyStartWithA);
boolean allStartWithA = list.stream().allMatch(s -> s.startsWith("a"));
System.out.println(allStartWithA);
boolean noneStartWithZ = list.stream().noneMatch(s -> s.startsWith("z"));
System.out.println(noneStartWithZ);
System.out.println("Match==========分界线==========count");
//count计数
long startWithB = list.stream().filter(s -> s.startsWith("b")).count();
System.out.println(startWithB);
//Reduce规约,允许通过指定的函数来将stream中的多个元素规约为一个元素,规约后的结果是通过Optional 接口表示的
Optional<String> reduce = list.stream().sorted().reduce((a, b) -> a + "#" + b);
System.out.println(reduce);
System.out.println("count==========分界线==========Stream");
//Stream接口
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
System.out.println(concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
System.out.println(minValue);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
System.out.println(sumValue);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
System.out.println(sumValue);
// 过滤,字符串连接,concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F").
filter(x -> x.compareTo("Z") > 0).
reduce("", String::concat);
System.out.println(concat);
System.out.println("Stream==========分界线==========Sequential Sort(串行排序)");
//Sequential Sort(串行排序)
long t0 = System.nanoTime();
long count = streamTest.setValues().stream().sorted().count();
System.out.println(count);
long t1 = System.nanoTime();
long millis = TimeUnit.NANOSECONDS.toMillis(t1-t0);
System.out.println(String.format("串行排序所用的时间--sequential sort took: %d ms",millis));
//Parallel Sort(并行排序)
streamTest.values.clear();
long t2 = System.nanoTime();
long count2 = streamTest.setValues().parallelStream().sorted().count();
System.out.println(count2);
long t3 = System.nanoTime();
long millis2 = TimeUnit.NANOSECONDS.toMillis(t3 - t2);
System.out.println(String.format("并行排序所用的时间--parallel sort took: %d ms", millis2));
}
}