当前位置: 首页 > 知识库问答 >
问题:

我们可以在调度程序上运行的可观测数据的数量有限制吗?

经慈
2023-03-14

我正在尝试查看是否可以在IO()Computation()调度程序上生成100万个可观察的

 public static void observableLimit() {
        sum = 0;
        long lowerBound = 0;
        long higherBound = 1000;
        Flowable.fromCallable(() -> {
            Flowable.rangeLong(lowerBound, higherBound + 1)
                    .subscribe(integer -> Observable.just(integer)
                            .subscribeOn(Schedulers.io())
                            .subscribe(j -> {
                                printNum(j);
                                sum = sum + j;
                            }));
            return true;
        }).blockingSubscribe(aBoolean -> {
            long actualSum = (higherBound * (higherBound + 1)) / 2;
            System.out.println("");
            System.out.println("SUM: " + sum);
            Assert.assertEquals(actualSum, sum);
        });
    }

对于higherbound=100来说,它大部分时间都可以工作,对于1000来说,它有时可以工作,但大部分时间都失败,对于10000来说,它几乎每次都失败,如果我告诉它在newthread()上运行它,并且根本不使用subscribeon()的话,它就可以工作。

我怎样才能纠正这种行为呢?

共有1个答案

慎俊艾
2023-03-14

您面临的问题不是关于可观察性的某些限制,而是您的代码的问题。您的blockingsubscribe是对一个与跨所有其他线程的可流无关的可流的blockingsubscribe。对于HigherBound的小值,您将看到代码可以工作,而对于大值则不行,这是因为对于小的HigherBound来说,外部可流动可能与内部可流动一样快,但是对于HigherBound的高值来说,折叠得更快。

我想说的是,为了看到正确的结果,您需要与跨所有其他线程而不是外部线程的可流同步。我还将long sum替换为线程安全实现longadder sum,您可以使用flatmap运算符来实现。

Flowable.rangeLong(lowerBound, higherBound + 1)
         .flatMap(t -> Flowable.just(t)
                 .subscribeOn(Schedulers.io())
         )
         .doOnNext(sum::add)
         .doOnComplete(() -> {
             long actualSum = (higherBound * (higherBound + 1)) / 2;
             log("SUM: " + sum.longValue() + ", ACTUAL: " + actualSum);
             log("Equals: " + (actualSum == sum.longValue()));
         })
         .blockingSubscribe();
 类似资料:
  • 我想显示产品浏览历史,所以我将产品id存储在浏览器cookie中。 因为历史记录列表仅限于5项,所以我将cookie值转换为一个数组,然后检查它的长度并删除多余的内容。 下面的代码是我尝试过的,但它不起作用;数组项没有被删除。 我想问一下,如何限制数组长度,使其只能存储5个项目? 或 如何在数组索引4之后剪切项目?

  • 问题内容: 通常,建议将RSA用于加密对称密钥,然后再将其用于加密“有效负载”。 可以使用RSA加密的数据量的实际(或理论)限制是多少(我使用的是2048位RSA密钥大小)。 特别是,我想知道使用(不同的)RSA公钥加密RSA公钥(256字节)是否安全?我在Java中使用Bouncy Castle加密库。 问题答案: 对于 n 位RSA密钥,直接加密(使用PKCS#1 “旧式”填充)适用于最大 下

  • 目前还不清楚我们是否可以在Android模拟器上测试谷歌地图v2。 这一切都是关于Play服务的,他们的“Google Android Team”宣布了以下内容: 在Android开发者博客上他们说 为了简化您的测试,我们还发布了更新的谷歌API模拟器图像,其中包括谷歌Play服务3.2。您可以通过Android SDK 管理器下载镜像。 在Android开发者网站上,他们说 要在使用Google

  • 我有一个cucumber test runner类,我在其中编写了如下运行的测试套件 @cucumberoptions(features={“feature_files/featues”},glue={“com.automation.stepdef”},monochrome=true,dryrun=false,plugin={“html:target/cucumber-html-report”},

  • 我用reactiveX Zip做了一些实验,我注意到我在zip中定义的可观察性是一个接一个地执行的。我认为zip的好处是,zip中定义的每一个可观察到的线程都是由一个线程执行的,所以所有这些线程都是并行执行的。有什么方法能达到我想要的吗?。这是我的zip例子

  • 关于Spring Boot的小问题,一些有用的默认度量,以及如何在Grafana中正确使用它们。 我看到许多这样的模式。 举几个例子: 不幸的是,我不确定如何使用它们,如何正确地使用它们,并且感觉我的无知使我错过了一些伟大的应用洞察力。