当前位置: 首页 > 知识库问答 >
问题:

带有MongoDB的Spring WebFlux-节流SSE客户端

周锐
2023-03-14

我正在开发一个简单的聊天服务,该服务由Spring Boot 2.1.1运行,WebFlux、Reactor 3.2.3、Mongo 3.8.2和Netty 4.1.31。

    null

观察似乎很明显,因为当我在测试期间通过SSE连接时,当新事件到达时,它几乎立即更新我--基本上,SSE的响应比每2秒轮询高数百倍。

问题是:

假定客户机最终是订阅者(或者至少我认为它是由有限的知识提供的),我能以某种方式抑制ReactiveMongoTemplate发布消息的速度吗?或者以某种方式减少对新事件的需求,而不必在客户端这样做?

// ChatRepository.java

private static final Query chatEventsQuery = new Query();

public Flux<ChatEvent> getChatEventsStream(String chatId) {
    return reactiveMongoTemplate.tail(
            chatEventsQuery,
            ChatEvent.class,
            chatId
    );
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvent(username))
            .map(event -> ServerSentEvent.<ChatEvent>builder()
                    .event(event.getType().getEventName())
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

// ChatRouter.java

RouterFunction<ServerResponse> routes(ChatHandler handler) {
    return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}

共有1个答案

鲁宏爽
2023-03-14

答案是:您可以使用flux.buffer方法来完成。然后,流量将按定义的速率将事件以块的形式发送给订阅服务器。

我发布的代码有两个主要问题

>

  • 考虑到多个用户通常都在收听一个聊天,我对ChatRepository进行了重构,以利用“热”的、可重放的流量(现在每个聊天有1个流,而不是每个用户有1个流),这些流存储在咖啡因缓存中。另外,我通过短时间间隔缓冲它们,以避免在繁忙的聊天中将事件推送给客户机时占用大量资源。

    // ChatRepository.java
    
    public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
        return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
                .orElseGet(newCachedChatEventsStream(chatId))
                .autoConnect();
    }
    
    private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
        return () -> {
            ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
                    null,
                    ChatEvent.class,
                    chatId
            ).buffer(Duration.ofMillis(chatEventsBufferInterval))
                    .replay(chatEventsReplayCount);
    
            chatStreamsCache.put(chatId, chatEventsStream);
    
            return chatEventsStream;
        };
    }
    
    // ChatHandler.java
    
    public Mono<ServerResponse> getChatStream(ServerRequest request) {
    
        String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
        String username = getUsername(request);
    
        Flux<ServerSentEvent> chatEventsStream = chatRepository
                .getChatEventsStream(chatId)
                .map(addUserSpecificPropsToChatEvents(username))
                .map(event -> ServerSentEvent.<List<ChatEvent>>builder()
                        .event(CHAT_SSE_NAME)
                        .data(event)
                        .build());
    
        log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
    
        return ServerResponse.ok().body(
                chatEventsStream,
                ServerSentEvent.class
        );
    }
    

    ,

    应用这些更改后,即使有3000个活跃用户,该服务也能很好地执行(JVM使用了50%的CPU,Mongo~7%主要是由于大量插入--流现在不那么明显了)

  •  类似资料:
    • 如何将响应从反应型HTTP客户机流式传输到控制器,而不在任何时候将整个响应主体放在应用程序内存中? 几乎所有project reactor客户机的示例都返回。据我所知,反应流是关于流,而不是加载它,然后发送响应。 是否可以返回,以便在不需要使用大量RAM内存来存储中间结果的情况下,将大文件从某个外部服务传输到应用程序客户机?

    • 当我用我的节点运行MongoDB连接时。js应用程序哪个游戏平台我在这里面临着越来越多的MongoDB连接的问题[不使用任何查询,但它不断增加],达到819,我的MongoDB复制服务器停止响应,间接应用程序停止工作。但是我想保持至少20个关于如何解决这些问题的联系,请帮助我。 ** Mongodb连接:const connectionString=mongodb://AAAA:PASSWORD@

    • 对于客户端, Jersey API 提供支持 接收和处理 SSE 事件两种编程模式: Pull 模式 - 从 EventInput 里面 pull 事件, 或者 Push 模式 - 监听 EventSource 异步通知 15.5.1. 从 EventInput 中读 SSE 事件 客户端可以从 EventInput 里面读事件,代码如下: Client client = ClientBuilde

    • 问题内容: 我需要连接到通过REST接口提供JSON的端点。我真的找不到以一致的方式将这两种技术结合在一起的东西。 我正在寻找一个可以让我快速入门的图书馆。 问题答案: 您可以使用Json.Net库,这个扩展类是利用了 一些用法示例: 编辑: 这是另一个没有

    • 我有一个来自http工具包的Websocket连接(Clojure,它工作得很好)。我从客户端发送ping以确保我们仍然保持连接,并且在那里一切正常。我的问题是,在这种情况下,人们是否会从服务器ping客户端? 我试图设置一些东西,如果我没有得到响应,就从服务器上删除频道,但是设置定时进程和改变状态来跟踪乒乓周期不是很方便,所以它变得有点难看。然后我就想,服务器能处理几十万个同时连接,我是不是就应

    • 我正在为SharePoint 2007编写一个使用CXF框架(版本:2.7.8)的SOAP客户端。我在这里按照在线文档添加了NTLM支持。我让客户端工作,跟踪HTTP会话显示正在发送NTLM凭证,但是,我仍然收到401未授权响应。 代码: 有趣的是,我使用HTTP PUT for WebDAV编写了一个类似的客户端,使用Apache HTTPClient库上传文档,并且能够使用NTLM成功地进行身