当前位置: 首页 > 工具软件 > RXPY > 使用案例 >

【Python】 之深入讲解 RxPy 3.x 版本的重大改变

蓬意致
2023-12-01
这篇文章将详细介绍 RxPy 的重大升级版本 RxPy 3.1.0

开篇:


之前我写了一篇博客: python 之【超详细讲解 python 反应式编程】,介绍了什么是反应式编程,并以 RxPy 1.6 版本为基础详细地介绍了 RxPy 的使用方法。但随着 RxPy 的不断发展和升级,RxPy 经历了 2.x 版本并最终迎来了 3.x 的重大升级版本。3.x 版本对 Rx 的使用方式做出了很大改变,比如:

  • 操作符的链式操作改成了由 pip 操作符来集成构建
  • 可以自定义实现新的操作符
  • 操作链可以指定默认的调度器 scheduler
  • 通过补全支持更好地集成在 IDE 里面



RxPy 3.x 的变化:


下面就通过例子来给大家讲解一下,3.x 版本的操作带来了哪些变化!(为了方便理解,我会使用 v1 代替 RxPy 1.x 版本,使用 v3 表示 RxPy 3.x 版本

☕️ 更改了 Observable 对象的创建方式


在 v1 版本中创建 Observable 序列对象使用的 rx 里面的 Observable 模块来创建,比如:

from rx import Observable

def push_strings(observer):
    observer.on_next("Alpha")
    observer.on_next("Beta")

source = Observable.create(push_strings)

source.subscribe(lambda i: print("Received data: {0}".format(i)))

这里的 create 操作符相当于 Observable 类的一个方法,但在 v3 版本中则不再使用 Observable 模块来创建了,而是把这些工厂操作符封装在 rx 模块下一个个独立的方法,然后可以直接独立拿来使用,比如:

from rx import create

def push_strings(observer, scheduler):
    observer.on_next("Alpha")
    observer.on_next("Beta")

source = create(push_strings)

source.subscribe(lambda i: print("Received data: {0}".format(i)))

并且提供给 create 操作符的订阅函数 push_strings 也变成了两个参数了:一个 observer 对象和一个 scheduler 调度器。这个调度器参数是新实现的参数,如果在进行订阅 subscribe() 的时候,提供了 scheduler 参数,则这个 scheduler 就会传递给 create 操作符的订阅函数中,如果没有提供则默认为 None。


 改变了链式操作符的组装方式


在 v1 版本中,组装链式操作符的方式为一连串的点"."拼接操作,这些操作符的使用方式就像是调用 Observable 对象的一个方法一样,比如:

from rx import Observable

Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") \
    .map(lambda s: len(s)) \
    .filter(lambda i: i >= 5) \
    .subscribe(lambda value: print("Received {0}".format(value)))

但在 v3 版本中,则改成基于 pipe 操作符来进行链式组装。并且 pipe 操作符已经成为了 Observable 类的唯一方法了,换句话说,创建 Observable 对象的工厂操作符后面只能使用 pipe 这个操作符来进行数据的处理。

比如下面的例子:

import rx
from rx import operators

rx.of("Alpha", "Beta", "GammaRay").pipe(
    operators.map(lambda s: len(s)),
    operators.filter(lambda i: i >= 5)
).subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: 5
Received data: 8

从这个例子中我们可以看出,v3 版本中,工厂操作符和非工厂操作符被划分为了两个不同的模块:

  • 工厂操作符(即创建 Observable 对象的操作符),比如:create、of、interval 等等被划分到了 rx 模块下,成为了 rx 的一个独立方法
  • 非工厂操作符(即处理 Observable 对象里面的数据的操作符),比如:map、filter 等等被划分到了 rx 下的 operators 模块下,即这些操作符属于 operators 模块下的一个独立方法

同时,从这个例子中我们也可以看出,非工厂操作符都封装在了 pipe 操作符中,使用逗号分隔。这种方式带来的改变是巨大的,正式 pipe 操作符的到来,使链式操作带来了很大的便利,比如,我们可以自己使用函数来对 pipe 进行封装:

import rx
from rx import operators

def filter_string_length():
    return rx.pipe(
        operators.map(lambda s: len(s)),
        operators.filter(lambda i: i >= 5)
    )

rx.of("Alpha", "Beta", "GammaRay").pipe(
    filter_string_length()
).subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: 5
Received data: 8

从上面的例子可以看出,我们可以把多个操作符封装成一个函数(实际上是把多个操作符包装成一个独立的 pipe 操作符),然后把函数直接放到 pipe 操作符当中。当然 pipe 中可以放入多个这种类型的函数,这就实现了我们所说的模块化编程,这些都是得益于 pipe 操作符的强大之处。


 自定义新的操作符


在 v3 版本中,还允许我们自己创建一个新的操作符,允许我们完全控制订阅立逻辑和数据项的发射逻辑。比如下面的例子:

import rx

def lowercase():
    # source 为传进来 Observable
    def observable_handle(source):
        def subscribe(observer, scheduler=None):
            # 重写 on_next 来达到处理数据项的目的
            def on_next(value):
                observer.on_next(value.lower())

            return source.subscribe(
                on_next,    # 替换为自己的 on_next
                observer.on_error,
                observer.on_completed,
                scheduler
            )
        # 返回一个自定义的新的 Observable
        return rx.create(subscribe)

    return observable_handle

rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
    lowercase()
).subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: alpha
Received data: beta
Received data: gamma
Received data: delta
Received data: epsilon

