当前位置: 首页 > 工具软件 > Reactor.js > 使用案例 >

java flux api,JAVA Reactor API 简单使用(Flux和Mono)及WebFlux的应用

白智
2023-12-01

一. Reactor API 简单使用(Flux和Mono)

1. 常用创建Flux及Mono的方式

1.1. 使用just从现有的已知内容和大小的数据创建Flux或Mono

//使用数组创建一个被观察者(生产者,Flux)

Flux.just(new String[]{"hello",",","nihao","!"})

//观察者监听被观察者(消费者)

.subscribe(System.out::print);

//使用可变参数创建Flux

Flux.just("你","好","啊","!")

.subscribe(System.out::print);

//使用just创建Mo

Mono.just("asd").subscribe(System.out::println);

1.2. 使用fromIterable从可迭代对象中创建Flux

//从可迭代的对象中创建Flux

Flux.fromIterable(Arrays.asList("你好",",","fromIter","!"))

.subscribe(System.out::print);

var list = new ArrayList(List.of("你","好"));

Flux flux = Flux.fromIterable(list);

list.add("啊");//在创建Flux后追加元素

flux.subscribe(System.out::print);//这里输出: 你好啊

1.3. 使用fromStream从集合流中创建Flux

//流也可以是Arrays.asList("a", "b").stream()等方式返回的流

Flux.fromStream(Stream.of("从","流","中创建","Flux!"))

.subscribe(System.out::println);

1.4. 使用range中创建一个范围内迭代的Flux

Flux.range(0,10).subscribe(System.out::print);

1.5. 使用interval创建间隔某一时间异步执行的Flux

Flux.interval(Duration.ofMillis(100))

//限制执行10次

.take(10)

.subscribe(System.out::print);

//避免主线程提前结束

Thread.sleep(1100);

1.6. 从Mono转化而来的Flux

Mono.just("asd").flux().subscribe(System.out::print);

1.7. 从多个Mono组合而来的Flux

Mono.just("Mono1").concatWith(Mono.just("---Mono2"))

.subscribe(System.out::println);

1.8. 使用generate动态创建Flux只能执行一次的Flux

// 同步动态创建,next 只能被调用一次

Flux.generate(sink -> {

sink.next("第一次");

//第二次会报错:

//java.lang.IllegalStateException: More than one call to onNext

//sink.next("第二次");

sink.complete();

}).subscribe(System.out::print);

1.9. 使用create动态创建Flux可以执行多次的Flux,及Mono

// 同步动态创建,next 能被调用多次

Flux.create(sink -> {

for (int i = 0; i < 10; i++) {

sink.next("现在的次数:" + i);

}

sink.complete();

}).subscribe(System.out::println);

// 同步动态创建Mono

Mono.create(sink->{

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

sink.success("by create");

}).subscribe(System.out::println);

1.10. 使用fromCallable动态创建Mono

Mono.fromCallable(() -> {

Thread.sleep(1000);

return "asd";

}).subscribe(System.out::println);

2. 异常处理

//直接创建一个包含异常的Flux

Flux.error(new Exception());

//直接创建一个包含异常的Mono

Mono.error(new Exception());

Mono.just("mono")

//连接一个包含异常的Mono

.concatWith(Mono.error(new Exception("myExc")))

//异常监听

.doOnError(error -> System.out.println("错误: "+ error))

//在发生异常时将其入参传递给订阅者

.onErrorReturn("-excRet")

.subscribe(System.out::println);

/*最终输出:

mono

错误: java.lang.Exception: myExc

-excRet

*/

3. 常用方法

3.1. 使用concatWith合并及concatWithValues追加

//合并多个Mono为一个Flux

Mono.just("Mono1").concatWith(Mono.just("---Mono2"))

.subscribe(System.out::print);

//连接多个Flux

Flux.just("连接")

//连接两个Flux

.concatWith(Flux.just("两个"))

//将元素追加到Flux

.concatWithValues("或追加")

.subscribe(System.out::print);

3.2. 使用zipWith组合为元素

// 结合为元祖,两个取其端的那个,长的那个多余的被舍弃

Flux s1 = Flux.just("s1-0", "s1-1","s1-2");

Flux s2 = Flux.just("s2-0", "s2-1");

s1.zipWith(s2)

.subscribe(tuple -> System.out.println(tuple.getT1() + " -> " + tuple.getT2()));

3.3. 使用skip跳过元素

Flux.just(1,2,3,4,5)

//跳过前2两个

.skip(2)

//输出: 345

.subscribe(System.out::print);

3.4. 使用take截取元素

Flux just = Flux.just("截取", "前几个", "元素");

