当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring在响应请求时通过反应websocket推送数据?

桓修能
2023-03-14

我正在使用Spring Boot2.1.3开始使用reactive websockets。我创建了一个WebSocketHandler实现,如下所示:

@Override
public Mono<Void> handle(WebSocketSession session) {

Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
    var publisher = flux.map( o -> {
        try {
            return objectMapper.writeValueAsString(o);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }).map(session::textMessage)
      .delayElements(Duration.ofSeconds(1));
    return session.send(publisher);
}

但是,我想对来自websocket的请求作出反应,告诉service我想要的数据的id是什么。我设法获得了这样的请求信息:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        return session.textMessage("Subscribing with id " + id);
    }));

现在我不知道如何结合这两个实现?

我希望能做这样的事情:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
        var publisher = flux.map( o -> {
            try {
                return objectMapper.writeValueAsString(o);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }).map(session::textMessage)
                            .delayElements(Duration.ofSeconds(1));
        return publisher; //Does not compile
    }));
@Override
public Mono<Void> handle(WebSocketSession session) {
    Flux<EfficiencyData> flux =
            session.receive()
                   .map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
                   .concatMap(service::subscribeToEfficiencyData);
    Mono<Void> input = flux.then();
    Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
    return Mono.zip(input, output).then();
}

但这只是立即断开websocket客户机而不做任何事情。

共有1个答案

刘棋
2023-03-14

为了解决您的问题,您必须使用允许对返回值进行平坦化的运算符。例如:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(
       session.receive()
              .flatMap(webSocketMessage -> {
                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                  var publisher = flux
                      .<String>handle((o, sink) -> {
                         try {
                            sink.next(objectMapper.writeValueAsString(o));
                         } catch (JsonProcessingException e) {
                            e.printStackTrace();
                            return; // null is prohibited in reactive-streams
                         }
                      })
                      .map(session::textMessage)
                      .delayElements(Duration.ofSeconds(1));

                  return publisher;
              })
    );
}
  1. 如果返回类型是流,请使用flatmapconcatmap(请参阅此处的区别
  2. 从不返回null。在reactive-streams中,null是禁止的值(请参阅此处的规范规则
  3. 当映射以null结束时->使用handle运算符。请参阅此处的详细说明
 类似资料:
  • WebSocket提供了一种像人类对话一样的双向通信。客户端可以向服务器发送数据,服务器可以随时向客户端发送数据。但是请求-响应行为呢?客户端可以向服务器询问一些内容并等待响应。看来Websocket并没有提供任何东西来将客户端数据(请求)链接到服务器数据(响应)。 这可能是子协议的工作,我对如何做到这一点有一些想法(发送一个ID和请求,并在超时期间内等待一个具有相同ID的响应)。 为了不推倒重来

  • 嗨,我正在通过axios尝试reactjs POST请求,但出现错误,我查看了所有文档,但错误未得到解决。 这是我的错误: 未捕获(promise中)错误:请求在XMLHttpRequest的结算(eval at(bundle.js:4621),:15:15)处的createError(eval at(bundle.js:4615),:18:12)处失败,状态代码为400。手工装载(在(捆js:4

  • 我正在尝试将SpringReactor与我的SpringBoot应用程序一起使用。 我正在使用ProjectReactor 3.0.7。松开并安装Spring护套1.5.3。释放 在我的服务类中有一个返回通量的方法。 我想将该值返回到web层中的控制器。但是,我没有看到json响应中返回的值。 当我从浏览器调用http://localhost:8080时,我得到的响应是,{“预取”:-1} 我不确

  • 我有一个简单的java spring方法来创建对象 无法提取响应:找不到响应类型[class Address]和内容类型[text/plain;charset=UTF-8]的合适的HttpMessageConverter 因此,我认为,我需要更改响应头内容类型,以正确的application/JSON,以便MappingJackson2HttpMessageConverter找到JSON字符串并运

  • 在服务器端,我正在使用一个HTTP API,它以页面形式返回结果。如中所示,响应包含x个结果,如果有超过0的结果,我可以再次调用它以获得下一个x个结果。x可以任意选择直到API的最大页面大小。 现在,我想要在WebSocket上高效地流式传输全套结果,而不会使它不堪重负(施加背压)。最初,我构建了整个resultset,然后从中创建了一个源代码: 这样可以工作,WebSocket客户机以其最大速度

  • 我正在研究访问HTTP请求和响应体的最佳方式,以便在Spring反应式应用程序中进行跟踪。 对于以前的版本,我们已经利用Servlet过滤器和Servlet请求包装器来使用传入请求的输入流,并保存其副本,以便异步处理跟踪(我们将其发送给Elasticsearch)。 但对于一个Spring反应式应用程序(使用webflux),我想知道在解码之前访问请求的最合适方式是什么。有什么想法吗?