当前位置: 首页 > 工具软件 > Streams > 使用案例 >

Reactive Streams规范

韩安顺
2023-12-01

Reactive Streams的目的是提供一个带有 非阻塞背压 特征的 异步流处理 标准

最新的发布版本如下

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.4</version>
</dependency>
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck</artifactId>
  <version>1.0.4</version>
  <scope>test</scope>
</dependency>

Goals, Design and Scope

在异步系统中处理数据流需要特别小心,尤其是容量不是预先确定的“实时”数据。 最突出的问题是,需要仔细控制资源消耗,以致快速生成的数据源不会压垮数据流的目标。 为了并行使用计算资源(在多台协作的网络主机一台机器内的多个CPU核心上),异步是必须的。

Reactive Streams的主要目标是管理跨异步边界(指将元素传递到另一个线程或线程池)流数据的交换,同时确保接收方不会被迫缓冲任意数量的数据。 换言之,背压是该模型的一个组成部分,以允许在线程之间进行数据缓冲的队列有界,如果背压信号是同步的,异步处理的好处将被否定(另请参见[Reactive宣言](http://reactivemanifesto.org/))。 因此,我们严格要求Reactive Streams实现的所有方面都具有完全的非阻塞和异步行为。

本规范的目的是让不同的实现具有相同的功能。 这些实现通过遵守Reactive Streams规范,在流处理应用程序的整个流程中将能够顺利地进行互操作,并保留上述优点和特征。

应该注意的是,流操作(转换、拆分、合并等)的精确性质不在本规范的范围内。Reactive Streams只涉及在不同的API组件之间调解数据流。在其开发过程中,Reactive Streams规范确保可以表达所有组合流的基本方式。

总之,Reactive Streams是JVM面向流库的标准和规范

  • 处理潜在的无限数量的数据
  • 按顺序处理
  • 在组件之间异步传递元素
  • 带有强制非阻塞背压

Reactive Streams规范包含以下部分:

API 指定了实现Reactive Streams的类型以及实现不同实现之间的互操作性.

Technology Compatibility Kit (TCK) 是用于实现一致性测试的标准测试套件.

任意实现只要符合API要求并通过TCK中的测试,就可以自由实现规范中未涵盖的其他功能。

API Components

The API 中包含以下需要被Reactive Stream提供的组件:

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

Publisher 是一个可能无限数量有序元素的提供者,按照从它的Subscriber处收到的需求将这些数据发布出去。

为响应对Publisher.subscribe(Subscriber)的调用,Subscriber上方法可能按照以下顺序调用:

onSubscribe onNext* (onError | onComplete)?

这意味着 onSubscribe 永远是发出信号的;接着可能是无限数量的 onNext 信号 (根据 Subscriber的要求); 如果发生任何错误则会触发一个onError信号;如果没有更多数据可用、且只要 Subscription 没有被取消,则会触发一个onComplete信号。

术语表

术语定义
Signal作为一个名词: onSubscribeonNextonCompleteonErrorrequest(n) 或者 cancel 方法中的一个。 作为一个动词: 调用或者触发一个信号。
Demand一个名词: Subscriber请求的尚未由Publisher交付(完成)的元素总数。 作为一个动词, request请求更多元素的行为。
Synchronous(ly)同步,在调用线程中执行.
Return normally仅向调用方返回声明类型的值。 向Subscriber发出失败信号的唯一合法方法是通过onError方法。
Responsivity准备就绪/响应能力。在本文中,用于表明不同组件不应相互影响响应能力。
Non-obstructing描述在调用线程上尽可能快地执行的方法的质量。例如,避免繁重的计算和其他会使调用方的执行线程停滞的事情。
Terminal state对于Publisher: 当发出onComplete 或者 onError信号时. 对于Pubscriber: 当收到 onComplete 或者 onError 信号时。
NOP空操作指令,对调用线程没有实质影响的执行,因此可以安全地任意调用。
Serial(ly)在 Signal的上下文中,是指不会发生重叠. 在JVM上下文中, 当且仅当一个对象的方法调用之间存在“先发生后发生”关系(也意味着调用不重叠)的时候,方法的调用是串行的。当这些调用异步执行的时候,将使用包含但不限于原子类、监视器或锁等技术来实现线条执行的先后关系。
Thread-safe可以安全地同步或异步调用,无需外部同步来确保程序的正确性。

SPECIFICATION

1. Publisher (Code)

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
IDRule
1PublisherSubscriber发送信号的onNext总数一定小于等于SubscriberSubscription从前到后请求的元素总数量。
说明这条规范的意图是明确发布者不能发送超过订阅者要求的元素。这条规则有一个隐含但重要的后果:因为需求只有在收到之后才能满足,所以在请求元素和接收元素之间存在一种先发生后发生的关系
2Publisher也许会发出比请求少的onNext,并且通过调用onComplete 或者 onError终止Subscription
说明这条规范的意图是说清楚一个Publisher不能保证他会生成和请求数量一样的元素数据。Publisher可能根本无法生成所有这些数据;Publisher也许处在一个失败的状态;Publisher也许是空的或者已经完成了。
3onSubscribeonNextonError 和 onComplete 信号发送给 Subscriber 必须是按照顺序的 serially.
说明这条规范的意图是当且仅当在建立每个信号之间的先后关系时,才允许发送信号(包括来自多个线程的信号)。
4Publisher 故障的时候必须发送 onError信号。
说明这条规范的意图是说明发布者需要在检测到无法继续工作时通知其订阅者,Subscriber需要提供清理资源的可能性或者其他方式处理Publisher的故障
5如果Publisher成功终止(有限流),则必须发出onComplete信号。
说明这条规范的意图是说明Publisher需要通知它的Subscribers它已经到达终止状态—Subscribers可以收到信号的时候执行一些动作,比如清理资源等。
6如果Publisher发出onError 或 onComplete 信号给到 Subscriber, 那么必须要考虑 Subscriber的 Subscription被取消.
说明这条规范的意图是说明无论Subscription是否被取消, Publisher都会发出onError 或者 onComplete信号。
7一旦终止状态信号被发出来(onErroronComplete),就不要求有任何其他信号发出。
说明这条规范的意图是确保onError 和 onComplete是一对 Publisher/Subscriber之间的最终交互状态。
8如果Subscription被取消,其Subscriber最终必须停止接收信号。
说明这条规范的意图是确保当Subscription.cancel()已经被调用的时候,Publisher慎重对待Subscriber的取消Subscription的请求. 根本的原因是信号可能由于异步而传播延迟。
9Publisher.subscribe 必须在向提供的Subscriber发出任何其他信号之前,调用其onSubscribe,并且必须返回正常return normally;除非提供的Subscriber 是 null,在这种情况下必须向他的调用者返回java.lang.NullPointerException 异常;对于所有其他情况,发出故障信号(或拒绝订阅者)的唯一合法方法是在调用onSubscribe之后调用onError。
说明这条规范的意图是确保onSubscribe永远在其他信号之前被发送出来, 以便信号被收到后,初始化逻辑能够被Subscriber执行。 同样 onSubscribe 必须被最多调用1次, [see 2.12]。 如果提供的 Subscriber 是 null, 除了它的调用者能够接收信号,没有其它任何信号接收者,这就意味着java.lang.NullPointerException必须被抛出给调用者。 可能情况的示例: 受限于有限数量的底层资源、耗尽或处于,一个有状态的Publisher可能会不知如何处理。
10Publisher.subscribe 可以根据需要调用多次,但是每次调用应该使用不同的 Subscriber
说明这条规范的意图是让调用方知道不能重复关联一般的Publisher、Subscriber。此外,它还强制规定,无论subscribe被调用多少次,都必须坚持其语义。
11Publisher可以支持多个Subscriber并决定每个Subscription是单播还是多播。
说明这条规范的意图是为Publisher的实现提供灵活性,以决定他们将支持多少订阅者(如果有的话),以及如何分发元素。

2. Subscriber (Code)

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
IDRule
1Subscriber必须通过Subscription.request(long n) 发送需求去触发接收onNext 信号。
说明这条规范的意图是明确Subscriber有责任决定何时以及能够愿意接收多少元素。为避免由于可重复调用Subscription方法导致的信号重新排序,强烈建议同步的Subscriber实现在所有信号处理的最后调用Subscription方法。建议Subscribers申请自己能够处理的数据量上限,因为一次只请求一个元素会导致固有的低效“停止并等待”
2如果Subscriber可能其对的信号处理将对其Publisher的响应率产生负面影响,建议异步调度其信号。
说明这条规范的意图是从执行角度来看,Subscriber应该不妨碍Publisher的处理。换句话说,Subscriber不应使Publisher无法接收CPU周期。
3Subscriber.onComplete()Subscriber.onError(Throwable t)不能调用Subscription或者Publisher的任何方法。
说明这条规范的意图是在处理完成信号期间,防止Publisher,Subscription和Subscriber之间出现循环和争用情况。
4Subscriber.onComplete()Subscriber.onError(Throwable t)在收到信号后必须要考虑Subscription被取消的情况。
说明这条规范的意图是确保Subscribers关注Publisher的终止状态信号. 在收到onComplete或者onError信号后,Subscription就不应再有效。
5如果Subscriber已经有一个活动的Subscription,在发出onSubscribe信号,Subscriber必须调用SubscriptionSubscription.cancel()方法。
说明这条规范的意图是阻止两个或多个独立的Publishers尝试与同一个Subscriber交互。强制执行此规则,因为将取消额外的订阅,可以防止资源泄漏。未能遵守此规则可能会导致违反Publisher规则1,等。此类违规行为可能导致难以诊断的错误。
6如果Subscription不在需要使用的时候,Subscriber必须调用Subscription.cancel()方法。
说明这条规范的意图是确保Subscribers不能在不需要Subscriptions的时候就将他们抛弃,他们必须调用cancel方法以便Subscription所持有的资源被安全、及时的释放掉。例如,订阅者只对特定元素感兴趣,然后取消订阅,向发布者发出完成的信号。
7Subscriber必须确保所有对其Subscription的request和cancel方法的调用是串行的。
说明这条规范的意图是当且仅当在每个调用之间建立了串行关系时,才允许调用request和cancel方法(包括从多个线程调用)。
8如果仍有请求的元素挂起,则在调用Subscription.cancel之后,Subscriber必须准备好接收一个或多个onNext信号。Subscription.cancel()不保证立即执行基础清理操作。
说明这条规范的意图是强调在调用cancel和Publisher观察到该取消之间可能存在延迟。
9Subscriber必须准备好接收onComplete信号,无论是否有前面正在处理的的Subscription.request(long n)调用。
说明这条规范的意图是明确完成与需求流无关,这允许流提前完成,并且无需轮询完成。
10Subscriber必须准备好接收onError信号,无论是否有前面正在处理的的Subscription.request(long n)调用。
说明这条规范的意图是明确Publisher故障也许和发送的需求完全无关。这意味着Subscribers不需要轮训来确定Publisher是否无法满足其请求。
11Subscriber必须确保其信号方法上的所有调用都发生在处理相应信号之前。也就是说Subscriber必须注意将信号正确发布到其处理逻辑。
说明这条规范的意图是明确Subscriber的实现负责确保信号的异步处理必须是线程安全的.
12Subscriber.onSubscribe对于同一个Subscriber最多只能调用1次。
说明这条规范建立在必须假设同一用户Subscriber只能订阅一次的基础上。
13调用onSubscribeonNextonError或者onComplete必须正常返回,除非有任何一个提供的参数是null则必须抛出java.lang.NullPointerException异常给到它的调用者;对于其他情况,Subscriber发出故障信号的唯一合法方式是取消其订阅。如果违反了此规则,则必须将订阅服务器的任何关联订阅视为已取消,并且调用方必须以适合运行时环境的方式抛出此错误条件。
说明此规则的目的是建立Subscriber方法的语义,以及在违反此规则的情况下允许Publisher执行的操作。«以适合运行时环境的方式提出此错误条件»可能意味着记录错误或以其他方式让某人或某物知道情况,因为错误无法通知故障订阅者。

3. Subscription (Code)

public interface Subscription {
    public void request(long n);
    public void cancel();
}
IDRule
1Subscription.request 和 Subscription.cancel 只能在其 Subscriber 上下文中被调用.
说明这条规范的意图是确定Subscription表示Subscriber和Publisher之间的唯一关系。Subscriber可以控制何时请求元素以及何时不再需要更多元素。
2Subscription必须要让SubscriberonNext或者onSubscribe方法中同步调用Subscription.request
说明这条规范的意图是说明request的实现必须是可以重入的,以避免在requestonNext(最终是onComplete/onError)之间相互递归调用的情况下发生堆栈溢出。这暗示Publishers可以是同步的,也就是在调用request的线程上发送onNext的信号。
3Subscription.request必须为PublisherSubscriber之间可能的同步递归设置上限。
说明该规则的目的是在requestonNext(最终是onComplete/onError)之间的相互递归上设置一个上限。为了节省堆栈空间,建议Subscription的实现将这种相互递归的深度限制为1。Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …是不需要的同步、开放递归的一个例子,否则会导致调用线程的堆栈崩溃。
4Subscription.request应该及时返回调用者。
说明这条规范的意图是表明request应该是非阻塞的方法,并且在调用线程上应该尽量快的执行结束,因此,请避免繁重的计算和其他会阻碍调用方执行线程的事情。
5Subscription.cancel必须及时返回调用者,必须幂等的,必须是线程安全的。
说明这条规范的意图是表明cancel应该是非阻塞的方法,并且在调用线程上应该尽量快的执行结束,因此,请避免繁重的计算和其他会阻碍调用方执行线程的事情。此外,同样重要的是,可以多次调用它而不会产生任何不利影响。
6Subscription被取消后,随后收到的Subscription.request(long n)必须没有任何操作。
说明这条规范的意图是在取消subscription和随后没有操作的更多元素请求之间建立一种因果关系。
7Subscription被取消后,其它的Subscription.cancel()必须没有任何操作。
说明3.5.
8Subscription没有被取消的时候,Subscription.request(long n)必须向相应的Subscriber提供要生成的给定数量的附加元素。
说明这条规范的意图是确保request是一个加法运算,以及确保申请元素的请求被提交给Publisher。
9Subscription没有被取消的时候,Subscription.request(long n)在收到的参数小于等于0的时候必须发送带有一个java.lang.IllegalArgumentException异常onError信号。这个异常信息应该能够解释清楚非正请求参数是非法的。
说明这条规范的意图是为了防止错误的实现在不引发任何异常的情况下继续操作。因为request是叠加的,请求小于等于0个的元素,是最能代表Subscriber进行错误的计算。
10Subscription没有被取消的时候,Subscription.request(long n)可以同步调用subscriber的onNext方法.
说明这条规范的意图是说明可以创建同步的Publishers,也就是,在调用线程直接上执行他们逻辑的Publishers。
11Subscription没有被取消的时候,Subscription.request(long n)可以同步调用subscriber的onCompleteonError方法.
说明这条规范的意图是说明可以创建同步的Publishers,也就是,在调用线程直接上执行他们逻辑的Publishers。
12Subscription没有被取消的时候,Subscription.cancel()必须请求Publisher最终停止向其Subscriber发出信号。这个操作不要求立即对Subscription产生影响。
说明这条规范的意图是以确定取消Subscription的需求最终会得到Publisher的关注,但可能需要一段时间才能收到信号。
13Subscription没有被取消的时候,Subscription.cancel()必须请求Publisher删除任何对应的subscriber的任何引用。
说明这条规范的意图是确保Subscribers在他们的订阅不在可用后,能够被恰当的垃圾回收掉。不应该使用同一个Subscriber再次订阅,但是本规范并未明确禁止这样做,如果这样做就意味着你需要无限期的存储已经取消掉的subscriptions。
14Subscription没有被取消的时候,如果有状态的Publisher且此时没有其他的Subscription存在,调用Subscription.cancel可能会导致Publisher切换到shut-down状态。
说明这条规范的意图是为了响应来自一个现有Subscriber的取消信号,允许Publishers为在新的Subscribers发送onSubscribe信号后发送onCompleteonError信号。
15调用Subscription.cancel必须正常返回。
说明这条规范的意图是不允许Subscription的实现类在cancel被调用的时候抛出异常。
16调用Subscription.request必须正常返回。
说明这条规范的意图是不允许Subscription的实现类在request被调用的时候抛出异常。
17Subscription必须要支持无限数量的request调用,并且支持高达2^63-1 (java.lang.Long.MAX_VALUE)的需求。一个需求如果等于或者大于2^63-1 (java.lang.Long.MAX_VALUE)可能被Publisher任务是一个有效的无限制请求。
说明这条规范的意图是表明Subscriber可以在任意数量的请求调用中请求无限数量的元素(增量大于0)。但由于使用当前或预期的硬件在合理的时间内更笨无法满足2^63-1次的调用需求,因此允许Publisher不用实现这一点的需求。

一个Subscription仅会被一个Publisher和一个Subscriber共享,用于协调这两者之间的数据交换。这就是为什么subscribe()不返回创建的Subscription而是返回void;这个Subscription只会通过onSubscribe的回调传递给这个Subscriber

4.Processor (Code)

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
IDRule
1一个Processor代表一个处理阶段—这个阶段既是一个Subscriber又是一个Publisher,并且必须准守两者规范。
说明这条规范的意图是以确定Processors的行为受Publisher和Subscriber规范的约束。
2一个Processor也许会选择从onError信号中恢复过来。如果它选择这样做,它必须要考虑Subscription被取消掉,否则它必须将onError信号立即传递给他的Subscribers。
说明这条规范的意图是表明可能相关的实现不仅仅只是简单的实现规范。

虽然不是强制的,但当Processor最后的Subscriber取消其Subscription时,建议取消它的上游Subscription,让取消信号向上游传播。

Asynchronous vs Synchronous Processing

Reactive Streams API规定所有元素(onNext)或终止信号(onErroronComplete)的处理都不能阻塞Publisher。但是,每个on*处理程序都可以同步或异步地处理事件。

例如:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

他有一个异步开起点和一个异步终点。让我们假设起点和终点都是选择器事件轮训。Subscription.request(n)必须冲终点连接到起点。这就是现在每个实现都可以选择如何执行此操作的地方。

下面使用管道符|标识异步边界(队列和调度),并且用R#表示资源(可能是线程)。

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

在这个例子中,3个消费者(mapfilterconsumeTo)都异步调度他们的任务,可以在同一个事件循环中(trampoline),独立的线程。。。

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------

这里,通过向NioSelectorOutput事件循环添加工作,只要异步调度最后一步。mapfilter步骤在起始线程上同步执行。

或者另一个实现可以将操作融合到最终消费者:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------

所有这些变体都是异步流。它们都有各自适用的场景并且每个都有不同的侧重点,包括性能和实现复杂性。

Reactive Streams规范允许实现灵活地管理资源和调度,并在非阻塞、异步、动态推拉流的范围内混合异步和同步处理。

为了能够让所有API(Publisher/Subscription/Subscriber/Processor)的方法能够实现全异步,这些方法的返回参数都是void

Subscriber controlled queue bounds

一个基本的设计原则是所有的缓冲区大小都是有边界的,并且这些边界必须被订阅者知道控制。这些边界用元素计数表示(元素计数反过来转换为onNext的调用计数)。 任何支持无限流(特别是高输出速率的流)的实现需要在整个过程中严格控制边界,避免OOM错误、限制资源的使用。

由于背压是强制性的,可以避免使用无界缓冲区。一般来说,缓冲区无限增长的唯一情况是publisher在较长一段时间内生产速度比subscriber消费速度更高,但这种情况已经被背压处理了。

Subscriber通过发送适量数量元素的需求信号,队列边界可以被效控制。在任何时间点,Subscriber都知道:

  • 请求的元素总数量: P
  • 已经处理的元素总数量: N

那么直到请求更多元素的信号被发送到Publisher前,可能到达的最大元素数量是P - N。如果subscriber还知道其输入缓冲区中元素的数量B,那么边界可以更细化为P - B - N

Publisher必须尊重这些界限,无论其所代表的来源是否支持背压。对于那些生产速率不会被影响的情况(例如时钟的嘀嗒或者鼠标移动),Publisher要么选择使用缓冲区要么删除元素,以满足有限的边界。

Subscriber在接收到一个元素后发出对一个元素的需求信号,有效地实现了一个需求信号相当于确认信号的Stop-and-Wait协议。通过发出需要多个要素的需求,确认成本被均摊了。值得注意的是,Subscriber被允许在任何时间点发出请求信号,避免Publisher和Subscriber之间不必要的延迟(即保持其输入缓冲区已满,而不必等待完整的往返)。

 类似资料: