在用RxJava编写数据同步作业时,我发现了一个无法解释的奇怪行为。我是RxJava的新手,非常感谢您的帮助。
简单地说,我的工作很简单,我有一个元素ID列表,我调用一个Web服务来按ID获取每个元素,进行一些处理,并执行多个调用来将数据推送到数据库。数据加载比数据存储快,所以我遇到了OutOfMemory错误。
我的代码看起来很像“失败”测试,但在进行一些测试后,我意识到删除这行代码:
flatMap(dt -> Observable.just(dt))
让它发挥作用。失败的测试输出清楚地表明,未使用的项会堆积起来,这会导致OutOfMemory。工作测试输出表明,生产者将始终等待消费者,所以这永远不会导致OutOfMemory。
public static class DataStore {
public Integer myVal;
public byte[] myBigData;
public DataStore(Integer myVal) {
this.myVal = myVal;
this.myBigData = new byte[1000000];
}
}
@Test
public void working() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
@Test
public void failing() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(dt -> Observable.just(dt))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
private Observable<DataStore> produce(final int value) {
return Observable.<DataStore>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(200); //Here I synchronous call WS to retrieve data
s.onNext(new DataStore(value));
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}
private Observable<Boolean> consume(DataStore value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(1000); //Here I synchronous call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
这种行为背后的解释是什么?我如何在不删除Observable.just(dt)的情况下解决我的失败测试,而在我的真实情况下,Observable.from
默认情况下,flatMap
合并无限数量的源,通过应用不带maxConcurrent参数的特定lambda,基本上可以对上游进行无界限制,现在上游可以全速运行,压倒其他运营商的内部缓冲区。
我有一个场景,我需要定期调用一个应用编程接口来检查结果。我使用来创建一个调用应用编程接口的间隔函数。 然而,我有背压的问题。在我下面的例子中,间隔中的每个记号都会创建一个新的单曲。理想的效果是仅在调用尚未进行时调用API 我可以使用过滤器变量来解决这个问题: 但是它看起来像一个黑客解决方案。我已经厌倦了在函数之后应用,但是它没有效果。 有什么建议吗?
如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一
背压有问题。使用发布主题获取发射时的传感器事件,并需要在事务中订阅主题时将数据保存到数据库。 我一直在尝试使用。窗口(100)操作符,这样每当我连续收到100个传感器事件时,我就可以批量插入,但一次只能收到一个项目。订阅 不希望使用缓冲区运算符删除事件。正确的处理方法是什么?
我使用RxJava观察点击几个按钮。 这些订阅将在一个对象上调用不同的函数,这需要几毫秒的时间。这些功能是同步的。 问题是,当按下太多按钮时,会出现背压异常。对我来说,有效的方法是删除几个输入(最好是旧的输入)。RxJava有可能做到这一点吗?
我有以下代码来解析一个JSON文件: 要处理以下JSON文件: 如果我执行此代码,我将收到以下错误: 所以我开始一步一步地调试应用程序,看看part processing()中的哪个代码部分抛出了这个异常。令人惊讶的是,那里的所有代码都正常执行:没有抛出异常,也没有返回结果I except。 更让我惊讶的是,当我稍微改变第一种方法的代码时,它可以在不产生异常的情况下工作。 我不知道println方
有人能解释一下如何在RxJava中通过平面图运算符传递完整信号吗? 如果flatMap操作符被注释,我可以得到从1到10的数字列表,这意味着toList将收到onComplete信号。但当我想在flatMap中进一步处理数据时,它会消耗一个完整的信号,而我无法得到任何结果。如何通过flatMap操作符传递onComplete信号? 我有以下简单的程序: