我正在开发一个简单的聊天服务,该服务由Spring Boot 2.1.1运行,WebFlux、Reactor 3.2.3、Mongo 3.8.2和Netty 4.1.31。
观察似乎很明显,因为当我在测试期间通过SSE连接时,当新事件到达时,它几乎立即更新我--基本上,SSE的响应比每2秒轮询高数百倍。
问题是:
假定客户机最终是订阅者(或者至少我认为它是由有限的知识提供的),我能以某种方式抑制ReactiveMongoTemplate发布消息的速度吗?或者以某种方式减少对新事件的需求,而不必在客户端这样做?
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}
答案是:您可以使用flux.buffer
方法来完成。然后,流量将按定义的速率将事件以块的形式发送给订阅服务器。
我发布的代码有两个主要问题
>
考虑到多个用户通常都在收听一个聊天,我对ChatRepository进行了重构,以利用“热”的、可重放的流量(现在每个聊天有1个流,而不是每个用户有1个流),这些流存储在咖啡因缓存中。另外,我通过短时间间隔缓冲它们,以避免在繁忙的聊天中将事件推送给客户机时占用大量资源。
// ChatRepository.java
public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
.orElseGet(newCachedChatEventsStream(chatId))
.autoConnect();
}
private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
return () -> {
ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
null,
ChatEvent.class,
chatId
).buffer(Duration.ofMillis(chatEventsBufferInterval))
.replay(chatEventsReplayCount);
chatStreamsCache.put(chatId, chatEventsStream);
return chatEventsStream;
};
}
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvents(username))
.map(event -> ServerSentEvent.<List<ChatEvent>>builder()
.event(CHAT_SSE_NAME)
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
应用这些更改后,即使有3000个活跃用户,该服务也能很好地执行(JVM使用了50%的CPU,Mongo~7%主要是由于大量插入--流现在不那么明显了)
如何将响应从反应型HTTP客户机流式传输到控制器,而不在任何时候将整个响应主体放在应用程序内存中? 几乎所有project reactor客户机的示例都返回。据我所知,反应流是关于流,而不是加载它,然后发送响应。 是否可以返回,以便在不需要使用大量RAM内存来存储中间结果的情况下,将大文件从某个外部服务传输到应用程序客户机?
当我用我的节点运行MongoDB连接时。js应用程序哪个游戏平台我在这里面临着越来越多的MongoDB连接的问题[不使用任何查询,但它不断增加],达到819,我的MongoDB复制服务器停止响应,间接应用程序停止工作。但是我想保持至少20个关于如何解决这些问题的联系,请帮助我。 ** Mongodb连接:const connectionString=mongodb://AAAA:PASSWORD@
对于客户端, Jersey API 提供支持 接收和处理 SSE 事件两种编程模式: Pull 模式 - 从 EventInput 里面 pull 事件, 或者 Push 模式 - 监听 EventSource 异步通知 15.5.1. 从 EventInput 中读 SSE 事件 客户端可以从 EventInput 里面读事件,代码如下: Client client = ClientBuilde
问题内容: 我需要连接到通过REST接口提供JSON的端点。我真的找不到以一致的方式将这两种技术结合在一起的东西。 我正在寻找一个可以让我快速入门的图书馆。 问题答案: 您可以使用Json.Net库,这个扩展类是利用了 一些用法示例: 编辑: 这是另一个没有
我有一个来自http工具包的Websocket连接(Clojure,它工作得很好)。我从客户端发送ping以确保我们仍然保持连接,并且在那里一切正常。我的问题是,在这种情况下,人们是否会从服务器ping客户端? 我试图设置一些东西,如果我没有得到响应,就从服务器上删除频道,但是设置定时进程和改变状态来跟踪乒乓周期不是很方便,所以它变得有点难看。然后我就想,服务器能处理几十万个同时连接,我是不是就应
我正在为SharePoint 2007编写一个使用CXF框架(版本:2.7.8)的SOAP客户端。我在这里按照在线文档添加了NTLM支持。我让客户端工作,跟踪HTTP会话显示正在发送NTLM凭证,但是,我仍然收到401未授权响应。 代码: 有趣的是,我使用HTTP PUT for WebDAV编写了一个类似的客户端,使用Apache HTTPClient库上传文档,并且能够使用NTLM成功地进行身