当前位置: 首页 > 文档资料 > Vert.x 中文文档 >

Reactive - Reactive Streams

优质
小牛编辑
132浏览
2023-12-01

中英文对照表

  • publisher(n.):发布者
  • subscriber(n.):订阅者
  • read stream(n.):可读流
  • write stream(n.):可写流
  • subscribe(v.):注册
  • back pressure:背压机制

为了支持在 JVM 上进行非阻塞的带背压机制的异步流处理, Reactive Streams 做了一些初创性的工作来提供这样一份标准。

这个库提供了 Vert.x 上 Reactive Streams 标准的实现。

在处理流式数据方面,Vert.x 有自己的机制;通过这三个类:io.vertx.core.streams.ReadStreamio.vertx.core.streams.WriteStreamio.vertx.core.streams.Pump,可以在将数据从一个流泵到另一个流时,实现流量控制。更多关于 Vert.x 流方面的信息请查阅 Vert.x Core 部分的手册。

这个库为可读流、可写流都提供了实现,这两者分别扮演了 Reactive Streams 中发布者和订阅者的角色;这使得我们能够以对待 Vert.x 中读写流的方式处理任意 Reactive Streams 的发布者和订阅者。

使用 Vert.x Reactive Streams

要使用 Vert.x Reactive Streams 组件,需要在构建描述符中添加如下依赖:

  • Maven(在 pom.xml 文件中):
  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-reactive-streams</artifactId>
  4. <version>3.4.2</version>
  5. </dependency>
  • Gradle(在 build.gradle 文件中):
  1. compile 'io.vertx:vertx-reactive-streams:3.4.2'

Reactive Read Stream

我们为 Vert.x 的 ReadStream 接口提供了实现类 ReactiveReadStream ,它同样也实现了 Reactive Streams 的订阅者角色。

你可以把这个类的实例传递给任意的 Reactive Streams 发布者(例如来自 Akka 的发布者),随后你就可以像从其他任意的 Vert.x ReadStream中一样读取数据(例如使用一个 Pump 把数据从这个流泵到一个 WriteStream)。

这里有个例子,从某个其他的 Reactive Streams 实现中(例如 Akka)获得一个发布者,并将其数据泵入到服务端的 HTTP 响应体中。其间,背压机制是自动执行的。

  1. ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();
  2. // 在另外的发布者上注册订阅者
  3. otherPublisher.subscribe(rrs);
  4. // 将数据从可读流泵入 HTTP 响应
  5. Pump pump = Pump.pump(rrs, response);
  6. pump.start();

Reactive Write Stream

同样的,我们为 WriteStream 接口提供了实现类 ReactiveWriteStream ,它也是 Reactive Streams 的发布者角色的实现。拿到任意的 Reactive Streams 订阅者(例如来自 Akka 的订阅者)之后,你就可以像处理其他任意的 Vert.x WriteStream一样,往其中写入数据(例如使用一个 Pump 把从 ReadStream 来的数据泵入其中)。

在手动处理 Vert.x 可读流的背压时,你会用到 pauseresumewriteQueueFull 这些方法;它们会在内部被自动转换成 Reactive Streams 中背压机制传播方面的方法(在请求更多数据项时)。

这里有个例子,从其他的 Reactive Streams 实现拿到订阅者之后,将服务端的请求体泵入其中。其间背压机制将自动运行。

  1. ReactiveWriteStream<Buffer> rws = ReactiveWriteStream.writeStream(vertx);
  2. // 在可写流上注册另外的订阅者
  3. rws.subscribe(otherSubscriber);
  4. // 将 HTTP 请求泵入可写流
  5. Pump pump = Pump.pump(request, rws);
  6. pump.start();

原文档最后更新于 2017-03-15 15:54:14 CET