//截取前两个元素组成新的flux,不改变原flux

Flux take = just.take(2);

//输出: 截取前几个

take.subscribe(System.out::print);

System.out.println("\n=====");

//输出: 截取前几个元素

just.subscribe(System.out::print);

3.5. 使用filter过滤元素

Flux.just(1,2,3,4,5,6,7,8,9)

//过滤偶数

.filter(i->i%2==0)

//输出: 2468

.subscribe(System.out::print);

3.6. 使用distinct去重元素

//默认去重

Flux.just(1,1,2,2,3,3)

//去重

.distinct()

//输出: 123

.subscribe(System.out::print);

//将要去重的自定义的类

class MyClass{

public int key;

public String val;

MyClass(int k, String v){

key=k;val=v;

}

public String toString(){

return String.format("{%d, %s} ",key,val);

}

}

Flux.just(new MyClass(1,"asd"),new MyClass(1,"asdf"),new MyClass(2,"asd"))

//自定义对象的比较键(参与比较的字段)

.distinct(s->s.key)

//输出: {1, asd} {2, asd}

.subscribe(System.out::print);

3.7. 延迟执行(异步)

Flux.just("这是","延迟","执行")

//在一秒后输出: 这是延迟执行

.delayElements(Duration.ofSeconds(1)).subscribe(System.out::print);

Thread.sleep(1100);

3.8. 从Flux获取首个元素

Flux just = Flux.just("这是", "next", "执行");

//获取第一个元素为Mono,原Flux中的元素不变

Mono next = just.next();

//输出: 这是

next.subscribe(System.out::println);

System.out.println("=========");

//输出: 这是next执行

just.subscribe(System.out::print);

3.9. 从Flux阻塞式取一个元素

Flux flux = Flux.create(skin -> {

for(int i=0;i<2;++i){

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

skin.next("这是第"+i+"个元素");

}

skin.complete();

});

//flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据

//阻塞式订阅,只要有一个元素进入Flux

String first = flux.blockFirst();

//输出: 这是第0个元素

System.out.println(first);

//还是输出: 这是第0个元素

System.out.println(flux.blockFirst());

//输出: 这是第1个元素

System.out.println(flux.blockLast());

//还是输出: 这是第1个元素

System.out.println(flux.blockLast());

3.10. Flux与Mono之间的相互转换

Flux flux = Flux.just("asd","asd");

//自定义收集器转换为Mono

Mono> collect = flux.collect(Collectors.toList());

//或使用默认收集器转换为Mono

Mono> collect2 = flux.collectList();

//将Mono转换为仅有一个元素的Flux

Flux> flux2 = collect.flux();

//将只有一个元素的Flux转换为Mono

Flux.just("1").single().subscribe(System.out::println);

3.11. 最终始终会执行的函数

Flux.just("asd", "qwe").concatWith(Flux.error(new Exception()))

//当流程完成后的第一件事,由于由异常这里不执行

.doOnComplete(()-> System.out.println("数据组装完成"))

.doFinally(t-> System.out.println("最后执行的:"+t))

.subscribe(System.out::println);

/*最终输出:

asd

qwe

最后执行的:onError

*/

4. 常用监听

每次创建监听都会返回一个新的Flux对象,监听也在新的Flux对象

4.1. 监听每次消费

Flux flux = Flux.just("asd", "qwe");

//每次消费前做什么,入参是将要消费的元素

flux = flux.doOnNext(s -> System.out.println("当前消费:" + s));

flux.subscribe(System.out::println);

/*输出为:

当前消费:asd

asd

当前消费:qwe

qwe

*/

4.2. 监听流程完成或错误

//监听正常流程完成

Flux.just("1","2").doOnComplete(()-> System.out.println("ok"))

.subscribe(System.out::println);

/*最终输出:

1

2

ok

*/

4.3. 监听消费者

//消费者参与前的最后一件事,入参为消费者对象

Flux.just("1","2").doOnSubscribe(System.out::println).subscribe(System.out::println);

/*最终输出:

reactor.core.publisher.FluxArray$ArraySubscription@66d18979

1

2

*/

5. 背压简单使用

使用 Subscription::request 主动控制订阅量

5.1 原始的 Subscriber::onNext

//生产者每10毫秒生产一个

Flux.interval(Duration.ofMillis(10))

//消费者每50毫秒消费一个

.subscribe(new Subscriber<>() {

Subscription subscription;

AtomicInteger count = new AtomicInteger(0);

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(5);//首先请求5个

count.set(5);

}

@Override

public void onNext(Long val) {

System.out.print(" val:"+val);

try {

//消费者每100毫秒消费一个

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

if(count.decrementAndGet()<=0){

System.out.println(" 搞完,重新请求5个");

subscription.request(5);

count.set(5);

}

}

@Override

public void onError(Throwable throwable) {

}

@Override

public void onComplete() {

}

});

