一. Reactor API 简单使用(Flux和Mono)
1. 常用创建Flux及Mono的方式
1.1. 使用just从现有的已知内容和大小的数据创建Flux或Mono
//使用数组创建一个被观察者(生产者,Flux)
Flux.just(new String[]{"hello",",","nihao","!"})
//观察者监听被观察者(消费者)
.subscribe(System.out::print);
//使用可变参数创建Flux
Flux.just("你","好","啊","!")
.subscribe(System.out::print);
//使用just创建Mo
Mono.just("asd").subscribe(System.out::println);
1.2. 使用fromIterable从可迭代对象中创建Flux
//从可迭代的对象中创建Flux
Flux.fromIterable(Arrays.asList("你好",",","fromIter","!"))
.subscribe(System.out::print);
var list = new ArrayList(List.of("你","好"));
Flux flux = Flux.fromIterable(list);
list.add("啊");//在创建Flux后追加元素
flux.subscribe(System.out::print);//这里输出: 你好啊
1.3. 使用fromStream从集合流中创建Flux
//流也可以是Arrays.asList("a", "b").stream()等方式返回的流
Flux.fromStream(Stream.of("从","流","中创建","Flux!"))
.subscribe(System.out::println);
1.4. 使用range中创建一个范围内迭代的Flux
Flux.range(0,10).subscribe(System.out::print);
1.5. 使用interval创建间隔某一时间异步执行的Flux
Flux.interval(Duration.ofMillis(100))
//限制执行10次
.take(10)
.subscribe(System.out::print);
//避免主线程提前结束
Thread.sleep(1100);
1.6. 从Mono转化而来的Flux
Mono.just("asd").flux().subscribe(System.out::print);
1.7. 从多个Mono组合而来的Flux
Mono.just("Mono1").concatWith(Mono.just("---Mono2"))
.subscribe(System.out::println);
1.8. 使用generate动态创建Flux只能执行一次的Flux
// 同步动态创建,next 只能被调用一次
Flux.generate(sink -> {
sink.next("第一次");
//第二次会报错:
//java.lang.IllegalStateException: More than one call to onNext
//sink.next("第二次");
sink.complete();
}).subscribe(System.out::print);
1.9. 使用create动态创建Flux可以执行多次的Flux,及Mono
// 同步动态创建,next 能被调用多次
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next("现在的次数:" + i);
}
sink.complete();
}).subscribe(System.out::println);
// 同步动态创建Mono
Mono.create(sink->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sink.success("by create");
}).subscribe(System.out::println);
1.10. 使用fromCallable动态创建Mono
Mono.fromCallable(() -> {
Thread.sleep(1000);
return "asd";
}).subscribe(System.out::println);
2. 异常处理
//直接创建一个包含异常的Flux
Flux.error(new Exception());
//直接创建一个包含异常的Mono
Mono.error(new Exception());
Mono.just("mono")
//连接一个包含异常的Mono
.concatWith(Mono.error(new Exception("myExc")))
//异常监听
.doOnError(error -> System.out.println("错误: "+ error))
//在发生异常时将其入参传递给订阅者
.onErrorReturn("-excRet")
.subscribe(System.out::println);
/*最终输出:
mono
错误: java.lang.Exception: myExc
-excRet
*/
3. 常用方法
3.1. 使用concatWith合并及concatWithValues追加
//合并多个Mono为一个Flux
Mono.just("Mono1").concatWith(Mono.just("---Mono2"))
.subscribe(System.out::print);
//连接多个Flux
Flux.just("连接")
//连接两个Flux
.concatWith(Flux.just("两个"))
//将元素追加到Flux
.concatWithValues("或追加")
.subscribe(System.out::print);
3.2. 使用zipWith组合为元素
// 结合为元祖,两个取其端的那个,长的那个多余的被舍弃
Flux s1 = Flux.just("s1-0", "s1-1","s1-2");
Flux s2 = Flux.just("s2-0", "s2-1");
s1.zipWith(s2)
.subscribe(tuple -> System.out.println(tuple.getT1() + " -> " + tuple.getT2()));
3.3. 使用skip跳过元素
Flux.just(1,2,3,4,5)
//跳过前2两个
.skip(2)
//输出: 345
.subscribe(System.out::print);
3.4. 使用take截取元素
Flux just = Flux.just("截取", "前几个", "元素");
//截取前两个元素组成新的flux,不改变原flux
Flux take = just.take(2);
//输出: 截取前几个
take.subscribe(System.out::print);
System.out.println("\n=====");
//输出: 截取前几个元素
just.subscribe(System.out::print);
3.5. 使用filter过滤元素
Flux.just(1,2,3,4,5,6,7,8,9)
//过滤偶数
.filter(i->i%2==0)
//输出: 2468
.subscribe(System.out::print);
3.6. 使用distinct去重元素
//默认去重
Flux.just(1,1,2,2,3,3)
//去重
.distinct()
//输出: 123
.subscribe(System.out::print);
//将要去重的自定义的类
class MyClass{
public int key;
public String val;
MyClass(int k, String v){
key=k;val=v;
}
public String toString(){
return String.format("{%d, %s} ",key,val);
}
}
Flux.just(new MyClass(1,"asd"),new MyClass(1,"asdf"),new MyClass(2,"asd"))
//自定义对象的比较键(参与比较的字段)
.distinct(s->s.key)
//输出: {1, asd} {2, asd}
.subscribe(System.out::print);
3.7. 延迟执行(异步)
Flux.just("这是","延迟","执行")
//在一秒后输出: 这是延迟执行
.delayElements(Duration.ofSeconds(1)).subscribe(System.out::print);
Thread.sleep(1100);
3.8. 从Flux获取首个元素
Flux just = Flux.just("这是", "next", "执行");
//获取第一个元素为Mono,原Flux中的元素不变
Mono next = just.next();
//输出: 这是
next.subscribe(System.out::println);
System.out.println("=========");
//输出: 这是next执行
just.subscribe(System.out::print);
3.9. 从Flux阻塞式取一个元素
Flux flux = Flux.create(skin -> {
for(int i=0;i<2;++i){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
skin.next("这是第"+i+"个元素");
}
skin.complete();
});
//flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据
//阻塞式订阅,只要有一个元素进入Flux
String first = flux.blockFirst();
//输出: 这是第0个元素
System.out.println(first);
//还是输出: 这是第0个元素
System.out.println(flux.blockFirst());
//输出: 这是第1个元素
System.out.println(flux.blockLast());
//还是输出: 这是第1个元素
System.out.println(flux.blockLast());
3.10. Flux与Mono之间的相互转换
Flux flux = Flux.just("asd","asd");
//自定义收集器转换为Mono
Mono> collect = flux.collect(Collectors.toList());
//或使用默认收集器转换为Mono
Mono> collect2 = flux.collectList();
//将Mono转换为仅有一个元素的Flux
Flux> flux2 = collect.flux();
//将只有一个元素的Flux转换为Mono
Flux.just("1").single().subscribe(System.out::println);
3.11. 最终始终会执行的函数
Flux.just("asd", "qwe").concatWith(Flux.error(new Exception()))
//当流程完成后的第一件事,由于由异常这里不执行
.doOnComplete(()-> System.out.println("数据组装完成"))
.doFinally(t-> System.out.println("最后执行的:"+t))
.subscribe(System.out::println);
/*最终输出:
asd
qwe
最后执行的:onError
*/
4. 常用监听
每次创建监听都会返回一个新的Flux对象,监听也在新的Flux对象
4.1. 监听每次消费
Flux flux = Flux.just("asd", "qwe");
//每次消费前做什么,入参是将要消费的元素
flux = flux.doOnNext(s -> System.out.println("当前消费:" + s));
flux.subscribe(System.out::println);
/*输出为:
当前消费:asd
asd
当前消费:qwe
qwe
*/
4.2. 监听流程完成或错误
//监听正常流程完成
Flux.just("1","2").doOnComplete(()-> System.out.println("ok"))
.subscribe(System.out::println);
/*最终输出:
1
2
ok
*/
4.3. 监听消费者
//消费者参与前的最后一件事,入参为消费者对象
Flux.just("1","2").doOnSubscribe(System.out::println).subscribe(System.out::println);
/*最终输出:
reactor.core.publisher.FluxArray$ArraySubscription@66d18979
1
2
*/
5. 背压简单使用
使用 Subscription::request 主动控制订阅量
5.1 原始的 Subscriber::onNext
//生产者每10毫秒生产一个
Flux.interval(Duration.ofMillis(10))
//消费者每50毫秒消费一个
.subscribe(new Subscriber<>() {
Subscription subscription;
AtomicInteger count = new AtomicInteger(0);
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5);//首先请求5个
count.set(5);
}
@Override
public void onNext(Long val) {
System.out.print(" val:"+val);
try {
//消费者每100毫秒消费一个
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(count.decrementAndGet()<=0){
System.out.println(" 搞完,重新请求5个");
subscription.request(5);
count.set(5);
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(5000);
/*输出:
val:0 val:1 val:2 val:3 val:4 搞完,重新请求5个
val:5 val:6 val:7 val:8 val:9 搞完,重新请求5个
val:10 val:11 val:12 val:13 val:14 搞完,重新请求5个
val:15 val:16 val:17 val:18 val:19 搞完,重新请求5个
val:20 val:21 val:22 val:23 val:24 搞完,重新请求5个
......
*/
5.2. 使用 BaseSubscriber 简化操作
Flux.range(1,5).log().subscribe(new BaseSubscriber<>() {
private int count = 0;
private final int limit = 2;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(Integer value) {
if (++count == limit) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(count);
count = 0;
}
}
});
/*日志输出:
16:42:39.401 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:42:39.404 [main] INFO reactor.Flux.Range.1 - | request(2)
16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(1)
16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(2)
16:42:40.418 [main] INFO reactor.Flux.Range.1 - | request(2)
16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(3)
16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(4)
16:42:41.419 [main] INFO reactor.Flux.Range.1 - | request(2)
16:42:41.419 [main] INFO reactor.Flux.Range.1 - | onNext(5)
16:42:41.420 [main] INFO reactor.Flux.Range.1 - | onComplete()
*/
5.3. 通过 limitRate 进一步简化并链式操作
Flux.interval(Duration.ofMillis(100)).take(5)
.log()
//每次取2个
.limitRate(2)
.subscribe();
Thread.sleep(1000);
/*日志输出:
16:46:30.627 [main] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
16:46:30.630 [main] INFO reactor.Flux.Take.1 - request(2)
16:46:30.747 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0)
16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - onNext(1)
16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - request(2)
16:46:30.936 [parallel-1] INFO reactor.Flux.Take.1 - onNext(2)
16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - onNext(3)
16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - request(2)
16:46:31.143 [parallel-1] INFO reactor.Flux.Take.1 - onNext(4)
16:46:31.144 [parallel-1] INFO reactor.Flux.Take.1 - onComplete()
*/
二. Spring中webflux的应用
1. 传统Controller的方式应用
后端 JAVA 代码
package com.example.wefluxdemo.web;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.HashMap;
@RestController
@RequestMapping("/hello")
public class TestHelloController {
@GetMapping("/1")
public Mono hello1(){
return Mono.create(sink -> {
var map = new HashMap();
map.put("haha","hehe");
map.put("wuwu","yingying");
sink.success(map);
});
}
@GetMapping("/2")
public Mono hello2(String s){
return Mono.just(String.format("{\"s\":\"%s\"}",s));
}
@GetMapping("/3/{s}")
public Mono hello3(@PathVariable String s){
var map = new HashMap();
map.put("txt",s);
return Mono.just(map);
}
}
前端 js 调用服务代码
async function get(url){
const resp = await fetch(url)
if(resp.ok){
console.log(await resp.json())
}
}
//测试请求
get("/hello/1");//{"haha":"hehe","wuwu":"yingying"}
get("/hello/2?s=qwe");//{"s":"qwe"}
get("/hello/3/asd");//{"txt":"asd"}
2. Router和Handler的方式应用
后端 JAVA 代码
package com.example.wefluxdemo.web;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import java.util.HashMap;
@Configuration
public class TestRouterAndHandler {
//处理方法正常应该和路由方法分在不同的类中
public Mono test1(ServerRequest request){
var map = new HashMap();
map.put("返回值","Mono");
map.put("形参","约束必须为ServerRequest");
map.put("获取查询参数s",request.queryParam("s").get());
//输出: 当前的SessionId:4fa0eea8-3f44-4e85-b501-2c05970876c2
request.session().subscribe(s-> System.out.println("当前的SessionId:"+s.getId()));
return ServerResponse.ok().bodyValue(map);
}
@Bean
public RouterFunction test1Router(){
//路由/test/1的处理方法为test1
return RouterFunctions.route(RequestPredicates.GET("/test/1"),this::test1);
}
}
前端 js 调用服务代码
get("/test/1?s=zxc");
/*控制台输出:
{
"返回值": "Mono",
"获取查询参数s": "zxc",
"形参": "约束必须为ServerRequest"
}
*/