create 方法
Observable.create(emitter -> emitter.onNext(saveFile()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(o ->
{
}, throwable ->{
}
);
just 方法
Observable.just(saveFile())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(o ->
{
}, throwable ->{
}
);
fromArray 方法:
Observable.fromArray(new int[]{1,2,3,4,5,6})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
defer方法:
只有调用subscribe之后才会生成对象。
Observable.defer(() -> Observable.just(saveFile())).subscribe()
interval:相当于定时器
Observable.interval(1000, TimeUnit.DAYS)
.subscribe();
range: 范围从一个范围到另一个范围
Observable.range(1,5).subscribe();
repeat: 重复发射
Observable.just().repeat(2).subscribe();
timer:定时每隔多久重复一次
Observable.timer()
RxJava操作符:
map:把一个对象转为另一个对象。
Observable.just(123).map(integer -> String.valueOf(integer))
.subscribe(s -> System.out.println(s.getClass()), throwable -> {
});
FlatMap:返回一个Observable
Observable.just(123).flatMap(integer -> Observable.just(integer+"a"))
.subscribe(s -> System.out.println(s.getClass()), throwable -> {
});
GroupBy: 分组
Observable.just(1,2,3).groupBy(integer -> integer>2).subscribe(integerGroupedObservable ->
integerGroupedObservable.subscribe(integer -> System.out.println(integer+"group"+integerGroupedObservable.getKey()))
, throwable -> {});
Buffer: 一次指定发射数据量
Observable.range(1, 7).buffer(5).subscribe(integers -> System.out.println(integers), throwable -> {
});
Scan:将数据累加‘
Observable.range(1, 5).scan((integer, integer2) -> integer + integer2).subscribe(integer -> System.out.println(integer), throwable -> {
});
Window: 将制定数据串成链表 返回值是Observable
Observable.range(1,6).window(3).subscribe(integerObservable ->{
integerObservable.subscribe(integer -> {
System.out.println(integer);
});
}, throwable -> {});