I'm using Spring Boot 2 with WebFlux on JDK 11. I wrote the following configuration class:
@Configuration
public class WebSocketConfiguration {

    @Autowired
    @Bean
    public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
        final Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", server);
        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
and the following WebSocketHandler method:
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive()
            .map(msg -> webSocketSession
                    .textMessage("response:jack ->" + msg.getPayloadAsText())));
}
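Now I get a response to anything I send, for example:
Client sends: 4545
Client receives: response:jack ->4545
I'd like to know how to push messages to the client when the client has not sent me anything. I need to be able to push messages at any time!
How can I send custom messages whenever I want, instead of only responding with the same input message?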
You can read about this in my blog post: http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/
You need to change your WebSocketHandler to:
private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;

public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
    this.greetingsPublisher = greetingsPublisher;
    // share() lets every connected session subscribe to the same stream
    this.publisher = Flux.create(greetingsPublisher).share();
}

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    final Flux<WebSocketMessage> message = publisher
            .map(greetings -> webSocketSession.textMessage(greetings));
    return webSocketSession.send(message);
}
and add a GreetingsPublisher:
@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {

    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    // blocks until a greeting has been pushed
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                .onFailure(ex -> log.error("Could not take greeting from queue", ex));
            }
        });
    }
}
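The worker loop above is written as `while (true)` and has no explicit way to stop. If you need to shut it down (in tests, for instance), a variant along the following lines might help. This is only a sketch using plain JDK classes: `QueueDrainer`, `start` and `stop` are hypothetical names that are not part of the code above, and the `Consumer<String>` stands in for `sink::next`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper: forwards queued messages to a consumer until stopped.
class QueueDrainer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile Thread worker;

    // Non-blocking enqueue, same idea as GreetingsPublisher.push.
    boolean push(String message) {
        return queue.offer(message);
    }

    // Starts a daemon thread that hands each queued message to the consumer.
    void start(Consumer<String> consumer) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    consumer.accept(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and leave the loop
            }
        }, "queue-drainer");
        t.setDaemon(true);
        worker = t;
        t.start();
    }

    // Interrupts the worker and waits up to one second for it to finish.
    void stop() throws InterruptedException {
        Thread t = worker;
        if (t != null) {
            t.interrupt();
            t.join(1000);
        }
    }
}
```

In the publisher you would call something like `drainer.start(sink::next)` from `accept`, and `drainer.stop()` when the application shuts down.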
It is a bean, so wherever you inject it and call its push method, the message will be sent over the WebSocket. For example:
@Controller
public class GreetingsController {

    private final GreetingsPublisher greetingsPublisher;

    public GreetingsController(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
    }

    @Bean
    RouterFunction<ServerResponse> pushMessage() {
        return route(GET("/push"),
                request -> {
                    greetingsPublisher.push("Send a new message with WebSocket");
                    return ServerResponse.ok().body(fromObject("websocket message sent"));
                });
    }
}
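The HTTP route is just one possible trigger. Since `push` only enqueues a string, anything can call it, for example a timer. Here is a rough sketch using only the JDK scheduler: `PeriodicPusher` is a hypothetical name, the "tick N" payload is made up for illustration, and the `Predicate<String>` stands in for `greetingsPublisher::push`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

// Hypothetical helper: calls a push function at a fixed rate.
class PeriodicPusher implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "periodic-pusher");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private final AtomicLong counter = new AtomicLong();

    // push is expected to behave like GreetingsPublisher.push
    // (returns false if the message was rejected).
    PeriodicPusher(Predicate<String> push, long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> push.test("tick " + counter.incrementAndGet()),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Stops the timer; messages that were already pushed stay queued.
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With the bean from above this would be something like `new PeriodicPusher(greetingsPublisher::push, 5000)`. In a Spring application a method annotated with `@Scheduled` that calls `push` would probably be the more usual choice.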
First connect to the WebSocket, then open localhost:8080/push in a browser. The message should be sent.
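If you don't have a client at hand for the "connect to the WebSocket" step, the WebSocket client built into JDK 11 (`java.net.http`) should be enough for a quick check. A minimal sketch follows; `PushClient` is a hypothetical name, and the default URL assumes the handler is still mapped to `/echo` on localhost:8080 as in the question's configuration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical test client: collects every text message the server pushes.
class PushClient implements WebSocket.Listener {
    private final List<String> received = new CopyOnWriteArrayList<>();
    private final StringBuilder partial = new StringBuilder();

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        partial.append(data);          // a message may arrive in several fragments
        if (last) {
            String message = partial.toString();
            partial.setLength(0);
            received.add(message);
            System.out.println("received: " + message);
        }
        webSocket.request(1);          // ask for the next message
        return null;                   // data may be reclaimed immediately
    }

    List<String> received() {
        return received;
    }

    public static void main(String[] args) throws Exception {
        // Assumed endpoint: the "/echo" mapping on localhost:8080.
        URI uri = URI.create(args.length > 0 ? args[0] : "ws://localhost:8080/echo");
        HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(uri, new PushClient())
                .join();
        Thread.sleep(60_000);          // stay connected for a minute, then exit
    }
}
```

Run it, then request localhost:8080/push; each pushed message should show up as a `received: ...` line.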
Please note that there seems to be a bug in Spring Boot 2.1. I mention it in my blog post.