当前位置: 首页 > 文档资料 > Vert.x 中文文档 >

Reactive - Vert.x Sync

优质
小牛编辑
135浏览
2023-12-01

中英文对照表

  • bytecode instrumentation:字节码修改/增强
  • kernel thread:内核线程

简介

Vert.x Sync 是一组工具集,其特点是在不阻塞内核线程的同时,允许用户以同步的方式接收事件、执行异步操作。

比起很多历史遗留的应用系统,Vert.x 的一个关键优点是完全非阻塞(于内核线程而言) —— 这使它用少量的内核线程就可以处理大量的并发(例如,处理很多的连接、消息之类),具有优异的可扩展性。

Vert.x 非阻塞性的特性产生了异步的 API 。异步 API 可以有多种风格,包括回调、Promise和Rx风格。Vert.x 在绝大多数地方使用回调(尽管它也支持Rx)。

某些情况下,使用异步 API 编程比直接使用同步 API 要更具挑战,特别是当你有好几个操作要按顺序完成时。同时,使用异步 API 时,错误的传递也会变得更复杂。

Vert.x Sync 可以让你在熟悉的同步风格下继续使用异步 API 。

在此,通往自由之路的功臣乃 Fiber(译者注:这个词国内有译作纤程,类似协程-coroutine)。Fiber 是超轻量级的线程,并不是对应于底层的那种内核线程,它们被阻塞时不会导致内核线程也被阻塞。

Vert.x 借助 Quasar 库来实现 Fiber。

注意:Vert.x Sync 当前只适用于 Java 。

SyncVerticle

要使用 Vert.x Sync 库,你的代码需要继承 io.vertx.ext.sync.SyncVerticle 类,并重载start()方法和stop()方法(stop 非必需)。

这些方法还必须加上 @Suspendable 注解。

写好的 Sync Verticle,其部署方法和其他 Verticle 完全一样。

Instrumentation

Vert.x 用到了 Quasar 库,这个库借助字节码增强(bytecode instrumentation)的技术实现了 Fiber。字节码增强工作是在运行时使用 Java Agent 技术完成的。

为了使这个特性正常工作,需要在启动 JVM 时指定 quasar-core jar包为 Java Agent jar 包:

  1. -javaagent:/path/to/quasar/core/quasar-core.jar

如果你用的是 vertx 命令行工具,可以在执行 vertx 前设置环境变量 ENABLE_VERTX_SYNC_AGENTture,这样可以启用 Agent 的配置。

你也可以使用 quasar-maven-plugin 达成离线增强(offline instrumentation,指非运行时织入字节码)的效果。更多细节请参考 Quasar 官方文档

获得一次性的异步操作结果

在Vert.x 的领域里,很多操作都会接受一个 Handler<AsyncResult<T>> 作为最后的参数,例如用 Vert.x 的 Mongo 客户端执行一次查询或者发送一个 Event Bus 消息然后拿到回应。

Vert.x Sync 可以让你用同步的方式得到这种一次性的异步操作的结果。这是通过调用 Sync.awaitResult 方法完成的。调用这个方法时,需将想要执行的异步操作包装成 Consumer 的形式。Handler 参数会在运行时传给此 Consumer

看下面的例子:

  1. EventBus eb = vertx.eventBus();
  2. // Send a message and get the reply synchronously
  3. Message<String> reply = awaitResult(h -> eb.send("someaddress", "ping", h));
  4. System.out.println("Received reply " + reply.body());

上面的例子中,在回应返回前,Fiber 会一直被阻塞住,而内核线程不会。

获得一次性的事件

Vert.x Sync 也能以同步的方式获得一次性的事件,例如定时器的触发,或者 end handler(译者注:关于 end handler 的例子可以参见 Vert.x Core 文档中 HTTP 服务器与客户端 一节)的执行。这是通过 Sync.awaitEvent 方法完成成的。

看下面的例子:

  1. long tid = awaitEvent(h -> vertx.setTimer(1000, h));
  2. System.out.println("Timer has now fired");

事件流

很多时候,Vert.x 的 Handler 接收到的是事件流,例如 Event Bus 消息的消费者(Consumer)、HTTP 服务端里的 HTTP 服务端请求(server request)。

Vert.x Sync 使你能以同步的方式从这种流中接收事件。

你需要一个同时实现了 HandlerReceiver 接口的 HandlerReceiverAdaptor 类实例。Sync.streamAdaptor 方法可以创建这样一个实例。

你可以把它当成一个普通的 Handler ,然后可以用实现自 Receiver 接口的方法来同步地接收事件。

下面是一个 Event Bus 消息消费者的例子:

  1. EventBus eb = vertx.eventBus();
  2. HandlerReceiverAdaptor<Message<String>> adaptor = streamAdaptor();
  3. eb.<String>consumer("some-address").handler(adaptor);
  4. // Receive 10 messages from the consumer:
  5. for (int i = 0; i < 10; i++) {
  6. Message<String> received1 = adaptor.receive();
  7. System.out.println("got message: " + received1.body());
  8. }

使用 FiberHandler

如果你想在一般的 Handler 中使用 Fiber —— 例如 HTTP 服务端的请求处理器(request handler) ,那得首先把这个一般的 Handler 转换为 Fiber Handler 。

Fiber Handler 会在 Fiber 里运行那个一般的 Handler 。

看例子:

  1. EventBus eb = vertx.eventBus();
  2. vertx.createHttpServer().requestHandler(fiberHandler(req -> {
  3. // Send a message to address and wait for a reply
  4. Message<String> reply = awaitResult(h -> eb.send("some-address", "blah", h));
  5. System.out.println("Got reply: " + reply.body());
  6. // Now end the response
  7. req.response().end("blah");
  8. })).listen(8080, "localhost");

更多示例

Examples Repository 这里有一些示例,展示了 vertx-sync 的用法。


原文档最后更新于 2017-03-15 15:54:14 CET