当前位置: 首页 > 工具软件 > real-interval > 使用案例 >

rxjava : interval、 intervalRange 、timer 、range、 defer

祁景山
2023-12-01
  • interval():创建一个按固定时间间隔发射整数序列的Observable,类似于TimerTask定时器的功能;

  • timer():新版本的timer()只是用来创建一个Observable,并延迟发送一次的操作符;

  • delay():延迟一段指定的时间再发送来自Observable的发送结果,常规使用跟timer()一致,区别在于delay()是用于流中的操作,跟map()、flatMap()的级别是一样的。而timer()是用于创建Observable,跟just()、from()的级别是一样的。

示例:

//interval
//每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。
//心跳,周期执行
public void interval() {
    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
    Disposable disposable = observable.subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println("aLong==============" + aLong);
        }
    });
}
//aLong==============0
//aLong==============1
//aLong==============2
//aLong==============3
//aLong==============4
//aLong==============5
//aLong==============6
//aLong==============7
//aLong==============8
//aLong==============9
//aLong==============10
//.........



//intervalRange
//可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。
//延迟 initialDelay 时间,每隔 period 时间执行 , 从 start 开始 执行 count 次
//从0开始发射11个数字为:0-10依次输出,延时0s执行,每1s发射一次。
private void intervalRange() {
    Disposable disposable2 = Observable.intervalRange(0, 11, 0, 1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("倒计时=================" + (10 - aLong));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("倒计时完成======================");
                }
            })
            .subscribe();
}


//timer
//当到指定时间后就会发送一个 0L 的值给观察者。
//延时执行,只执行一次
private void timer() {
    Disposable disposable1 = Observable.timer(10, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("timer============" + aLong);
                }
            });
}


//延迟操作,和 time 不同的是 delay 不能创建出 observable 对象来
private void delay() {
    Disposable disposable = Observable.just(1, 2, 3, 4, 5, 6)
            .delay(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("integer==================" + integer);
                }
            });
}
//integer==================1
//integer==================2
//integer==================3
//integer==================4
//integer==================5
//integer==================6




//range
//同时发送一定范围的事件序列,相当于for循环
private void range() {
    Disposable disposable = Observable.range(1, 10)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("range============" + integer);
                }
            });
}


//defer
//这个方法的作用就是直到被观察者被订阅后才会创建被观察者。
private Integer i = 100;
private void defer() {
    Observable<Integer> observable = Observable.defer(
    					new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
        }
    });
    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("range============" + integer);
        }
    };

    i = 200;
    Disposable subscribe1 = observable.subscribe(consumer);
    //range============200

    i = 300;
    Disposable subscribe2 = observable.subscribe(consumer);
    //range============300
}
//因为 defer() 只有观察者订阅的时候才会创建新的被观察者,
//所以每订阅一次就会打印一次,并且都是打印 i 最新的值。
 类似资料: