我正在使用Spring Boot2.1.3开始使用reactive websockets。我创建了一个WebSocketHandler
实现,如下所示:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return session.send(publisher);
}
但是,我想对来自websocket的请求作出反应,告诉service
我想要的数据的id是什么。我设法获得了这样的请求信息:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
return session.textMessage("Subscribing with id " + id);
}));
现在我不知道如何结合这两个实现?
我希望能做这样的事情:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher; //Does not compile
}));
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux =
session.receive()
.map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
.concatMap(service::subscribeToEfficiencyData);
Mono<Void> input = flux.then();
Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
return Mono.zip(input, output).then();
}
但这只是立即断开websocket客户机而不做任何事情。
为了解决您的问题,您必须使用允许对返回值进行平坦化的运算符。例如:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(
session.receive()
.flatMap(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) -> {
try {
sink.next(objectMapper.writeValueAsString(o));
} catch (JsonProcessingException e) {
e.printStackTrace();
return; // null is prohibited in reactive-streams
}
})
.map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher;
})
);
}
flatmap
或concatmap
(请参阅此处的区别null
。在reactive-streams中,null
是禁止的值(请参阅此处的规范规则null
结束时->使用handle
运算符。请参阅此处的详细说明WebSocket提供了一种像人类对话一样的双向通信。客户端可以向服务器发送数据,服务器可以随时向客户端发送数据。但是请求-响应行为呢?客户端可以向服务器询问一些内容并等待响应。看来Websocket并没有提供任何东西来将客户端数据(请求)链接到服务器数据(响应)。 这可能是子协议的工作,我对如何做到这一点有一些想法(发送一个ID和请求,并在超时期间内等待一个具有相同ID的响应)。 为了不推倒重来
嗨,我正在通过axios尝试reactjs POST请求,但出现错误,我查看了所有文档,但错误未得到解决。 这是我的错误: 未捕获(promise中)错误:请求在XMLHttpRequest的结算(eval at(bundle.js:4621),:15:15)处的createError(eval at(bundle.js:4615),:18:12)处失败,状态代码为400。手工装载(在(捆js:4
我正在尝试将SpringReactor与我的SpringBoot应用程序一起使用。 我正在使用ProjectReactor 3.0.7。松开并安装Spring护套1.5.3。释放 在我的服务类中有一个返回通量的方法。 我想将该值返回到web层中的控制器。但是,我没有看到json响应中返回的值。 当我从浏览器调用http://localhost:8080时,我得到的响应是,{“预取”:-1} 我不确
我有一个简单的java spring方法来创建对象 无法提取响应:找不到响应类型[class Address]和内容类型[text/plain;charset=UTF-8]的合适的HttpMessageConverter 因此,我认为,我需要更改响应头内容类型,以正确的application/JSON,以便MappingJackson2HttpMessageConverter找到JSON字符串并运
在服务器端,我正在使用一个HTTP API,它以页面形式返回结果。如中所示,响应包含x个结果,如果有超过0的结果,我可以再次调用它以获得下一个x个结果。x可以任意选择直到API的最大页面大小。 现在,我想要在WebSocket上高效地流式传输全套结果,而不会使它不堪重负(施加背压)。最初,我构建了整个resultset,然后从中创建了一个源代码: 这样可以工作,WebSocket客户机以其最大速度
我正在研究访问HTTP请求和响应体的最佳方式,以便在Spring反应式应用程序中进行跟踪。 对于以前的版本,我们已经利用Servlet过滤器和Servlet请求包装器来使用传入请求的输入流,并保存其副本,以便异步处理跟踪(我们将其发送给Elasticsearch)。 但对于一个Spring反应式应用程序(使用webflux),我想知道在解码之前访问请求的最合适方式是什么。有什么想法吗?