当前位置: 首页 > 知识库问答 >
问题:

Springboot2WebFlux websocket

周奇文
2023-03-14

我在JDK 11上使用带Webflux的Spring boot 2。我编写了以下配置类:

@Configuration
public class WebSocketConfiguration {

    @Autowired
    @Bean
    public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
        final Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", server);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

以及以下WebSocketHandler方法:

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive().
            map(msg -> webSocketSession
                    .textMessage("response:jack ->" + msg.getPayloadAsText())));
}

现在,我可以接收我发送的任何内容,比如:

客户端发送:4545

客户端接收:响应:jack-

我想知道当客户端不向我发送消息时,我如何向客户端推送消息,我需要随时推送消息!

如何在任何时候发送自定义消息,而不是使用相同的输入消息进行响应?

共有1个答案

冀子石
2023-03-14

你可以在我的博客文章中读到http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/.

您需要将您的WebSocketHandler更改为:

private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;

public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
    this.greetingsPublisher = greetingsPublisher;
    this.publisher = Flux.create(greetingsPublisher).share();
}

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    final Flux<WebSocketMessage> message = publisher
            .map(greetings -> webSocketSession.textMessage(greetings));

    return webSocketSession.send(message);
}

并添加GreetingPublisher

@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                        .onFailure(ex -> log.error("Could not take greeting from queue", ex));

            }
        });
    }
}

它是一个bean,因此无论您在何处注入它并调用push方法,它都将使用WebSocket发送消息。例如:

@Controller
public class GreetingsController {

    private final GreetingsPublisher greetingsPublisher;

    public GreetingsController(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
    }

    @Bean
    RouterFunction<ServerResponse> pushMessage() {
        return route(GET("/push"),
                request -> {
                    greetingsPublisher.push("Send a new message with WebSocket");
                    return ServerResponse.ok().body(fromObject("websocket message sent"));
                });
    }
}

首先连接WebSocket并在localhost:8080/push上打开浏览器。应该发送消息。

请注意,这似乎是SpringBoot2.1的一个bug。我在我的博客文章中提到了这一点。

 类似资料:

相关问答

相关文章

相关阅读