下面的代码演示了我正在尝试做的事情(似乎有效),但我很确定这是不行的。
// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
// check for error and map to Observable<ResponseBody>
.map(this::mapRespBodyOrError)
// flat map to Observable<String> representing line of long polling response
.flatMap(respBody -> Observable.create(emitter -> {
// open reader on response body stream
try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
String line;
// block and wait to read a line from input
while((line = reader.readLine()) !=null) {
// once line was read from response body input stream emit it as observable event
emitter.onNext(line);
}
}
}));
经过一些研究,我发现被阻塞的线程是OkHttp客户端线程。这是由于我实现了MakegetObservable
,它从OkHttp客户机的newcall...enqueue回调发出。默认情况下,OkHttp客户端有5个线程池,用于一个资源的5个并发连接。因此,每次订阅另一个长轮询资源时,我都会阻塞其中一个线程。在5个阻塞线程之后,OkHttp客户端停止工作,因为它没有线程来处理响应。
正如@Progman所建议的,我使用subscribeon
使用IO调度器,它为每个阻塞IO操作生成新线程。一个必须小心和适当地处置资源与这个调度器。
我的实现目前如下所示(添加了调度程序、完成和错误事件)
// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
// check for error and map to Observable<ResponseBody>
.map(this::mapRespBodyOrError)
// use IO scheduler that spawns new threads to take care of blocking operations
.observeOn(Schedulers.io())
// flat map to Observable<String> representing line of long polling response
.flatMap(respBody -> Observable.create(emitter -> {
try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
// blocking read lines while available
String line;
while ((line = reader.readLine()) != null) {
// emit event for every line
emitter.onNext(line);
}
// emit the completion event to indicate we are done
emitter.onComplete();
} catch (IOException | RuntimeException err) {
// emit any error that might have occurred
emitter.onError(err);
}
}));
问题内容: 我最近在StackOverflow上问了一个有关我的功能的问题,人们建议我使用Ajax Long Polling。我花了几天的时间研究该主题,并尝试编写基本的长轮询代码,但是这些代码都没有起作用,而且我什么也做不了。 这是我的基本功能: 有人能够告诉我如何将其转变为基本的长轮询功能,或者甚至直接指向我需要到达的路径。很感谢任何形式的帮助。谢谢! 问题答案: 通常(即,当不使用长时间轮询
长轮询在GCP PubSub JS SDK上可用吗? 我希望能够同时处理多个PubSub消息,例如: 这是它将如何在AWS上工作的一个示例: SQS队列包含超过5条消息。 侦听器将在单个中一次获得5条消息。事件
本文向大家介绍JavaScript使用setInterval()函数实现简单轮询操作的方法,包括了JavaScript使用setInterval()函数实现简单轮询操作的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JavaScript使用setInterval()函数实现简单轮询操作的方法。分享给大家供大家参考。具体分析如下: 轮询(Polling)是一种CPU决策如何提供周边设备
在 OpenResty 中,连接池在使用上如果不加以注意,容易产生数据写错地方,或者得到的应答数据异常以及类似的问题,当然使用短连接可以规避这样的问题,但是在一些企业用户环境下,短连接 + 高并发对企业内部的防火墙是一个巨大的考验,因此,长连接自有其用武之地,使用它的时候要记住,长连接一定要保持其连接池中所有连接的正确性。 -- 错误的代码 local function send() fo
问题内容: 我可以找到许多有关LongPolling工作原理的信息例如this和this],但是没有_简单的_ 示例说明如何在代码中实现这一点。 我所能找到的就是cometd,它依赖于Dojo JS框架和相当复杂的服务器系统。 基本上,我将如何使用Apache来处理请求,以及如何编写一个简单的脚本(例如PHP)来“长时间轮询”服务器以接收新消息? 该示例不必是可伸缩的,安全的或完整的,只需要工作即