当前位置: 首页 > 工具软件 > Reactor.js > 使用案例 >

Reactor3反应式框架Flux和Mono基本用法

阴鸿才
2023-12-01

代码实现

package com.hirain.higale.framework.learnProject;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.Stream;

public class Reactor3 {

        /**
         * 创建操作
         */
        public static void createFlux() {
                // 1.just():可以指定序列中包含的全部元素,创建出来的Flux序列会在发布这些元素之后自动结束
                Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");

                // 这里若输入俩参数,则会报错,Mono是一个或者空序列
                Mono<String> m1 = Mono.just("1");

                // 2.fromIterable()从一个Iterable对象中创建一个Flux对象
                Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));

                // 3.fromStream()从一个Stream对象中创建一个Flux对象
                ArrayList<Integer> numList = new ArrayList<>(){{add(1); add(2);}};
                Flux<Integer> f3 = Flux.fromStream(numList.stream());

                // 4.fromArray()从一个数组对象中创建一个Flux对象
                Integer[] arr = new Integer[]{1, 2, 3};
                Flux<Integer> f4 = Flux.fromArray(arr);

                // 5.range(start,count) 表示从start开始,递增的生成count个数字,都是int类型的参数
                Flux<Integer> f5 = Flux.range(2, 5);

                // 6.创建一个不包含任何元素,只发布结束消息的序列。
                // 并且这种方式不会进行后续传递,需要switchIfEmpty()方法来进行处理。
                // 因为响应式编程中,流的处理是基于元素的,而empty()是没有元素的!
                Flux<Object> empty = Flux.empty();

                // 7.创建一个只包含错误消息的序列,里面的参数类型是Throwable
                Flux<Object> error = Flux.error(new Exception("error!"));

                // 8.创建一个不包含任何消息通知的序列,注意区别empty(),empty还是会发布结束消息的。
                Flux<Object> never = Flux.never();

                System.out.println("--------------f1---------------");
                f1.subscribe(System.out::println);
                System.out.println("--------------m1---------------");
                m1.subscribe(System.out::println);
                System.out.println("--------------f2---------------");
                f2.subscribe(System.out::println);
                System.out.println("--------------f3---------------");
                f3.subscribe(System.out::println);
                System.out.println("--------------f4---------------");
                f4.subscribe(System.out::println);
                System.out.println("--------------f5---------------");
                f5.subscribe(System.out::println);
                System.out.println("--------------empty---------------");
                empty.subscribe(System.out::println);
                System.out.println("--------------error---------------");
                error.subscribe(System.out::println);
                System.out.println("--------------never---------------");
                never.subscribe(System.out::println);
        }

        /**
         * 联合操作
         */
        public static void mergeFlux() {
                Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
                Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));

                f1.mergeWith(f2).subscribe(f -> System.out.println("Here's some number: " + f));
        }

        public static void zipFlux() {
                Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
                Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));

                //zip操作将合并两个Flux流,并且生成一个Tuple2对象,Tuple2中包含两个流中同顺序的元素各一个。
                Flux.zip(f1, f2)
                        .take(3).subscribe(f -> System.out.println("直接zip:"+f));
                Flux.zip(f1, f2, (x, y) -> x + y)
                        .take(3).subscribe(f -> System.out.println("sum后zip:"+f));
        }

        public static void firstElement() {
                Flux<String> f1 = Flux.just("str1", "str2", "str3","1","2","3");
                Flux<String> f2 = Flux.fromIterable(Arrays.asList("str4", "str5", "str6","4","5","6"));

                Flux.first(f1, f2)
                        .take(3).subscribe(f -> System.out.println("先发布的元素:"+f));
        }

        /**
         * 转换&过滤
         */
        public static void skipElement() {
                Flux.range(0, 10)
                        .skip(5)
                        .subscribe(f -> System.out.println("剩下的元素:"+f));
        }

        public static void takeElement() {
                Flux.range(0, 10)
                        .take(3)
                        .subscribe(f -> System.out.println("取走的元素:"+f));
        }

        public static void filterElement() {
                Flux.range(0, 10)
                        .filter(n -> n < 4)
                        .subscribe(f -> System.out.println("过滤后的元素:"+f));
        }

        public static void distinctElement() {
                Flux.just(1, 2, 2, 3, 3, 4, 5, 5)
                        .distinct()
                        .subscribe(f -> System.out.println("去重后的元素:"+f));
        }

        public static void mapElement() {
                Flux.just(1, 2, 2, 3, 3, 4, 5, 5)
                        .distinct()
                        .map(v-> String.valueOf(v+10))
                        .subscribe(f -> System.out.println("映射后的元素:"+f));
        }

        public static void flatMapElement() {
                //map返回一个对象,flatmap返回一个stream
                //一对多的处理,把每个字符串拆成一个个字符,输出,这点map就无法做到。
                Flux.just("1", "2", "3", "4")
                        .flatMap(m -> Mono.just(m).map(c -> Integer.valueOf(c)+10))
                        .subscribe(f -> System.out.println("映射后的元素:"+f));

                Stream.of("a1","a2","a3")
                        .flatMap(s -> Stream.of(s.split("")))
                        .forEach(f -> System.out.println("映射后的元素:"+f));
        }

        /**
         * 缓冲操作
         */
        public static void bufferElement() {
                Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
                        .buffer(3)
//                        .flatMap(x ->Flux.fromIterable(x).map(y -> y.toUpperCase()))
                        .subscribe(f -> System.out.println("缓存后的元素:"+f));
        }

        /**
         * collectList操作
         */
        public static void getCollectList() {
                Flux<Integer> flux1 = Flux.range(1, 6);
                //collectList方法用于将含有多个元素的Flux转换为含有一个元素列表的Mono
                flux1.collectList()
                        .subscribe(f -> System.out.println("组成list后的元素:"+f));
        }

        public static void getCollectMap(){
                Flux<Integer> flux1 = Flux.range(1, 6);
                flux1.collectMap(f ->  "key" + f)
                        .subscribe(f -> System.out.println("组成map后的元素:"+f));
        }

        public static void logicalFlux(){
                Flux.range(1, 6)
                        .any(f -> f < 0)
                        .subscribe(t -> System.out.println("any逻辑操作:"+t));

                Flux.range(1, 6)
                        .all(f -> f > 0)
                        .subscribe(t -> System.out.println("all逻辑操作:"+t));
        }


        public  static  void main(String args[]){
//                createFlux();
//                mergeFlux();
//                zipFlux();
//                firstElement();
//                skipElement();
//                takeElement();
//                filterElement();
//                distinctElement();
//                mapElement();
//                flatMapElement();
//                bufferElement();
//                getCollectList();
//                getCollectMap();
//                logicalFlux();
        }
}

 类似资料: