这篇文章将详细介绍 RxPy 的重大升级版本 RxPy 3.1.0 |
开篇:
之前我写了一篇博客: python 之【超详细讲解 python 反应式编程】,介绍了什么是反应式编程,并以 RxPy 1.6 版本为基础详细地介绍了 RxPy 的使用方法。但随着 RxPy 的不断发展和升级,RxPy 经历了 2.x 版本并最终迎来了 3.x 的重大升级版本。3.x 版本对 Rx 的使用方式做出了很大改变,比如:
RxPy 3.x 的变化:
下面就通过例子来给大家讲解一下,3.x 版本的操作带来了哪些变化!(为了方便理解,我会使用 v1 代替 RxPy 1.x 版本,使用 v3 表示 RxPy 3.x 版本)
☕️ 更改了 Observable 对象的创建方式
在 v1 版本中创建 Observable 序列对象使用的 rx 里面的 Observable 模块来创建,比如:
from rx import Observable
def push_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
source = Observable.create(push_strings)
source.subscribe(lambda i: print("Received data: {0}".format(i)))
这里的 create 操作符相当于 Observable 类的一个方法,但在 v3 版本中则不再使用 Observable 模块来创建了,而是把这些工厂操作符封装在 rx 模块下一个个独立的方法,然后可以直接独立拿来使用,比如:
from rx import create
def push_strings(observer, scheduler):
observer.on_next("Alpha")
observer.on_next("Beta")
source = create(push_strings)
source.subscribe(lambda i: print("Received data: {0}".format(i)))
并且提供给 create 操作符的订阅函数 push_strings 也变成了两个参数了:一个 observer 对象和一个 scheduler 调度器。这个调度器参数是新实现的参数,如果在进行订阅 subscribe() 的时候,提供了 scheduler 参数,则这个 scheduler 就会传递给 create 操作符的订阅函数中,如果没有提供则默认为 None。
改变了链式操作符的组装方式
在 v1 版本中,组装链式操作符的方式为一连串的点"."拼接操作,这些操作符的使用方式就像是调用 Observable 对象的一个方法一样,比如:
from rx import Observable
Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") \
.map(lambda s: len(s)) \
.filter(lambda i: i >= 5) \
.subscribe(lambda value: print("Received {0}".format(value)))
但在 v3 版本中,则改成基于 pipe 操作符来进行链式组装。并且 pipe 操作符已经成为了 Observable 类的唯一方法了,换句话说,创建 Observable 对象的工厂操作符后面只能使用 pipe 这个操作符来进行数据的处理。
比如下面的例子:
import rx
from rx import operators
rx.of("Alpha", "Beta", "GammaRay").pipe(
operators.map(lambda s: len(s)),
operators.filter(lambda i: i >= 5)
).subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: 5
Received data: 8
从这个例子中我们可以看出,v3 版本中,工厂操作符和非工厂操作符被划分为了两个不同的模块:
同时,从这个例子中我们也可以看出,非工厂操作符都封装在了 pipe 操作符中,使用逗号分隔。这种方式带来的改变是巨大的,正式 pipe 操作符的到来,使链式操作带来了很大的便利,比如,我们可以自己使用函数来对 pipe 进行封装:
import rx
from rx import operators
def filter_string_length():
return rx.pipe(
operators.map(lambda s: len(s)),
operators.filter(lambda i: i >= 5)
)
rx.of("Alpha", "Beta", "GammaRay").pipe(
filter_string_length()
).subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: 5
Received data: 8
从上面的例子可以看出,我们可以把多个操作符封装成一个函数(实际上是把多个操作符包装成一个独立的 pipe 操作符),然后把函数直接放到 pipe 操作符当中。当然 pipe 中可以放入多个这种类型的函数,这就实现了我们所说的模块化编程,这些都是得益于 pipe 操作符的强大之处。
自定义新的操作符
在 v3 版本中,还允许我们自己创建一个新的操作符,允许我们完全控制订阅立逻辑和数据项的发射逻辑。比如下面的例子:
import rx
def lowercase():
# source 为传进来 Observable
def observable_handle(source):
def subscribe(observer, scheduler=None):
# 重写 on_next 来达到处理数据项的目的
def on_next(value):
observer.on_next(value.lower())
return source.subscribe(
on_next, # 替换为自己的 on_next
observer.on_error,
observer.on_completed,
scheduler
)
# 返回一个自定义的新的 Observable
return rx.create(subscribe)
return observable_handle
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
lowercase()
).subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: alpha
Received data: beta
Received data: gamma
Received data: delta
Received data: epsilon
在这个例子当中我们自定义了一个将字符串变成小写字母的操作符,自定义操作符是通过一个函数来实现的,这个函数需要返回一个处理 Observable 对象的函数句柄,比如 observable_handle 就是这样一个句柄,它接收一个传进来的 Observable 对象,然后返回一个新的自定义 Observable,在这个新的 Observable 中我们重写了观察者 oberver 的订阅函数中的 on_next 方法来达到处理每个数据项的目的。(如果想要理解这个例子的原理,需要反复多看几遍)
移除了操作符中的数据项组合函数
在 v3 版本中,操作符里面的组合多个不同 Observable 数据项的函数被移除了,比如 v1 版本中的 zip 操作符:
import operator
from rx import Observable
a = Observable.of(1, 2, 3)
b = Observable.of(1, 2, 3)
a.zip(b, lambda i, j: operator.mul(i, j)) \
.subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: 1
Received data: 4
Received data: 9
这里的 zip 操作符使用了一个 lambda 函数组合了两个 Observable 的数据。但在 v3 版本中则被移除了,改成了如下方式:
import operator
import rx
from rx import operators
a = rx.of(1, 2, 3)
b = rx.of(1, 2, 3)
a.pipe(
operators.zip(b), # 返回一个由 a 和 b 数据项组成的元组
operators.map(lambda z: operator.mul(z[0], z[1]))
).subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: 1
Received data: 4
Received data: 9
当然,例子中的 operators.map(lambda z: operator.mul(z[0], z[1])) 对元组数据的解包可以改成能够自己解包元组数据的 starmap 操作符:operators.starmap(operator.mul),它们的结果都是一样的。
讀 移除了操作符中的 Observable 列表参数
在 v1 版本中,有一些操作符,比如:merge、zip、combine_latest 等等,把 Observable 列表作为参数,然后把列表中的 Observable 进行组合:
from rx import Observable
obs1 = Observable.from_([1, 2])
obs2 = Observable.from_([3, 4])
res = Observable.merge([obs1, obs2])
res.subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: 1
Received data: 2
Received data: 3
Received data: 4
在 v3 版本中则改成了可变参:
import rx
obs1 = rx.of(1, 2)
obs2 = rx.of(3, 4)
res = rx.merge(obs1, obs2)
res.subscribe(lambda data: print("Received data: {0}".format(data)))
>>>>>
Received data: 1
Received data: 2
Received data: 3
Received data: 4
☁️ 移除了阻塞 Observable
在 v1 版本中,我们通常会对 Observable 对象的数据进行阻塞,比如:
from rx import Observable
res = Observable.of(1, 2, 3, 4, 6).to_blocking().last()
print("Received last data: {0}".format(res))
>>>>>
Received last data: 6
这个例子中我们使用 to_blocking().last() 对 Observable 进行阻塞直到最后一个数据。在 v3 版本中,使用的是 run 操作符:
import rx
res = rx.of(1, 2, 3, 4, 6).run()
print("Received last data: {0}".format(res))
>>>>>
Received last data: 6
run 操作符会返回 Observable 发射的最后一个数据,当然还可以应用到其它阻塞操作符中:
☕️ 时间单位的更换
在 v1 版本中,把时间作为单位的操作符,比如:interval、delay、debounce 等等,使用的是毫秒作为单位。而在 v3 版本中变成了秒,比如:interval(1) 表示间隔为1秒。
包名的更改
在 v3 版本中,对一些包进行了重命名,比如:
rx.concurrency >>> rx.scheduler
rx.disposables >>> rx.disposable
rx.subjects >>> rx.subject
同时,先前的包 rx.concurrency.mainloopscheduler 已经被分割成了两部分:rx.scheduler.mainloop 和 rx.scheduler.eventloop
对于 RxPy 3.x 的版本就暂时介绍到这么多,如果想要了解信息可以访问官方文档 RxPy Doc |