第15章 Server-Sent Events (SSE) 支持 - 15.4. 在 JAX-RS 资源中实现 SSE

优质
小牛编辑
129浏览
2023-12-01

15.4.1 简单 SSE 资源方法

先要添加 Jersey SSE 模块的依赖在你的项目中,并在你的 Application 或者 ResourceConfig 中注册这个 SseFeature:

Example 15.2. Simple SSE resource method

  1. ...
  2. import org.glassfish.jersey.media.sse.EventOutput;
  3. import org.glassfish.jersey.media.sse.OutboundEvent;
  4. import org.glassfish.jersey.media.sse.SseFeature;
  5. ...
  6. @Path("events")
  7. public static class SseResource {
  8. @GET
  9. @Produces(SseFeature.SERVER_SENT_EVENTS)
  10. public EventOutput getServerSentEvents() {
  11. final EventOutput eventOutput = new EventOutput();
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. for (int i = 0; i < 10; i++) {
  17. // ... code that waits 1 second
  18. final OutboundEvent.Builder eventBuilder
  19. = new OutboundEvent.Builder();
  20. eventBuilder.name("message-to-client");
  21. eventBuilder.data(String.class,
  22. "Hello world " + i + "!");
  23. final OutboundEvent event = eventBuilder.build();
  24. eventOutput.write(event);
  25. }
  26. } catch (IOException e) {
  27. throw new RuntimeException(
  28. "Error when writing the event.", e);
  29. } finally {
  30. try {
  31. eventOutput.close();
  32. } catch (IOException ioClose) {
  33. throw new RuntimeException(
  34. "Error when closing the event output.", ioClose);
  35. }
  36. }
  37. }
  38. }).start();
  39. return eventOutput;
  40. }
  41. }

上面的代码定义了资源部署在 URI “/events”。这个资源有一个 @GET 资源方法返回作为一个实体 EventOutput ——通用 Jersey ChunkedOutput API 的扩展用于输出分块消息处理。

注意

如果对 ChunkedOutput 和 ChunkedInput 部署,见 Async一章

eventOutput 从方法返回后,Jersey 运行时认识到,这是一个ChunkedOutput 扩展,于是不立即关闭客户端连接。相反,它写 HTTP 头到响应流,并等待发送块(SSE 事件)。此时客户端可以读头,开始监听独立事件。

注意

因为 Jersey 运行时不隐式地关闭连接到客户端(类似于异步处理),关闭连接是一个资源的方法的责任,或客户端监听在开放连接用于新的事件(见下面的例子)。

Example 15.2, “Simple SSE resource method”例子中,资源方法创建一个新的线程用于发送 10 个事件序列。两个事件序列之间有一个1秒的延迟。每个事件由OutboundEvent 类型展示,是建立的 helpf 出站事件生成器。OutboundEvent 反映了 SSE 消息的标准化格式,包含属性,代表的 name (指定事件), comment, data 或 id。还设置事件数据媒体类型,在 eventBuilder 上使用 mediaType(MediaType) 方法。媒体类型和数据类型的 data<Class, Object> 方法(在我们的例子中String.class),用于序列化的事件数据。注意事件数据媒体类型不会写入任何头定义的头,因为用于响应的 content-type 头已经在 @Produces 定义,并且在 SseFeature 中使用设置为“text/event-stream”。事件媒体类型是用于序列化事件数据。事件数据媒体类型和 Java 类型是用来选择合适的 MessageBodyWriter用于事件数据序列化,并且传递给选定的 writer 来序列化传递事件数据内容。在我们的例子中字符串 “Hello world “ + i + “!” 被序列化为 “text/plain” 。在事件数据可以发送任何 Java 实体,并将它与任何媒体类型关联,你可以用一个可用 MessageBodyWriter 来序列化。通常情况下,您可能想要发送例如 JSON 数据,所以你会在数据上用 JAXB 注解为 bean 实例,并定义媒体类型为 JSON。

注意

如果事件媒体类型不明确,媒体类型默认使用”text/plain” 。

当出站事件已经准备好,它就能写入到 eventOutput。此时,事件已经被内置的 OutboundEventWriter 使用合适的 MessageBodyWriter<T> 序列化为 “Hello world “ + i + “!” 文本。你可以发送任意多条消息。线程执行的最后资源将关闭,同时客户端的连接也关闭了。这样这个连接就不会再发送消息。若客户端想接收到更多消息,可以发送新的请求到服务器来初始化新的 SSE 流连接。

客户端将收到如下消息:

  1. event: message-to-client
  2. data: Hello world 0!
  3. event: message-to-client
  4. data: Hello world 1!
  5. event: message-to-client
  6. data: Hello world 2!
  7. event: message-to-client
  8. data: Hello world 3!
  9. event: message-to-client
  10. data: Hello world 4!
  11. event: message-to-client
  12. data: Hello world 5!
  13. event: message-to-client
  14. data: Hello world 6!
  15. event: message-to-client
  16. data: Hello world 7!
  17. event: message-to-client
  18. data: Hello world 8!
  19. event: message-to-client
  20. data: Hello world 9!