Thread.sleep(5000);

/*输出:

val:0 val:1 val:2 val:3 val:4 搞完,重新请求5个

val:5 val:6 val:7 val:8 val:9 搞完,重新请求5个

val:10 val:11 val:12 val:13 val:14 搞完,重新请求5个

val:15 val:16 val:17 val:18 val:19 搞完,重新请求5个

val:20 val:21 val:22 val:23 val:24 搞完,重新请求5个

......

*/

5.2. 使用 BaseSubscriber 简化操作

Flux.range(1,5).log().subscribe(new BaseSubscriber<>() {

private int count = 0;

private final int limit = 2;

@Override

protected void hookOnSubscribe(Subscription subscription) {

request(limit);

}

@Override

protected void hookOnNext(Integer value) {

if (++count == limit) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

request(count);

count = 0;

}

}

});

/*日志输出:

16:42:39.401 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

16:42:39.404 [main] INFO reactor.Flux.Range.1 - | request(2)

16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(1)

16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(2)

16:42:40.418 [main] INFO reactor.Flux.Range.1 - | request(2)

16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(3)

16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(4)

16:42:41.419 [main] INFO reactor.Flux.Range.1 - | request(2)

16:42:41.419 [main] INFO reactor.Flux.Range.1 - | onNext(5)

16:42:41.420 [main] INFO reactor.Flux.Range.1 - | onComplete()

*/

5.3. 通过 limitRate 进一步简化并链式操作

Flux.interval(Duration.ofMillis(100)).take(5)

.log()

//每次取2个

.limitRate(2)

.subscribe();

Thread.sleep(1000);

/*日志输出:

16:46:30.627 [main] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)

16:46:30.630 [main] INFO reactor.Flux.Take.1 - request(2)

16:46:30.747 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0)

16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - onNext(1)

16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - request(2)

16:46:30.936 [parallel-1] INFO reactor.Flux.Take.1 - onNext(2)

16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - onNext(3)

16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - request(2)

16:46:31.143 [parallel-1] INFO reactor.Flux.Take.1 - onNext(4)

16:46:31.144 [parallel-1] INFO reactor.Flux.Take.1 - onComplete()

*/

二. Spring中webflux的应用

1. 传统Controller的方式应用

后端 JAVA 代码

package com.example.wefluxdemo.web;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

import java.util.HashMap;

@RestController

@RequestMapping("/hello")

public class TestHelloController {

@GetMapping("/1")

public Mono hello1(){

return Mono.create(sink -> {

var map = new HashMap();

map.put("haha","hehe");

map.put("wuwu","yingying");

sink.success(map);

});

}

@GetMapping("/2")

public Mono hello2(String s){

return Mono.just(String.format("{\"s\":\"%s\"}",s));

}

@GetMapping("/3/{s}")

public Mono hello3(@PathVariable String s){

var map = new HashMap();

map.put("txt",s);

return Mono.just(map);

}

}

前端 js 调用服务代码

async function get(url){

const resp = await fetch(url)

if(resp.ok){

console.log(await resp.json())

}

}

//测试请求

get("/hello/1");//{"haha":"hehe","wuwu":"yingying"}

get("/hello/2?s=qwe");//{"s":"qwe"}

get("/hello/3/asd");//{"txt":"asd"}

2. Router和Handler的方式应用

后端 JAVA 代码

package com.example.wefluxdemo.web;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.reactive.function.server.*;

import reactor.core.publisher.Mono;

import java.util.HashMap;

@Configuration

public class TestRouterAndHandler {

//处理方法正常应该和路由方法分在不同的类中

public Mono test1(ServerRequest request){

var map = new HashMap();

map.put("返回值","Mono");

map.put("形参","约束必须为ServerRequest");

map.put("获取查询参数s",request.queryParam("s").get());

//输出: 当前的SessionId:4fa0eea8-3f44-4e85-b501-2c05970876c2

request.session().subscribe(s-> System.out.println("当前的SessionId:"+s.getId()));

return ServerResponse.ok().bodyValue(map);

}

@Bean

public RouterFunction test1Router(){

//路由/test/1的处理方法为test1

return RouterFunctions.route(RequestPredicates.GET("/test/1"),this::test1);

}

}

前端 js 调用服务代码

get("/test/1?s=zxc");

/*控制台输出:

{

"返回值": "Mono",

"获取查询参数s": "zxc",

"形参": "约束必须为ServerRequest"

}

*/

 类似资料: