当前位置: 首页 > 知识库问答 >
问题:

使用OkHttp3和ReactiveX Java实现长轮询的正确方法

於英朗
2023-03-14

下面的代码演示了我正在尝试做的事情(似乎有效),但我很确定这是不行的。

// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
  // check for error and map to Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // flat map to Observable<String> representing line of long polling response
  .flatMap(respBody -> Observable.create(emitter -> {
    // open reader on response body stream
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      String line;
      // block and wait to read a line from input
      while((line = reader.readLine()) !=null) {
        // once line was read from response body input stream emit it as observable event
        emitter.onNext(line);
      }
    }
  }));

共有1个答案

陆卓
2023-03-14

经过一些研究,我发现被阻塞的线程是OkHttp客户端线程。这是由于我实现了MakegetObservable,它从OkHttp客户机的newcall...enqueue回调发出。默认情况下,OkHttp客户端有5个线程池,用于一个资源的5个并发连接。因此,每次订阅另一个长轮询资源时,我都会阻塞其中一个线程。在5个阻塞线程之后,OkHttp客户端停止工作,因为它没有线程来处理响应。

正如@Progman所建议的,我使用subscribeon使用IO调度器,它为每个阻塞IO操作生成新线程。一个必须小心和适当地处置资源与这个调度器。

我的实现目前如下所示(添加了调度程序、完成和错误事件)

// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
  // check for error and map to Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // use IO scheduler that spawns new threads to take care of blocking operations
  .observeOn(Schedulers.io())
  // flat map to Observable<String> representing line of long polling response
  .flatMap(respBody -> Observable.create(emitter -> {
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      // blocking read lines while available
      String line;
      while ((line = reader.readLine()) != null) {
        // emit event for every line
        emitter.onNext(line);
      }
      // emit the completion event to indicate we are done
      emitter.onComplete();
    } catch (IOException | RuntimeException err) {
      // emit any error that might have occurred
      emitter.onError(err);
    }
  }));
 类似资料:
  • 问题内容: 我最近在StackOverflow上问了一个有关我的功能的问题,人们建议我使用Ajax Long Polling。我花了几天的时间研究该主题,并尝试编写基本的长轮询代码,但是这些代码都没有起作用,而且我什么也做不了。 这是我的基本功能: 有人能够告诉我如何将其转变为基本的长轮询功能,或者甚至直接指向我需要到达的路径。很感谢任何形式的帮助。谢谢! 问题答案: 通常(即,当不使用长时间轮询

  • 长轮询在GCP PubSub JS SDK上可用吗? 我希望能够同时处理多个PubSub消息,例如: 这是它将如何在AWS上工作的一个示例: SQS队列包含超过5条消息。 侦听器将在单个中一次获得5条消息。事件

  • 本文向大家介绍JavaScript使用setInterval()函数实现简单轮询操作的方法,包括了JavaScript使用setInterval()函数实现简单轮询操作的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JavaScript使用setInterval()函数实现简单轮询操作的方法。分享给大家供大家参考。具体分析如下: 轮询(Polling)是一种CPU决策如何提供周边设备

  • 在 OpenResty 中,连接池在使用上如果不加以注意,容易产生数据写错地方,或者得到的应答数据异常以及类似的问题,当然使用短连接可以规避这样的问题,但是在一些企业用户环境下,短连接 + 高并发对企业内部的防火墙是一个巨大的考验,因此,长连接自有其用武之地,使用它的时候要记住,长连接一定要保持其连接池中所有连接的正确性。 -- 错误的代码 local function send() fo

  • 问题内容: 我可以找到许多有关LongPolling工作原理的信息例如this和this],但是没有_简单的_ 示例说明如何在代码中实现这一点。 我所能找到的就是cometd,它依赖于Dojo JS框架和相当复杂的服务器系统。 基本上,我将如何使用Apache来处理请求,以及如何编写一个简单的脚本(例如PHP)来“长时间轮询”服务器以接收新消息? 该示例不必是可伸缩的,安全的或完整的,只需要工作即