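publishOn and subscribeOn. Both methods specify the Scheduler (roughly, a thread pool) on which a Reactive Streams pipeline runs.
Why specify a Scheduler at all? The code that makes up a reactive stream varies in speed. If everything runs on one thread, the slow parts hold up the fast ones, so the steps are isolated from one another as needed.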
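Scheduler:
Schedulers.elastic(): creates worker threads dynamically, with no upper bound on the thread count.
Schedulers.single(): a single reusable thread.
(The other Schedulers factory methods are omitted here.)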
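The isolation argument can be sketched without Reactor at all. What follows is a minimal plain-JDK sketch, not Reactor code: the class name IsolationSketch, the method fastFinishesFirst, and the 500 ms wait are assumptions made for illustration. It parks a slow task on a single-thread executor and checks whether a fast task can still finish, either on the same pool or on a pool of its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; plain executors stand in for Schedulers.
class IsolationSketch {

    /** Returns true if the fast task finished while the slow task was still blocked. */
    static boolean fastFinishesFirst(boolean separatePools) {
        ExecutorService slowPool = Executors.newSingleThreadExecutor();
        ExecutorService fastPool = separatePools ? Executors.newSingleThreadExecutor() : slowPool;
        CountDownLatch release = new CountDownLatch(1);  // keeps the slow task blocked
        CountDownLatch fastDone = new CountDownLatch(1); // signalled by the fast task
        try {
            // The slow task occupies slowPool's only thread until it is released.
            slowPool.submit(() -> { release.await(); return null; });
            // The fast task can only run if it is not queued behind the slow one.
            fastPool.execute(fastDone::countDown);
            return fastDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown(); // let the slow task finish so the pools can shut down
            slowPool.shutdown();
            if (fastPool != slowPool) {
                fastPool.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:    " + fastFinishesFirst(false));
        System.out.println("separate pools: " + fastFinishesFirst(true));
    }
}
```

With a shared pool the fast task is stuck in the queue behind the slow one. With separate pools it runs right away, which is the same reason a slow step in a reactive chain is moved onto its own Scheduler.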
@Override
public void run(ApplicationArguments args) throws Exception {
    Flux.range(1, 6)
            .doOnRequest(n -> log.info("Request {} number", n))
            .publishOn(Schedulers.elastic()) // run the operators below on an elastic thread
            .doOnComplete(() -> log.info("Publisher COMPLETE 1"))
            .map(i -> {
                log.info("Publish {}, {}", Thread.currentThread(), i);
                // return 10 / (i - 3); // deliberately fails at i == 3 to exercise onErrorResume / onErrorReturn
                return i;
            })
            .doOnComplete(() -> log.info("Publisher COMPLETE 2"))
            .subscribeOn(Schedulers.single()) // subscribe (and request) on the single thread
            .onErrorResume(e -> {
                log.error("Exception {}", e.toString());
                return Mono.just(-1);
            })
            // .onErrorReturn(-1)
            .subscribe(i -> log.info("Subscribe {}: {}", Thread.currentThread(), i),
                    e -> log.error("error {}", e.toString()),
                    () -> log.info("Subscriber COMPLETE")//,
                    // s -> s.request(4)
            );
    // Keep the main thread alive long enough for the asynchronous pipeline to finish.
    Thread.sleep(2000);
}
Output (produced with the return 10 / (i - 3) line enabled in place of return i):
:: Spring Boot :: (v2.1.2.RELEASE)
2021-02-22 15:40:18.981 INFO 6500 --- [ main] g.s.d.s.SimpleReactorDemoApplication : Starting SimpleReactorDemoApplication on LAPTOP-2BP9NM9R with PID 6500 (D:\GeekTime\practice\Prac5\ReactiveRedis\target\classes started by 86138 in D:\GeekTime\practice\Prac5\ReactiveRedis)
2021-02-22 15:40:18.984 INFO 6500 --- [ main] g.s.d.s.SimpleReactorDemoApplication : No active profile set, falling back to default profiles: default
2021-02-22 15:40:19.419 INFO 6500 --- [ main] g.s.d.s.SimpleReactorDemoApplication : Started SimpleReactorDemoApplication in 0.705 seconds (JVM running for 1.771)
2021-02-22 15:40:19.452 INFO 6500 --- [ single-1] g.s.d.s.SimpleReactorDemoApplication : Request 256 number
2021-02-22 15:40:19.454 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 1
2021-02-22 15:40:19.454 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -5
2021-02-22 15:40:19.454 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 2
2021-02-22 15:40:19.454 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -10
2021-02-22 15:40:19.454 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 3
2021-02-22 15:40:19.457 ERROR 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Exception java.lang.ArithmeticException: / by zero
2021-02-22 15:40:19.459 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -1
2021-02-22 15:40:19.459 INFO 6500 --- [ elastic-2] g.s.d.s.SimpleReactorDemoApplication : Subscriber COMPLETE
Process finished with exit code 0
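In short, the two differ in scope. publishOn changes the thread pool for the operators that come after it, whereas subscribeOn affects the pipeline from the source onward. So the reach of publishOn depends on where it sits in the chain, while the reach of subscribeOn does not.
In the example, subscribeOn is declared after publishOn yet takes effect from the source: the Request line is logged on single-1. Once publishOn runs, the pool switches to the one publishOn specifies: the Publish and Subscribe lines are logged on elastic-2.
Execution order now depends on threads as well, not just on top-to-bottom order in the code (per the video's author).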
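The same thread hop can be imitated with the JDK's CompletableFuture, which may help make the scoping rule concrete. This is only an analogy under stated assumptions, not how Reactor implements publishOn or subscribeOn: the class name ThreadHopSketch, the trace format, and the two single-thread executors named single-1 and elastic-2 (names borrowed from the log above) are all made up for illustration. The executor given to supplyAsync plays the role of subscribeOn (where the source runs), and the executor given to the later Async stages plays the role of publishOn (where everything downstream runs).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; CompletableFuture stands in for a Flux chain.
class ThreadHopSketch {

    /** Runs a three-stage pipeline and returns "stage@thread" entries in execution order. */
    static List<String> run() {
        // Stand-in for Schedulers.single(): one reusable thread for the source.
        ExecutorService single = Executors.newSingleThreadExecutor(r -> new Thread(r, "single-1"));
        // Stand-in for the Scheduler handed to publishOn.
        ExecutorService elastic = Executors.newSingleThreadExecutor(r -> new Thread(r, "elastic-2"));
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // Like subscribeOn: decides where the source itself runs.
                    .supplyAsync(() -> {
                        trace.add("source@" + Thread.currentThread().getName());
                        return 1;
                    }, single)
                    // Like publishOn: the stages from here on run on the other executor.
                    .thenApplyAsync(i -> {
                        trace.add("map@" + Thread.currentThread().getName());
                        return i * 10;
                    }, elastic)
                    .thenAcceptAsync(i -> trace.add("subscribe@" + Thread.currentThread().getName()), elastic)
                    .join(); // block until the last stage has run
        } finally {
            single.shutdown();
            elastic.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the source stage is recorded on single-1 and the map and subscribe stages on elastic-2, mirroring the Request line on single-1 and the Publish and Subscribe lines on elastic-2 in the log.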
// Inject ReactiveStringRedisTemplate
@Autowired
private ReactiveStringRedisTemplate redisTemplate;
//omit
Flux.fromIterable(list)
        .publishOn(Schedulers.single())
        .doOnComplete(() -> log.info("list ok"))
        .flatMap(c -> {
            log.info("try to put {},{}", c.getName(), c.getPrice());
            return hashOps.put(KEY, c.getName(), c.getPrice().toString());
        })
        .doOnComplete(() -> log.info("set ok"))
        .concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1)))
        .doOnComplete(() -> log.info("expire ok"))
        .onErrorResume(e -> {
            log.error("exception {}", e.getMessage());
            return Mono.just(false);
        })
        .subscribe(b -> log.info("Boolean: {}", b),
                e -> log.error("Exception {}", e.getMessage()),
                () -> cdl.countDown());
@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;
// One count for the insert pipeline, one for the update pipeline.
private CountDownLatch countDownLatch = new CountDownLatch(2);

@Override
public void run(ApplicationArguments args) throws Exception {
    startFromInsertion(() -> {
        log.info("Runnable");
        decreaseHighPrice();
    });
    log.info("after starting");
    // Block the main thread until both pipelines have finished.
    countDownLatch.await();
}

// Insert the initial data, then run the given callback on completion
private void startFromInsertion(Runnable runnable) {
    reactiveMongoTemplate.insertAll(initCoffee())
            .publishOn(Schedulers.elastic())
            .doOnNext(c -> log.info("Next: {}", c))
            .doOnComplete(runnable)
            .doFinally(s -> {
                countDownLatch.countDown();
                log.info("Finally 1, {}", s);
            })
            .count()
            .subscribe(c -> log.info("Insert {} records", c));
}

// Apply the discount: reduce every price of 3000 or more by 500
private void decreaseHighPrice() {
    reactiveMongoTemplate.updateMulti(query(where("price").gte(3000L)),
                    new Update().inc("price", -500L).currentDate("updateTime"),
                    Coffee.class)
            .doFinally(s -> {
                countDownLatch.countDown();
                log.info("Finally 2, {}", s);
            })
            .subscribe(r -> log.info("Result is {}", r));
}
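All of this simply goes through the methods that ReactiveMongoTemplate already wraps; this example uses insertAll.
R2DBC is an abstraction that is still under development. It was not yet mature when the video was made, so it is not covered here.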