Rx被用于生成异步数据的流程和序列
Observable像推送,订阅者被动接收消息
Iterable像拉取,订阅者主动拉取消息
将异步调用本身定义为 Observable。
对 onNext 的调用通常称为项目的“emissions”,而对 onComplete 和 onError 的调用称为“notifications”。
unsubscription并不能保证立即取消订阅,即使没有subscriber,observable仍有可能在一段时间内生成并尝试发射items
Observable类型:
ColdObservable:直到有Observer订阅它才会开始发射
HotObservable:创建后立即开始发射,不管有没有被订阅
ConnectableObservable:其connect方法被调用后才会开始发射,与是否被订阅无关
链式操作
##Observable Contract##
其实dispose就是退订
对于同一个Observable的多个Observer,后来订阅的Observer可能收到以前所有的item,可能只收到此后的item,也可能收到跟前面的Observer完全不一样的item序列,这都由Observable决定
背压:如果一个Observable支持背压并且订阅它的Observer使用了背压,Observable不会在订阅后立即开始发出item,而是先发出onSubscribe通知。
Observer收到onSubscribe通知后可以发出request通知给Observable,请求一定数量的item。若该Observable不支持背压会直接发出onError通知去响应请求
Observable自己定义策略去处理过量的item,丢弃、保留等等
操作符分类
##创建Observable##
**以下在rxjava3中均为静态方法,除了repeat**
Create:传入你自定义的ObservableSource,每有一个Observer订阅都会走一遍subscribe(一般是相同的逻辑),所以一般每个Observer会收到相同的item序列。每个Observer的emitter不一样
惰性计算,订阅之后才会调用代理函数,而且是最新item; just, from 方法发出的都是创建Observable时候的item
defer:每有一个observer,都会用你指定的工厂函数为其生成一个新的Observable并让该observer订阅它。
Empty/Never/Throw:不发送任何item,分别是 正常终止(onCompleted)/永不终止也不发出/异常终止(onError),一般作为测试用。
just:只发送作为参数的一个item(object),可以发送null。
from:传递数组、Iterable、Future、Action、Callable、Runnable等作为参数,转换为Observable,订阅后依次拉取并传递item。
interval:通过给定的时间间隔创建一个Observable,每经过给定的时间间隔发送从0开始的递增的整数;可以用来替代计时器。未指定首次延迟是订阅前开始计算延迟;指定首次延迟是订阅后计算延迟并发出
range:发出给定范围内的连续整数。若数量参数传入0则不发出item,若传入负数则会抛出异常
start:将参数函数的返回值作为item发出。其实就是from里的那些方法
timer:根据给定延迟发出单个item,订阅后开始计算延迟
repeat:通过现有的Observable实例重复发出该Observable的item(s),可以指定重复的次数
##转换Observable##
转换Observable发出的items的操作符
**以下在rxjava3中均为实例方法**
buffer:定期将items收集到bundles,一次发出一个bundle
flatMap:应用你指定的函数将源Observable发出的每个item转换成Observable并返回,flatMap合并这些Observable的发射并通过他们生成自己的序列发射。可能乱序,这样可以将itemA转换为itemB
内部原理:源Observable每发出一个item(onNext),用传入的function将其转换成Observable,生成InnerObserver并订阅它,在InnerObserver的onNext里调用MergeObserver的tryEmit其内调用持有的源Observer的onNext
对应关系:源Observable每发出一个item,转换后的Observable可能会发出多个item,由你指定函数的内部实现决定(函数生成的Observable可发出多个item)
concatMap:flatMap的有序版
groupBy:将源Observable发出的items通过你指定的函数中的规则分组到GroupedObservable里面,然后将GroupedObservable作为item发出,每一个GroupedObservable仅能有一个观察者订阅它
注意:你应该对你不关心的GroupedObservable调用类似于ignoreElements的操作符,让它们丢弃其缓存的items
map:生成一个Observable,将源Observable发出的每个item应用你指定的函数,然后将函数返回的结果作为item发出,也就是说源Observable发出的item与转换后的Observable发出的item是一对一的关系
scan:通过上次转换Observable发出的item与源Observable发出的item,你的给定函数利用这两个item做一些计算产生一个新的item并返回,然后发出,以此反复;通常这种操作符也被称为累加器
特殊情况:源Observable发出的第一个item不做处理,直接发出
window:将源Observable发出的你指定的数量的item放入一个Observable(内部包含一个window,源item由window发给观察者)里,转换Observable将这些Observable(window)作为item发出
##过滤Observable##
。。。Observable可以阻塞。。。。。。
**以下在rxjava3中均为实例方法**
debounce:丢弃后面有紧跟着的item的item
实现:当源Observable发出一个item,由用户指定超时时间的计时器开始计时,如果规定时间内有下一个item发出,则丢弃前一个item并重新计时。
distinct:过滤掉重复的item,有的重载函数可以只过滤连续重复的item
distinctUntilChanged:发送源Observable发射的与直接前驱不同的item
elementAt:返回一个只发出源Observable指定下标(从0开始)item的Observable
filter:返回一个Observable,只发出源Observable发出的满足 判断式/布尔值函数 (Predicate) 的item
first:返回一个Observable,只发出源Observable发出的第一个item
ignoreElements:忽略源Observable发出的任何item,只传递发出其终止通知(onError和onCompleted)
last:只发出源Observable发出的最后一个item。有时作为阻塞Observable使用
sample:对源Observable发出的items定期采样,只发出每个周期中最后一个item。一个周期采样一个
skip:跳过前n个item
skipLast:跳过后n个item
take:只选取前n个item发出
takeLast:只选取后n个item发出
##结合Observable##
zip:应用你指定的函数将各个Observable发出的第n个item合并在一起,最终发出的item的数量与发出最少item的源Observable发出的item数量相同 **静态方法**
combineLatest:从每个源Observable都发出了至少一个item开始,应用你指定的函数将每个源Observable最近发出的item结合到一起。每有一个Observable发出item,都会触发 **静态方法**
join:传入另一个Observable;当ObservableA(B)发出item时,如果ObservableB(A)在你为ObservableA(B)定义的时间窗口内发出了item,则应用你指定的函数合并这两个item并返回
merge:合并多个Observable的item到一个Observable里,没有任何转换,可能会乱序,当有onError通知时,会立即传递并终止合并后的Observable。与flatMap内部原理类似 **静态方法**
startWith:在源Observable开始发射前发出你指定的item序列。prepend
switch:发出源Observable最近发出的Observable发出的item。每次只订阅一个最新发出的Observable,有Observable发出时前面的将会被退订,这可能会导致某些item被丢弃
##错误处理操作符##
catch(onErrorResumeNext):当源Observable发出onError通知时,通过你指定的函数返回的Observable恢复数据流。就是内部让Observer转而订阅函数返回的Observable
retry:如果源Observable发出onError通知则重新订阅源Observable,重新从头开始发送item。onError前面的item会被重复发送
##Utility操作符##
delay:源Observable每发出一个item都会延迟你指定的时间再发送
delaySubscription:延迟指定时间再订阅
do:注册对各种Observable生命周期事件采取的行动。例如doOnNext就是在源Observable调用Observer的onNext之前调用你指定的函数
materialize/dematerialize:materialize将源Observable发出的item和通知(onError和onCompleted)都作为item发送,dematerialize逆转了这个过程
observeOn:指定observer将会在哪个Scheduler上面观察该Observable,即在哪个Scheduler上调用observer的onNext、onError、onCompleted方法。指定Observer执行线程
如果observeOn在操作符链中,则是指定此observeOn之后、下一个(如果有)observeOn之前的所有Observable的执行线程
注意如果Scheduler是异步的,会立即转发onError通知,如果遇到慢消耗的observer,会跳过之前发出的item。##应该怎么用?##
serialize:强制当前Observable的发射和通知为序列化的(串行化),例如同步在两个线程中的发射和通知(可能是并发的)
subscribe:将Observer连接到该Observable,Observer可以接收Observable发出的item和通知
subscribeOn:指定起初的Observable将会在哪个Scheduler上面执行它的工作。指定源Observable操作线程
timeInterval:源Observable发出一个item时,结果Observable发出此item距离上个item的时间间隔量。对于第一个item,时间间隔从订阅开始算起
timeout:镜像源Observable,如果经过你指定的时间跨度,源Observable没有发出任何item,则发出onError通知
timestamp:给源Observable发出的每个item加一个时间戳并发送
using:创建一个与Observable有着相同生命周期的resource(其实就是一个实体),当Observable终止时该resource也会被扔掉(dispose) **静态方法**
##条件和布尔操作符##
all:判断所有的item是否满足给定判断式,返回一个Observable发出true或false
amb:只传递第一个发出item或通知的Observable的发射。 **静态方法**
contains:判断源Observable是否包含指定item,返回一个Observable发出true或false
defaultIfEmpty:镜像源Observable,但如果源Observable未发出任何item只发出onComplete通知,则发出你指定的默认item
sequenceEqual:判断两个Observable的发射是否完全相同(item,顺序,终止状态),返回一个Observable发出true或false。 **静态方法**
skipUntil:你指定的Observable发出item前,跳过源Observable的发射
skipWhile:满足你指定的条件前跳过源Observable的发射
takeUntil:你指定的Observable发出iten或通知前,传递源Observable的发射
takeWhile:满足你指定的条件前传递源Observable的发射
##数学和聚合操作符##
average:计算源Observable发出的item的平均值并发出。**在rxjava3里没看到该方法**
concat:挨个源Observable发出该Observable发出的所有item,按顺序进行。append **静态方法**
count:计算源Observable发出的item总数并发出
max:确定源Observable发出的item的最大值并发出。**在rxjava3里没看到该方法**
min:确定源Observable发出的item的最小值并发出。**在rxjava3里没看到该方法**
reduce:将你给定的函数应用在源Observable发出的每个item,然后将该结果与下一个item一起应用到给定函数,直到源Observable发出onComplete通知,将最终的唯一结果发出。
也称为聚合,折叠,累加,压缩,或注入
sum:计算源Observable发出的item的总和并发出。**在rxjava3里没看到该方法**
##背压操作符##
。。。针对Observable产生item的速度大于Observer消耗item的速度的策略。。。
toFlowable:通过应用你指定的背压策略将当前的Observable转换为Flowable
##Connectable Observable操作符##
。。。更加精确地控制订阅动态。。。
connect:使用此操作符后Connectable Observable才会发出items,与是否被订阅无关。
通过这种方式,你可以在所有预期Observer订阅Observable之后再让Observable发出item
publish:将一个普通的Observable转换为Connectable Observable
refCount/share:让一个Connectable Observable表现得像一个普通的Observable。内部直接订阅源Connectable Observable,有Observer订阅时使用connect操作符,所有Observer退订后再内部退订
replay:返回一个Connectable Observable,确保每个Observer都可以接收相同完整的发射序列,即使是在Observable发出item后订阅。还是得使用connect操作符。可以指定缓冲区大小
##转换Observable为对象或数据结构操作符##
to:将Observable转换为另一个对象或者数据结构。有可能会阻塞
例如:blockingIterable,blockingLatest,blockingMostRecent,blockingNext,to,toFuture,toList,toMap等
Single:是Observable的一个变体,只发出一个item或者一个错误通知,对应onSuccess和onError,只能二选一
Subject:既是Observable又是Observer。例如可以将cold Observable变为hot Observable(自己作为)。通过hide方法隐藏作为Observer的接口,变为单纯的Observable
AsyncSubject:在onComplete之后发出最后一个item,然后发出onComplete通知。如果一个item都没有或者以onError结束则只发出最后的通知
BehaviorSubject:BehaviorSubject被Observer订阅后,先发出一个最近发出过的item,再继续原来的发射。如果还没有发出过任何item,则发出你指定的默认值。如果已经终止了,则只发出终止通知
PublishSubject:只会发出在订阅后,源Observable发出的item,所以之前的item可能会漏掉,类似于组播非粘性消息
ReplaySubject:发出源Observable发出的所有item(包括之前的),不管Observer什么时候订阅,类似于组播粘性消息
UnicastSubject:单播粘性消息