我对RxJava并不完全陌生,但我被一项看似简单的任务所阻碍。
我有一个数据源,它公开了一个反应式API,我所要做的就是获取一些数据,返回它,并在没有其他消息发出时自动关闭连接。
这是我的代码:
public Observable<Object> execute(String query) {
Single<RxConnection> rxConnection = getRxDB().getConnection();
return rxConnection.flatMapObservable(conn -> {
Observable<Object> rxResult = conn.query(query);
return rxResult.doOnCompleted(() -> {
conn.close(); // THIS DOES NOT WORK. I would like to close the connection and to wait without blocking.
});
});
}
conn.query()和conn.close()在不同的调度程序中异步执行。此代码不起作用,因为conn.close()返回一个没有订阅服务器的Completable。此外,如果我手动订阅doOnCompleted方法本身,那么rxResult Observable将在不等待连接关闭的情况下完成。
我希望“execute(String query)”方法返回一个可观察值:-发出conn.query()调用获取的所有项-当没有更多项要发出时,它触发conn.close()-仅在conn.close()Completable之后完成;
谢谢
因此,您希望仅在可观察对象被订阅时执行查询:
private Observable<Object> executeDeferred(String query) {
// your original execute() method here, including doOnComplete
}
public Observable<Object> execute(String query) {
return Observable.defer(() -> executeDeferred(query));
}
这将确保连接、查询和释放仅在observable获得订阅时发生,而不是在安装过程中发生。
编辑:您还需要一个doOnUn订阅/doOnTerminate,以在过早退订发生时关闭连接。
使用可观察的<代码>管理资源关闭。使用:
Observable<T> obs =
Observable.using(
resourceFactory,
observableFactory,
disposeAction)
此创建方法确保由resourceFactory创建的资源在终止(完成或错误)或取消订阅时被释放。
怎么样:
Observable<Object> rxResult = conn.query(query)
.concatWith(conn.close().toObservable())
.onErrorResumeNext(e ->
conn.close().toObservable().concatWith(Observable.error(e)));
问题内容: 这是我的情况: 在this.handleFormSubmit()上,我正在执行this.setState() 在this.handleFormSubmit()内部,我正在调用this.findRoutes(); -取决于this.setState()的成功完成 this.setState(); 在this.findRoutes被调用之前没有完成… 在调用this.findRoutes(
我目前在Android和Kotlin上使用RxJava,但我有一个问题,如果不使用toBlocking(),我无法解决。 我在员工服务中有一个方法,它返回一个可观察的 这一切都很好,因为每当员工发生变化时,这个可观察对象就会发出新的员工列表。但是我想从员工那里生成一个PDF文件,这显然不需要每次员工更改时都运行。另外,我想从PDF生成器方法返回一个可完成的对象。我想在PDF中添加一个标题,然后遍历
问题是 我有一个活动,它定期从API获取数据并显示收到的数据。API 使用 OAuth,因此我会收到一个临时访问令牌,该令牌在一段时间(1 小时)后过期。如果应用尝试使用过期的令牌获取数据,则显然请求将失败。在我的应用的早期迭代中,我对网络请求使用 AsyncTasks,基本上只是执行了一个新的异步任务,该任务将在调用从服务器获取数据的主异步任务之前获取新的访问令牌。这工作得很好,因为主要的Asy
问题内容: 嗨,下面的Javascript是在我提交表单时调用的。它首先从文本区域分割一堆网址,然后: 1)在表格中为每个网址添加行,并在最后一列(“状态”列)中显示“未开始”。 2)再次循环遍历每个URL,首先它进行ajax调用以检查状态(status.php),该状态将返回0到100之间的百分比 。3)在同一循环中,它通过ajax启动实际过程(process.php),当进程完成时(请记住连续
假设存在包含方法的接口: 实现combinedCall方法的最佳方法是什么: 从makeHttpCall获取数据 使用store InDatabase存储它 返回在store InDatabase完成时完成的完成? 似乎在RxJava 1.0中可以执行Completable.merge(可观察),但合并似乎不再接受可观察。
假设我有 我要实现的是一个流,它在doTask()完成后发出一个just(“completed”)项。 对于Observable,我可以执行doTask().map(f->just(“completed”));但是,如果它在完成的过程中没有散发出任何自然的东西,我怎么能做到完全的呢?