每个消息将会延迟一秒收到

注意

如果你有使用过 JAX-RS 的流,你可能想知道 ChunkedOutputStreamingOutput之间的区别。

*ChunkedOutput 是 Jersey 特殊的 API。它允许您发送“块”的数据而无需关闭客户端连接,使用一系列方便调用 ChunkedOutput.write 方法来使 POJO + chunk 媒体类型作为输入,然后使用配置的 JAX-RS MessageBodyWriter<T> 提供者用于找出适当的序列化方式给每个块POJO 字节。此外,ChunkedOutput 写可以多次调用同一出站响应连接,也就是,每个独立的块都写在每一个写里,而不是一个完整的响应实体。

StreamingOutput,另一方面,是低级别的 JAX-RS API 用于直接处理字节。你必须自己实现 StreamingOutput 接口。同时,write(OutputStream) 方法将会被 JAX-RS 运行时在每个响应中调用一次,并且调用这个方法是阻塞的,也就是,预计方法会编写整个实体的 body ,然后返回。*

15.4.2. 通过 Jersey SSE 广播

Jersey SSE 服务器 API 定义了 SseBroadcaster 允许广播独立的事件给多个客户端。下面是一个例子:

Example 15.3. Broadcasting SSE messages

  1. ...
  2. import org.glassfish.jersey.media.sse.SseBroadcaster;
  3. ...
  4. @Singleton
  5. @Path("broadcast")
  6. public static class BroadcasterResource {
  7. private SseBroadcaster broadcaster = new SseBroadcaster();
  8. @POST
  9. @Produces(MediaType.TEXT_PLAIN)
  10. @Consumes(MediaType.TEXT_PLAIN)
  11. public String broadcastMessage(String message) {
  12. OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
  13. OutboundEvent event = eventBuilder.name("message")
  14. .mediaType(MediaType.TEXT_PLAIN_TYPE)
  15. .data(String.class, message)
  16. .build();
  17. broadcaster.broadcast(event);
  18. return "Message '" + message + "' has been broadcast.";
  19. }
  20. @GET
  21. @Produces(SseFeature.SERVER_SENT_EVENTS)
  22. public EventOutput listenToBroadcast() {
  23. final EventOutput eventOutput = new EventOutput();
  24. this.broadcaster.add(eventOutput);
  25. return eventOutput;
  26. }
  27. }

让我们一起探索这个例子。BroadcasterResource 资源类注释 用 @Singleton 注解,告诉 Jersey 运行时,只有一个实例的资源类应该用于所有传入请求 /broadcast 路径。这是需要我们想让一个应用程序范围单一引用私有的 broadcaster 字段,这样我们为所有请求可以使用相同的实例。客户端想监听 SSE 事件,先发送 GET 请求到 BroadcasterResource 的 listenToBroadcast() 资源方法处理。方法创建一个新的 EventOutput 用于展示请求的客户端的连接,并通过 add(EventOutput) 注册 eventOutput 实例到单例 broadcaster。方法返回 eventOutput 导致 Jersey 使请求的客户端事件与 eventOutput 实例绑定,向客户机发送响应 HTTP 头。客户端连接保持开放,客户端等待准备接收新的 SSE 事件。所有的事件通过 broadcaster 写入 eventOutput。这样开发人员可以方便地处理发送新的事件到所有订阅的客户端。

当客户机想要广播新消息给所有的已经监听 SSE 连接的客户端,它发送一个POST 请求 BroadcasterResource 资源消息内容。 BroadcasterResource 资源上调用方法 broadcastMessage(String),消息内容作为输入参数。一个新的 SSE 出站事件是建立在标准方法并传递给 broadcaster。 broadcaster 内部调用 write(OutboundEvent) 在所有注册了的 EventOutput 上。当该方法只是返回一个标准文本响应给客户端,来通知客户端已经成功广播了消息。正如您可以看到的, broadcastMessage(String) 资源方法只是一个简单的 JAX-RS 资源的方法。

为了实现这种情况下,您可能已经注意到,Jersey SseBroadcaster 完成用例不是强制性的。单个 EventOutput 可以只是存储在收集器里,并且迭代 broadcastMessage 方法。然而,SseBroadcaster 内部识别和处理客户端断开连接。当客户端关闭连接, broadcaster 可检测并删除过期的内部收集器里面注册了 EventOutput的连接,以及释放所有服务器端资源关联的陈旧的连接。此外,SseBroadcaster 实现线程安全的,这样客户可以在任何时间和连接和断开 SseBroadcaster 总是广播消息最近收集的注册和活跃的客户端。