当前位置: 首页 > 知识库问答 >
问题:

RxJava平面图和背压奇怪行为

吴高远
2023-03-14

在用RxJava编写数据同步作业时,我发现了一个无法解释的奇怪行为。我是RxJava的新手,非常感谢您的帮助。

简单地说,我的工作很简单,我有一个元素ID列表,我调用一个Web服务来按ID获取每个元素,进行一些处理,并执行多个调用来将数据推送到数据库。数据加载比数据存储快,所以我遇到了OutOfMemory错误。

我的代码看起来很像“失败”测试,但在进行一些测试后,我意识到删除这行代码:

flatMap(dt -> Observable.just(dt))

让它发挥作用。失败的测试输出清楚地表明,未使用的项会堆积起来,这会导致OutOfMemory。工作测试输出表明,生产者将始终等待消费者,所以这永远不会导致OutOfMemory。

public static class DataStore {
    public Integer myVal;
    public byte[] myBigData;

    public DataStore(Integer myVal) {
        this.myVal = myVal;
        this.myBigData = new byte[1000000];
    }
}

@Test
public void working() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

@Test
public void failing() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(dt -> Observable.just(dt))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

private Observable<DataStore> produce(final int value) {
    return Observable.<DataStore>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(200); //Here I synchronous call WS to retrieve data
                s.onNext(new DataStore(value));
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(DataStore value) {
    return Observable.<Boolean>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(1000); //Here I synchronous call DB to store data
                s.onNext(true);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onNext(false);
            s.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}

这种行为背后的解释是什么?我如何在不删除Observable.just(dt)的情况下解决我的失败测试,而在我的真实情况下,Observable.from

共有1个答案

桂浩言
2023-03-14

默认情况下,flatMap合并无限数量的源,通过应用不带maxConcurrent参数的特定lambda,基本上可以对上游进行无界限制,现在上游可以全速运行,压倒其他运营商的内部缓冲区。

 类似资料:
  • 我有一个场景,我需要定期调用一个应用编程接口来检查结果。我使用来创建一个调用应用编程接口的间隔函数。 然而,我有背压的问题。在我下面的例子中,间隔中的每个记号都会创建一个新的单曲。理想的效果是仅在调用尚未进行时调用API 我可以使用过滤器变量来解决这个问题: 但是它看起来像一个黑客解决方案。我已经厌倦了在函数之后应用,但是它没有效果。 有什么建议吗?

  • 如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一

  • 背压有问题。使用发布主题获取发射时的传感器事件,并需要在事务中订阅主题时将数据保存到数据库。 我一直在尝试使用。窗口(100)操作符,这样每当我连续收到100个传感器事件时,我就可以批量插入,但一次只能收到一个项目。订阅 不希望使用缓冲区运算符删除事件。正确的处理方法是什么?

  • 我使用RxJava观察点击几个按钮。 这些订阅将在一个对象上调用不同的函数,这需要几毫秒的时间。这些功能是同步的。 问题是,当按下太多按钮时,会出现背压异常。对我来说,有效的方法是删除几个输入(最好是旧的输入)。RxJava有可能做到这一点吗?

  • 我有以下代码来解析一个JSON文件: 要处理以下JSON文件: 如果我执行此代码,我将收到以下错误: 所以我开始一步一步地调试应用程序,看看part processing()中的哪个代码部分抛出了这个异常。令人惊讶的是,那里的所有代码都正常执行:没有抛出异常,也没有返回结果I except。 更让我惊讶的是,当我稍微改变第一种方法的代码时,它可以在不产生异常的情况下工作。 我不知道println方

  • 有人能解释一下如何在RxJava中通过平面图运算符传递完整信号吗? 如果flatMap操作符被注释,我可以得到从1到10的数字列表,这意味着toList将收到onComplete信号。但当我想在flatMap中进一步处理数据时,它会消耗一个完整的信号,而我无法得到任何结果。如何通过flatMap操作符传递onComplete信号? 我有以下简单的程序: