当前位置: 首页 > 知识库问答 >
问题:

RxJava改造轮询

汤枫
2023-03-14

我的问题是我无法使用Retrofit获得无限流。在我获得初始轮询()请求的凭据后-我执行初始轮询()请求。如果没有变化,每个轮询()请求会在25秒内响应,如果有任何变化,则会更早响应-返回changed_data[]。每个响应都包含下一个轮询请求所需的时间戳数据-我应该在每次轮询()响应后执行新轮询()请求。这是我的代码:

getServerApi().getLongPollServer() 
  .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
   .take(1) 
   .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
   .retry()
   .subscribe(longPollEnvelope1 -> {
   processUpdates(longPollEnvelope1.getUpdates());
});

我是RxJava新手,也许我不懂一些东西,但我无法获得无限流。我接到3个电话,然后是onNext和onComplete。

附言:也许有更好的解决方案在Android上实施长轮询?

共有1个答案

毋琪
2023-03-14

虽然不理想,但我相信您可以使用RX的副作用来实现预期的结果(“doOn”操作)。

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
    // side effect variable
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
            .flatMap(credentials -> api.query("request", credentials, timestamp.get()))  // this will use the value from previous doOnNext
            .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
            .repeat();
})
        .retry()
        .share();

private static class CredentialsWithTimestamp {

    public final String credentials;
    public final long timestamp; // I assume this is necessary for you from the first request

    public CredentialsWithTimestamp(String credentials, long timestamp) {
        this.credentials = credentials;
        this.timestamp = timestamp;
    }
}

订阅“o”时,内部可观察将重复。如果有错误,那么“o”将重试并重新从凭据流请求。

在您的示例中,计算指导是通过更新timestamp变量来实现的,这对于下一个请求是必要的。

 类似资料:
  • 所以基本上我想做的是,打第一个网络电话。如果调用的RESTful web服务返回1,则进行第二次网络调用。如果web服务返回0,则不要进行第二次网络调用。 这是我的密码 显然,上面的代码是错误的,因为它应该总是返回可观察的。那么,如果第一次网络调用返回0,我的代码应该如何编写?

  • 我在使用Reformation的RxJava支持链接观察对象时遇到了问题。我可能误解了如何使用它,否则它可能是改装中的一个bug。希望这里有人能帮我理解发生了什么事。编辑:我正在对这些响应使用MockRestaAdapter—这可能与RxSupport实现略有不同有关。 这是一个假的银行应用程序。它正在尝试进行传输,传输完成后,它应该执行帐户请求以更新帐户值。这基本上只是我试用flatMap的一个

  • 如何使用RxJava和Kotlin中的改型创建api调用的泛型类? 我的JAVA实现是:: 首先添加Gradle依赖项:(更新到最新版本(如果可用)) //用于改装 实现“com.squareup.reverfit2:reverfit:2.3.0” 实现“com.squareup.reverfit2:converter-gson:2.3.0” //对于拦截器实现“com.squareup.okht

  • 我试图使用RxJava和Rome的改型,在你建议使用组件拱之前(在这个机会是不可能的,项目是在和50%和只需要继续与拱清理)。 所以问题是这个。我有一个返回POJO的web服务。类似于这样: null 相互作用者 回调 演示文稿 演示者 查看

  • 我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?

  • 我正在尝试从使用普通改型迁移到使用RxJava扩展进行改型,以便在后台线程上进行API调用链。 例如,我有一个名为ModelGroup的对象,它有一个ModelPerson对象列表。我的目标是做到以下几点。 将ModelGroup发送到服务器并接收一个响应,它是一个整数,表示新插入的ID,我们称之为newGroupId 对于ModelGroup中的每个ModelPerson,设置Person。gr