当前位置: 首页 > 知识库问答 >
问题:

Spring WebFlux: WebSocketSession.send()不发送消息

濮赤岩
2023-03-14

Helloc我试图使用Spring WebFlux创建webscoket终结点。我希望这个endpoint返回一些事件。
为了做到这一点,我创建了事件的ConnectableFlux,并在句柄(...)方法中将其映射到Flux。但是在我把它给WebSocket会话之后,什么都没有发生——webSocket会话客户端没有收到任何东西。但是同时println(event.toString()),你可以在下面的句柄(...)方法中看到它实际上将信息打印到控制台。
你能告诉我错过了什么吗?

public class EventWebsocketHandler implements WebSocketHandler {

    //  constructors and etc.

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        ObjectMapper objectMapper = new ObjectMapper();

        Flux<WebSocketMessage> messages = eventService.events()
                .flatMap(event -> {
                    try {
                        System.out.println(event.toString());
                        return Mono.just(objectMapper.writeValueAsString(event));
                    } catch (JsonProcessingException e) {
                        return Mono.error(e);
                    }
                })
                .map(session::textMessage);

        return session.send(messages);
    }

@Service
public class EventService {

    List<EventDto> events = new ArrayList<>();

    private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> {
        while (true) {
            if (!events.isEmpty()) {
                fluxSink.next(events.get(0));
                events.remove(0);
            }
        }
    })
            .publish()
            .autoConnect();

    public void push(EventDto event) {
        events.add(event);
    }

    public Flux<EventDto> events() {
        return eventFlux;
    }

}

我在我的项目中有另一个WebSocketHandler,它工作正常,这意味着配置一切正常:


public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<Long> source = Flux.interval(Duration.ofMillis(1000 * 3));
        return session.send(source.map(l -> session.textMessage(String.valueOf(l))));
    }
}

共有1个答案

陶永望
2023-03-14

    private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> {
    while (true) {
        if (!events.isEmpty()) {
            fluxSink.next(events.get(0));
            events.remove(0);
        }
    }
})
        .publish()
        .autoConnect();

必须用这个来代替

private final Sinks.Many<EventDto> processor = Sinks.many().multicast().onBackpressureBuffer();
 类似资料:
  • 接口说明 轻推轻应用/订阅号支持发送文本、图片、文本卡片、图文、key-value、文件、待办等消息类型。本接口针对各种消息类型和发送的对象(单发、群发以及给部分人发送)进行了定义。 注:openid是用户关注某个轻应用/订阅号后生成的唯一id,单发和给部分人发送消息必须携带此参数,可以通过如下接口来获取: 根据qt_code获取用户基本信息 获取使用者列表 通过userId获取openid 消息

  • 主动发送消息 use EasyWeChat\Kernel\Messages\TextCard; // 获取 Messenger 实例 $messenger = $app->messenger; // 准备消息 $message = new TextCard([ 'title' => '你的请假单审批通过', 'description' => '单号:1928373, ...

  • 向已经创建连接凭据的设备发送消息数据。 请求方式: |4|2|3|message|\r 参数 message 发送的消息内容 返回值: "|4|2|3|1|\r" 发送成功 "|4|2|3|2|\r" 发送失败 Arduino样例: softSerial.print("|4|2|3|DFRobot|\r");

  • 问题内容: 我有自己的基于JDA的Discord BOT。我需要向特定频道发送短信。我知道如何将消息作为onEvent响应发送,但是在我的情况下,我没有此类事件。 我有:作者(BOT),令牌和通道号。 我的问题是:如何在 没有事件的情况下 将消息发送到此频道? 问题答案: 好吧,我想我知道你的意思。您无需进行任何事件即可获取频道ID和发送消息。您唯一需要做的就是实例化JDA,调用awaitRead

  • 之前章节定义的SocketIO活动处理函数可以凭借send()函数和emit()函数来连接客户端 接下来的例子是将接收到的消息退回到发送它们的客户端: from flask_socketio import send, emit @socketio.on('message') def handle_message(message): send(message) @socketio.on('

  • 26.3 发送消息 JmsTemplate包含许多方便的方法来发送消息。有些发送方法可以使用 javax.jms.Destination对象指定目的地,也可以使用字符串在 JNDI 中查找目的地。没有目的地参数的发送方法使用默认的目的地。 import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.