当前位置: 首页 > 知识库问答 >
问题:

通过SSE订阅的Flux引发取消()事件

曹镜
2023-03-14

我有一个Spring靴2.0.0。M7 Spring Webflux应用程序,其中我使用的是Thymeleaf Reactive。

我注意到,在我的微服务上,当我在SSE模式(文本/事件流)下调用一个返回数据流的endpoint时,即使该数据流已被正确处理,也会在该数据流上发生cancel()。

例如,这里有一个简单的控制器endpoint:

@GetMapping(value = "/posts")
public Flux<String> getCommunityPosts() {
    return Flux.just("A", "B", "C").log("POSTS");
}

以下是我在SSE模式下请求时得到的订阅流量日志:

2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.842  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(A)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(B)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(C)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | onComplete()
2018-02-13 17:04:09.852  INFO 4281 --- [nio-9090-exec-4] POSTS : | cancel()

我们可以在完成后通知取消事件。当我通过经典GET请求调用相同的终结点时,我没有这种行为。我怀疑这个取消事件会使客户端事件源(javascript)抛出一个onError事件。

这是上交所特有的已知/想要的行为吗?

问题更新

实际上,我在一些流上使用SSE,因为有时我需要事件源来获取JSON数据,而不是Thymeleaf已经处理过的HTML。我应该用另一种方式吗?

我的实现基于此示例的最后一种方法:https://github.com/danielfernandez/reactive-matchday/blob/master/src/main/java/com/github/danielfernandez/matchday/web/controller/MatchController.java

然而,我可能没有在我之前的帖子中提供一些信息。我使用Tomcat服务器(带有M7的8.5.23),而不是Netty服务器。我强制使用Tomcat,包括以下Maven依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

在示例项目中使用您的代码,这似乎会导致问题。

当我在Netty服务器上运行代码时,得到的结果与您相同:

2018-02-14 12:30:48.713  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:30:48.714  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(1)
2018-02-14 12:30:49.717  INFO 3060 --- [     parallel-2] reactor.Flux.ConcatMap.1 : onNext(a)
2018-02-14 12:30:49.739  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(31)
2018-02-14 12:30:50.731  INFO 3060 --- [     parallel-3] reactor.Flux.ConcatMap.1 : onNext(b)
2018-02-14 12:30:51.733  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onNext(c)
2018-02-14 12:30:51.735  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onComplete()

当我在Tomcat服务器上运行相同的代码时,我有取消问题:

2018-02-14 12:33:18.294  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:33:18.295  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:19.295  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : onNext(a)
2018-02-14 12:33:19.297  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : onNext(b)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onNext(c)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.307  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onComplete()
2018-02-14 12:33:21.307  INFO 3088 --- [nio-8080-exec-4] reactor.Flux.ConcatMap.2 : cancel()

这可能是雄猫的问题还是我做错了什么?

共有1个答案

逄皓轩
2023-03-14

首先,我认为不应该对有限流使用SSE。

当我创建一个Controller方法时,比如:

@GetMapping(path = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<String> test() {
    return Flux.just("a", "b", "c").delayElements(Duration.ofSeconds(1)).log();
}

并通过以下浏览器(Chrome或Firefox)请求:

<script type="text/javascript">
    var testEventSource = new EventSource("/test");
    testEventSource.onmessage = function (e) {
        console.log(e);
    };
</script>

我在服务器上获得以下日志:

| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
| request(1)
| onNext(a)
| request(31)
| onNext(b)
| onNext(c)
| onComplete()
| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
| request(1)
| onNext(a)
| request(31)
| onNext(b)
| onNext(c)
| onComplete()

Flux完成后,服务器将关闭连接,浏览器将自动重新连接。这将一次又一次地重播相同的序列。

我在服务器上获得取消()事件的唯一方法是在流期间关闭浏览器选项卡。

 类似资料:
  • Tendermint 会发出不同的事件,您可以通过Websocket订阅这些事件。这对于第三方应用程序(如 analysys)或检查状态非常有用。 事件列表 您可以通过 Websocket 调用 subscribe RPC 方法订阅上面的任何事件。 { "jsonrpc": "2.0", "method": "subscribe", "id": "0", "para

  • 我喜欢做一个有redis回应的SSE。在quarkus中订阅。 我有一个来自quarkus快速入门的简单SSE示例 这个效果很好,每2秒钟我就会收到Hello。。。。在我的web浏览器中 现在我尝试订阅Redis,所以我应该会收到Redis的消息。 Redis示例: 现在,我用quarkus SSE尝试以下方法: 我收到的是一个例外: 有人能支持我吗?有一个简单的例子吗?我对此一无所知,我无法接收

  • 我正在我的应用程序中的服务中使用某些EventSource。 我想在浏览器关闭/刷新时关闭所有连接。 我看过这篇文章 现在,这就是我添加到我的服务中的内容。 有更好的解决办法吗?

  • 本文向大家介绍结合Visual C#开发环境讲解C#中事件的订阅和取消订阅,包括了结合Visual C#开发环境讲解C#中事件的订阅和取消订阅的使用技巧和注意事项,需要的朋友参考一下 类或对象可以通过事件向其他类或对象通知发生的相关事情。发送(或引发)事件的类称为“发行者”,接收(或处理)事件的类称为“订户”。 在典型的 C# Windows 窗体或 Web 应用程序中,可订阅由控件(如按钮和列表

  • 我编写了一个Spring RestController,它返回一个SseEmitter(对于服务器发送的事件),并向每个事件添加HATEOAS链接。以下是该控制器的一个简化但有效的示例: 问候语课程如下: 此代码运行完美。如果我尝试使用Web浏览器访问REST服务,我会看到显示正确内容和链接的事件。 结果如下所示(每个事件出现在前一个事件之后5秒): 现在我需要调用这个REST服务并从另一个Spr

  • 我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很