代码实现
package com.hirain.higale.framework.learnProject;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.Stream;
public class Reactor3 {
/**
* 创建操作
*/
public static void createFlux() {
// 1.just():可以指定序列中包含的全部元素,创建出来的Flux序列会在发布这些元素之后自动结束
Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
// 这里若输入俩参数,则会报错,Mono是一个或者空序列
Mono<String> m1 = Mono.just("1");
// 2.fromIterable()从一个Iterable对象中创建一个Flux对象
Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));
// 3.fromStream()从一个Stream对象中创建一个Flux对象
ArrayList<Integer> numList = new ArrayList<>(){{add(1); add(2);}};
Flux<Integer> f3 = Flux.fromStream(numList.stream());
// 4.fromArray()从一个数组对象中创建一个Flux对象
Integer[] arr = new Integer[]{1, 2, 3};
Flux<Integer> f4 = Flux.fromArray(arr);
// 5.range(start,count) 表示从start开始,递增的生成count个数字,都是int类型的参数
Flux<Integer> f5 = Flux.range(2, 5);
// 6.创建一个不包含任何元素,只发布结束消息的序列。
// 并且这种方式不会进行后续传递,需要switchIfEmpty()方法来进行处理。
// 因为响应式编程中,流的处理是基于元素的,而empty()是没有元素的!
Flux<Object> empty = Flux.empty();
// 7.创建一个只包含错误消息的序列,里面的参数类型是Throwable
Flux<Object> error = Flux.error(new Exception("error!"));
// 8.创建一个不包含任何消息通知的序列,注意区别empty(),empty还是会发布结束消息的。
Flux<Object> never = Flux.never();
System.out.println("--------------f1---------------");
f1.subscribe(System.out::println);
System.out.println("--------------m1---------------");
m1.subscribe(System.out::println);
System.out.println("--------------f2---------------");
f2.subscribe(System.out::println);
System.out.println("--------------f3---------------");
f3.subscribe(System.out::println);
System.out.println("--------------f4---------------");
f4.subscribe(System.out::println);
System.out.println("--------------f5---------------");
f5.subscribe(System.out::println);
System.out.println("--------------empty---------------");
empty.subscribe(System.out::println);
System.out.println("--------------error---------------");
error.subscribe(System.out::println);
System.out.println("--------------never---------------");
never.subscribe(System.out::println);
}
/**
* 联合操作
*/
public static void mergeFlux() {
Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));
f1.mergeWith(f2).subscribe(f -> System.out.println("Here's some number: " + f));
}
public static void zipFlux() {
Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));
//zip操作将合并两个Flux流,并且生成一个Tuple2对象,Tuple2中包含两个流中同顺序的元素各一个。
Flux.zip(f1, f2)
.take(3).subscribe(f -> System.out.println("直接zip:"+f));
Flux.zip(f1, f2, (x, y) -> x + y)
.take(3).subscribe(f -> System.out.println("sum后zip:"+f));
}
public static void firstElement() {
Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));
Flux.first(f1, f2)
.take(3).subscribe(f -> System.out.println("先发布的元素:"+f));
}
/**
* 转换&过滤
*/
public static void skipElement() {
Flux.range(0, 10)
.skip(5)
.subscribe(f -> System.out.println("剩下的元素:"+f));
}
public static void takeElement() {
Flux.range(0, 10)
.take(3)
.subscribe(f -> System.out.println("取走的元素:"+f));
}
public static void filterElement() {
Flux.range(0, 10)
.filter(n -> n < 4)
.subscribe(f -> System.out.println("过滤后的元素:"+f));
}
public static void distinctElement() {
Flux.just(1, 2, 2, 3, 3, 4, 5, 5)
.distinct()
.subscribe(f -> System.out.println("去重后的元素:"+f));
}
public static void mapElement() {
Flux.just(1, 2, 2, 3, 3, 4, 5, 5)
.distinct()
.map(v-> String.valueOf(v+10))
.subscribe(f -> System.out.println("映射后的元素:"+f));
}
public static void flatMapElement() {
//map返回一个对象,flatmap返回一个stream
//一对多的处理,把每个字符串拆成一个个字符,输出,这点map就无法做到。
Flux.just("1", "2", "3", "4")
.flatMap(m -> Mono.just(m).map(c -> Integer.valueOf(c)+10))
.subscribe(f -> System.out.println("映射后的元素:"+f));
Stream.of("a1","a2","a3")
.flatMap(s -> Stream.of(s.split("")))
.forEach(f -> System.out.println("映射后的元素:"+f));
}
/**
* 缓冲操作
*/
public static void bufferElement() {
Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
// .flatMap(x ->Flux.fromIterable(x).map(y -> y.toUpperCase()))
.subscribe(f -> System.out.println("缓存后的元素:"+f));
}
/**
* collectList操作
*/
public static void getCollectList() {
Flux<Integer> flux1 = Flux.range(1, 6);
//collectList方法用于将含有多个元素的Flux转换为含有一个元素列表的Mono
flux1.collectList()
.subscribe(f -> System.out.println("组成list后的元素:"+f));
}
public static void getCollectMap(){
Flux<Integer> flux1 = Flux.range(1, 6);
flux1.collectMap(f -> "key" + f)
.subscribe(f -> System.out.println("组成map后的元素:"+f));
}
public static void logicalFlux(){
Flux.range(1, 6)
.any(f -> f < 0)
.subscribe(t -> System.out.println("any逻辑操作:"+t));
Flux.range(1, 6)
.all(f -> f > 0)
.subscribe(t -> System.out.println("all逻辑操作:"+t));
}
public static void main(String args[]){
// createFlux();
// mergeFlux();
// zipFlux();
// firstElement();
// skipElement();
// takeElement();
// filterElement();
// distinctElement();
// mapElement();
// flatMapElement();
// bufferElement();
// getCollectList();
// getCollectMap();
// logicalFlux();
}
}