我们平时写的代码大多数都是串行的,也就是传统编程,就会带来了以下的问题:
引入了异步编程,然后也会带来了一下问题:
官方文档:Reactor 3 Reference Guide
后边引入了Reactive Programming概念,出现了Rxjava,Reactor等,其实他们都是用的同一个api的,也就是Reactor-streams。spring5 webFlux 其实就是用的Reactor的API,所以本文主要介绍Reactor的。
在 Java 程序中使用 Reactor 库非常的简单,只需要通过 Maven 或 Gradle 来添加对 io.projectreactor:reactor-core 的依赖即可,下面是我这次演示引用的包。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.3.RELEASE</version>
<scope>test</scope>
</dependency>
Flux 和 Mono 是 Reactor 中的两个基本概念。其实Flux 和 Mono 之间可以进行转换,对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。下面我主要是介绍了flux的api使用。
private static void simple() {
System.out.println("*******");
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
}
private static void generate() {
System.out.println("*******");
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
}
private static void create() {
System.out.println("*******");
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
}
private static void buffer() {
Flux.range(1, 100).buffer(10).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(1).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
}
private static void filter() {
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
}
private static void window() {
Flux.range(1, 100).window(20).subscribe(System.out::println);
Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);
}
private static void zipWith() {
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"))
.subscribe(System.out::println);
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);
}
private static void take() {
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
}
private static void reduce() {
Flux.range(1, 100).reduce(Integer::sum).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);
}
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 10000).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
private static void flatMap() {
Flux.just(5, 10)
.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
Flux.just(5, 10)
.flatMapSequential(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
}
private static void concatMap() {
Flux.just(5, 10)
.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
}
private static void combineLatest() {
Flux.combineLatest(
Arrays::toString,
Flux.intervalMillis(100).take(5),
Flux.intervalMillis(50, 10000).take(5)
).toStream().forEach(System.out::println);
Flux.intervalMillis(100).take(1).toStream().forEach(System.out::println);
}
private static void subscribe() {
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
}
private static void onErrorReturn() {
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);
}
private static void switchOnError() {
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.switchOnError(Flux.just(1,3))
.subscribe(System.out::println);
}
private static void onErrorResumeWith() {
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResumeWith(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
})
.subscribe(System.out::println);
}
private static void retry() {
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println,System.err::println);
}
public static void main(String[] args) {
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x)).log()
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x)).log()
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
}
public void simpleTest() {
StepVerifier.create(Flux.just("a", "b"))
.expectNext("a")
.expectNext("b")
.verifyComplete();
}
public void testWithTime() {
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))
.expectSubscription()
.expectNoEvent(Duration.ofHours(4))
.expectNext(0L)
.thenAwait(Duration.ofDays(1))
.expectNext(1L)
.verifyComplete();
}
public void withTestPublisher() {
final TestPublisher<String> testPublisher = TestPublisher.create();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)
.expectNext("a")
.expectNext("b")
.expectComplete();
}
public static void main(String[] args) {
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").log("wdh").subscribe(System.out::println);
// Flux.just(1, 0).map(x -> 1 / x).subscribe(System.out::println);
}
public static void main(final String[] args) {
Flux.range(1, 2).log("wangdehui").subscribe(System.out::println);
}
public static void main(String[] args) throws InterruptedException {
final Flux<Long> source = Flux.intervalMillis(1000)
.take(10)
.publish()
.autoConnect();
source.subscribe();
Thread.sleep(5000);
source.toStream().forEach(System.out::println);
}