我有一个场景,我需要定期调用一个应用编程接口来检查结果。我使用Flowable.interval
来创建一个调用应用编程接口的间隔函数。
然而,我有背压的问题。在我下面的例子中,间隔中的每个记号都会创建一个新的单曲。理想的效果是仅在调用尚未进行时调用API
Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
我可以使用过滤器变量来解决这个问题:
var filter = true
Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
filter
}.flatMap {
System.out.println("Delay $it")
Single.just(1L).doOnSubscribe {
filter = true
}.doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE!!!")
filter = true
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
但是它看起来像一个黑客解决方案。我已经厌倦了在间隔
函数之后应用onBackPressureDrop
,但是它没有效果。
有什么建议吗?
您还必须约束flatMap
:
Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}
}, false, 1) // <----------------------------------------------------------
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()
如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一
在用RxJava编写数据同步作业时,我发现了一个无法解释的奇怪行为。我是RxJava的新手,非常感谢您的帮助。 简单地说,我的工作很简单,我有一个元素ID列表,我调用一个Web服务来按ID获取每个元素,进行一些处理,并执行多个调用来将数据推送到数据库。数据加载比数据存储快,所以我遇到了OutOfMemory错误。 我的代码看起来很像“失败”测试,但在进行一些测试后,我意识到删除这行代码: 让它发挥
假设我有一个返回列表的博客帖子api 从列表创建可观察 将每个可观察拆分为
有人能解释一下如何在RxJava中通过平面图运算符传递完整信号吗? 如果flatMap操作符被注释,我可以得到从1到10的数字列表,这意味着toList将收到onComplete信号。但当我想在flatMap中进一步处理数据时,它会消耗一个完整的信号,而我无法得到任何结果。如何通过flatMap操作符传递onComplete信号? 我有以下简单的程序:
我是RxJava的新手,经常被平面图函数弄糊涂。根据文档,平面图 有人能给出一个很好的用例吗?为什么要将原来的可观察对象转换为可观察对象(复数),然后将它们转换为单个可观察对象。 你为什么不直接用“地图”呢? 如果你举一个Android的例子,那就太棒了,否则纯Java就足够了。谢谢
在阅读了关于表达式的这个极好的答案之后,我尝试将我的<code>平面图</code>转换为表达式</code>的<code>。 然后,for-expression。 我得到一个编译时错误 我的for表达式怎么了?