在这个例子当中我们自定义了一个将字符串变成小写字母的操作符,自定义操作符是通过一个函数来实现的,这个函数需要返回一个处理 Observable 对象的函数句柄,比如 observable_handle 就是这样一个句柄,它接收一个传进来的 Observable 对象,然后返回一个新的自定义 Observable,在这个新的 Observable 中我们重写了观察者 oberver 的订阅函数中的 on_next 方法来达到处理每个数据项的目的。(如果想要理解这个例子的原理,需要反复多看几遍


 移除了操作符中的数据项组合函数


在 v3 版本中,操作符里面的组合多个不同 Observable 数据项的函数被移除了,比如 v1 版本中的 zip 操作符:

import operator
from rx import Observable

a = Observable.of(1, 2, 3)
b = Observable.of(1, 2, 3)

a.zip(b, lambda i, j: operator.mul(i, j)) \
    .subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: 1
Received data: 4
Received data: 9

这里的 zip 操作符使用了一个 lambda 函数组合了两个 Observable 的数据。但在 v3 版本中则被移除了,改成了如下方式:

import operator
import rx
from rx import operators

a = rx.of(1, 2, 3)
b = rx.of(1, 2, 3)

a.pipe(
    operators.zip(b),   # 返回一个由 a 和 b 数据项组成的元组
    operators.map(lambda z: operator.mul(z[0], z[1]))
).subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: 1
Received data: 4
Received data: 9

当然,例子中的 operators.map(lambda z: operator.mul(z[0], z[1])) 对元组数据的解包可以改成能够自己解包元组数据的 starmap 操作符:operators.starmap(operator.mul),它们的结果都是一样的。


讀 移除了操作符中的 Observable 列表参数


在 v1 版本中,有一些操作符,比如:merge、zip、combine_latest 等等,把 Observable 列表作为参数,然后把列表中的 Observable 进行组合:

from rx import Observable

obs1 = Observable.from_([1, 2])
obs2 = Observable.from_([3, 4])

res = Observable.merge([obs1, obs2])
res.subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: 1
Received data: 2
Received data: 3
Received data: 4

在 v3 版本中则改成了可变参:

import rx

obs1 = rx.of(1, 2)
obs2 = rx.of(3, 4)

res = rx.merge(obs1, obs2)
res.subscribe(lambda data: print("Received data: {0}".format(data)))

>>>>>
Received data: 1
Received data: 2
Received data: 3
Received data: 4

☁️ 移除了阻塞 Observable


在 v1 版本中,我们通常会对 Observable 对象的数据进行阻塞,比如:

from rx import Observable

res = Observable.of(1, 2, 3, 4, 6).to_blocking().last()
print("Received last data: {0}".format(res))

>>>>>
Received last data: 6

这个例子中我们使用 to_blocking().last() 对 Observable 进行阻塞直到最后一个数据。在 v3 版本中,使用的是 run 操作符:

import rx

res = rx.of(1, 2, 3, 4, 6).run()
print("Received last data: {0}".format(res))

>>>>>
Received last data: 6

run 操作符会返回 Observable 发射的最后一个数据,当然还可以应用到其它阻塞操作符中:

  • obs.pipe(operations.first()).run()
  • obs.pipe(operations.to_list()).run()

☕️ 时间单位的更换


在 v1 版本中,把时间作为单位的操作符,比如:interval、delay、debounce 等等,使用的是毫秒作为单位。而在 v3 版本中变成了秒,比如:interval(1) 表示间隔为1秒。


 包名的更改


在 v3 版本中,对一些包进行了重命名,比如:

rx.concurrency >>> rx.scheduler
rx.disposables >>> rx.disposable
rx.subjects >>> rx.subject

同时,先前的包 rx.concurrency.mainloopscheduler 已经被分割成了两部分:rx.scheduler.mainloop 和 rx.scheduler.eventloop



对于 RxPy 3.x 的版本就暂时介绍到这么多,如果想要了解信息可以访问官方文档 RxPy Doc
 类似资料: