我不知道为什么背压在这个流动链中没有得到尊重。
我将其简化为以下三个阶段:heavyFlowableBasedOnInput
,singleThreadedPieceOfWork
和writeFileToDisk
。前两个阶段的速度比最后一个阶段快得多,因此它们的输出会让内存等待第3步,从而导致内存不足错误/内存使用量不断增加。
当我观看调试器时,我注意到步骤1比步骤3更频繁地发生,尽管我已经将缓冲区大小(observeOn的第三个参数)设置为一个较低的数字(例如1)。
Flowable.fromIterable(listOfResources)
.subscribeOn(Schedulers.io())
.flatMap { flowableUsing(it) } // small flowable though, ~0-10, mostly 1 to 2 emissions
.observeOn(Schedulers.single(), false, 1)
.map { singleThreadedPieceOfWork() }
.observeOn(Schedulers.io(), false, 1)
.map { writeFileToDisk() }
.subscribe()
我已经阅读了我正在使用的每个运算符的留档,看起来他们都有一些背压支持(例如通过背压BackpressureKind.PASS_THROUGH
,荣誉下游背压BackpressureKind. FULL
)。我没有线索说明为什么我要重载写入文件磁盘的队列/缓冲区。
我确实读过为什么我的RxJava Flowable在使用观察时不尊重背压?我知道观察会创建一个“异步边界”,但我不知道这如何影响背压。
来自akarnokds博客:
背压是RxJava的一个基石,它通过反应流防止缓冲区膨胀。大多数没有计时功能的运营商都会应用并尊重背压。不幸的是,flatMap在默认情况下只能说它尊重背压,但不能将其应用于主要输入。
这意味着,当您使用flatMap或merge的公共重载时,操作符将请求Long。从上游获取最大价值,同时实现所有价值。这种无界行为会导致对生成的内部观察值的活动订阅数量无界。
如果内部可观察对象很短或不经常发射,此属性不会引起太多麻烦,但是如果在平面图之后有一个异步边界,比方说,观察,项目很容易在平面图中堆积,并大大降低性能。
我想我通过在flatMap重载函数中指定maxConcurrency参数:kotlin解决了这个问题
Flowable.fromIterable(listOfResources)
.subscribeOn(Schedulers.io())
.flatMap({ flowableUsing(it) }, false, 1, 5) // <-- the maxConcurrency and bufferSize
.observeOn(Schedulers.single(), false, 1)
.map { singleThreadedPieceOfWork() }
.observeOn(Schedulers.io(), false, 1)
.map { writeFileToDisk() }
.subscribe()
或者,这不是一个解决方案。我可能只是降低了性能,以至于不再出现内存问题。
等待更多阅读/理解
我试图创建一个,它会发出关于反压力的事件,以避免内存问题,同时并行运行转换的每个阶段以提高效率。我创建了一个简单的测试程序,来解释我的程序的不同步骤的行为,以及何时发出事件,何时等待不同的阶段。 我的程序如下: 当我运行这个程序时,我得到了与预期背压相关的输出,其中一批事件被发送到
更新2 在较新版本的Sprint Boot上再次遇到此问题,不得不改为:
我正在使用MySQL,并希望利用属性。默认的MySQL JDBC实现并不真正尊重它。如果将 fetchsize 设置为 它将单独获取每一行,但考虑到我想使用 fetchSize 的原因是我有足够的数据将我的内存使用量放入 2 G 范围,每行必须执行一个查询将永远花费。 相反,我想插入一个JDBC实现,它将与MySQL一起使用并适当尊重获取大小,允许我将获取大小设置为10,000或其他更高的限制。有
我有一个将launchMode设置为singleTask的活动: 我有一个持续的通知,其中包含启动该活动的PendingIntent: 当我与现有的MyActivity交互时,我点击Home并通过启动器重新启动MyActivity,MyActivity的按预期调用。 问题是,当我与现有的MyActivity交互时,我点击正在进行的通知,通过创建一个新的MyActivity,通过销毁现有的MyAct
我想用for循环写一个代码,但我无法得到背后的逻辑。有人能帮助我在这个代码问题:问题的图片
我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2) 我从这个问题的答案中借用了一种方法。 我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)