当前位置: 首页 > 知识库问答 >
问题:

使用Rx限制远程资源的使用

姬实
2023-03-14

我使用RxJava异步处理servlet请求。在每个请求期间,使用flatMap操作符对远程服务API进行大量异步调用。

由于资源限制,我需要限制针对该API的并发请求总数。对于使用其并发参数在单个平面图中调用API的单个Rx流来说,这是微不足道的。但是我的应用程序中有多个独立的流(基本上每个ServletRequest一个),并且每个流都有多个对API的平面图调用。

因此,我认为我必须将所有远程请求汇集到一个执行实际调用的单例流中,然后可以轻松限制并发性。但是,将API响应返回到原始流中似乎并不简单。此外,在这种结构上保持背压似乎很复杂。

另一种选择是使用传统的信号量,但我不确定它的阻塞行为是否适合Rx。

那么,有没有一个既定的模式来实现这样的东西?还是我错过了一个巧妙的操作员组合,完全避免了这些复杂情况?

共有2个答案

贾成天
2023-03-14

签出调度程序。when()。它允许用户从现有调度程序构建修改的调度程序,而不是创建新的线程池。就像使用调度器一样。io()但限制活动线程的数量。或速率限制调度程序上的onNexts数。计算()。

长孙嘉
2023-03-14

在RxJava中,您可以从常规Java执行器创建自己的调度程序:

ExecutorService exec= Executors.newFixedThreadPool(2); //2 Fixed threads
Schedulers.from(exec);

因此,只需为每个资源创建一个具有有限线程数的执行器,并在访问资源时使用该特定调度器。有限的线程数将限制并发调用的数量。

编辑:

显然我误解了这个问题。如果调用是异步的,您可以尝试使用Rx的背压来管理它们。以下是如何使用Rx管理此类呼叫的想法:

您创建了一个“可观察的资源许可”,每当可以调用API时,它都会发出一些东西(某种令牌)。其令牌(许可)创建速率将是该API的最大使用速率。每当某个observable需要调用API时,只需使用许可证observable压缩调用即可。Zip操作符将阻止,直到有可用的许可证,从而将API调用限制在许可证生成的速率上

下面是一个可以观察到的许可证的简单实现,它会发出时间戳:

public class PermitObservable extends Observable<Long> {

    private final long msBetweenEmissions;

    public PermitObservable(long msBetweenEmissions) {
        super(new SyncOnSubscribe<Long, Long>() {
            @Override
            protected Long generateState() {
                return System.currentTimeMillis();
            }

            @Override
            protected Long next(Long state, Observer<? super Long> observer) {
                long nextEmissionAt = state + msBetweenEmissions;
                long timeToWait = nextEmissionAt - System.currentTimeMillis();
                if (timeToWait > 0) {
                    try {
                        Thread.sleep(timeToWait);
                    } catch (InterruptedException e) {
                        observer.onError(e);
                    }
                }
                long now = System.currentTimeMillis();
                observer.onNext(Long.valueOf(now)); // Permit emission
                return now;
            }
        });

        this.msBetweenEmissions = msBetweenEmissions;
    }
}
 类似资料:
  • 我有托盘货物到达仓库。一旦到达,叉车就被用来卸载,并将货物移动到存储单元。叉车池的当前容量是50。现在会发生什么,当托盘到达时,有许多货物要卸载和存储,假设10批货物,然后10辆叉车去托盘,一次卸载10批货物。由于流程图,一个托盘到达,一辆叉车因此被扣押。我想有两辆叉车中的一辆与托盘相关联,直到所有货物都卸载。我试图使用限制区域开始和结束之间的叉车扣押块和运输移动到块,但这限制了分配给所有托盘的叉

  • 我正在尝试使用maven-telte-resource-plugin在多模块maven项目中跨模块共享许多资源。不幸的是,共享的二进制资源在捆绑期间被损坏,大概是通过过滤。 我相信,由于从本地存储库中提取共享资源jar时包含损坏的二进制文件,因此在这一阶段会发生损坏。 是否有任何关闭过滤maven-远程资源插件? 目前,我的共享资源模块中的pom看起来像

  • 使用启用了 schedule 参数的资源,你可以控制当前资源何时被应用。 例如,你希望如下的 exec 资源每天应用一次,将资源参数 schedule 设置成了内置的值 daily: exec { "/usr/bin/apt-get update": schedule => daily, } 遗憾的是,给 schedule 参数指定 daily 并不能保证该资源每天都能应用一次。 内置的 d

  • 【资源使用】页面主要展示项目运行过程中重点资源的使用情况,主要包括以下几个部分: 数据汇总 该项主要展示资源的具体使用情况,其展示数据根据资源种类的不同而不同,具体如下: 纹理 该项主要展示项目运行过程中纹理资源的 “内存峰值”、“大于1MB数量”、“RGBA32格式数量” 和 “RGB24格式数量”。其中,“内存峰值” 表示纹理资源在使用过程中的内存最大量;“大于1MB数量” 表示所检测到的纹理

  • 问题内容: 这是我的代码,用于限制分钟的请求数: 问题是没有错误,但是即使从资源请求了10个以上的请求,它也不会抛出“ Too Many Request” 问题答案: 我使用GAE项目中的配置代码以及开发服务器来使其工作。 我使用Restlet的2.3.1版本/ GAE的1.9.18版本以及以下代码作为客户端: 在第10次通话后,我遇到以下异常: 希望对您有帮助,蒂埃里

  • 本文向大家介绍vue-rx的初步使用教程,包括了vue-rx的初步使用教程的使用技巧和注意事项,需要的朋友参考一下 一、各文档介绍 1、rxjs官网 2、vue-rxjs地址 二、环境搭建 1、使用vue-cli构建一个项目 2、安装vue-rx的依赖包 3、在src/main.js中配置使用rxjs 三、没有使用vue-rx的时候 1、关于属性的使用 2、关于事件的使用 四、结合vue-rx的使