当前位置: 首页 > 知识库问答 >
问题:

使用RxJava对多线程创建的多个可观察对象进行速率限制

纪正德
2023-03-14

我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例:

为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然后将或多或少地像队列一样,以尽可能快的速度处理出站请求(但不超过速率限制)。下面是一些伪代码:

Observable<MyResponse> r1 = createRequestToExternalServer() // In thread 1
Observable<MyResponse> r2 = createRequestToExternalServer() // In thread 2

// Somehow send r1 and r2 to the "rate limiter" observable, (2)

rateLimiterObservable.sample(1 / rate, TimeUnit.MILLISECONDS)

我将如何使用Rx/RxJava来解决这个问题?

共有1个答案

钱稳
2023-03-14

我会使用一个热定时器和一个原子计数器,在给定的持续时间内跟踪剩余的连接:

int rate = 5;
long interval = 1000;

AtomicInteger remaining = new AtomicInteger(rate);

ConnectableObservable<Long> timer = Observable
        .interval(interval, TimeUnit.MILLISECONDS)
        .doOnNext(e -> remaining.set(rate))
        .publish();

timer.connect();

Observable<Integer> networkCall = Observable.just(1).delay(150, TimeUnit.MILLISECONDS);

Observable<Integer> limitedNetworkCall = Observable
        .defer(() -> {
            if (remaining.getAndDecrement() != 0) {
                return networkCall;
            }
            return Observable.error(new RuntimeException("Rate exceeded"));
        });

Observable.interval(100, TimeUnit.MILLISECONDS)
.flatMap(t -> limitedNetworkCall.onErrorReturn(e -> -1))
.take(20)
.toBlocking()
.forEach(System.out::println);
 类似资料:
  • 我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的

  • 我是RxJava的新手,正在尝试从link执行多个观测值的并行执行示例:RxJava并行获取观测值 虽然上面链接中提供的示例是并行执行可观察对象,但是当我在foreach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)时,系统开始一次执行一个可观察对象。请帮助我理解为什么Thread.sleep停止可观察对象的并行执行。 下面是导致多个观测值同步执行的修改示例:

  • 我在创建一个将返回对象列表的可观察对象时遇到了麻烦。我有一个ID列表,想对我的数据库提出一个请求。在这种情况下,我使用的是Firebase。当得到一个结果时,我希望将这些对象中的每一个编译成一个列表,然后返回该列表。我需要在返回之前等待所有的对象都返回。我在我的视图模型反序列化器类中这样做。这是我的代码。 有几种方法可以从firebase数据库中返回数据,我可以返回Documentsnapshot

  • 我正在尝试实现某种流式解析器。假设我有整数的stream,我将它们组合起来创建新的object,它聚合了stream的一部分。 例如,当integer为负数时,object为“done”。为了保持简单,生成的项目将是一串数字。 下面是一个简单的例子: 看起来每个输入onNext都应该调用output onNext。 有什么想法吗?

  • 我使用的是RxJava和RxAndroid,我想将两个可观察到的东西结合起来,但在这两个东西之间,我需要更新UI,所以在到达订阅服务器之前,我必须在主线程中执行代码。 一个解决方案,而不是flatmapping(这是一个被接受的术语吗?)两个可观察到的,将是在更新UI之后在订阅服务器中调用下一个可观察到的,但我觉得应该有一个更优雅的解决方案,比如: 当然,很可能map不是我这里需要使用的运算符。那

  • 我对RxJS很陌生,所以如果这个问题已经得到回答,我提前道歉。 我有一个Angular 2应用程序,在其中一个组件中有一个普通对象。我将UI绑定到这个对象。我想做的是能够捕获对这个对象的所有更改,无论它们来自代码还是来自用户更改其中一个字段。 我正在查看可观察对象,但似乎只有通过Emit方法推送新对象时,订阅者才能接收通知。例如,在属性绑定到输入字段的情况下,这将如何工作? 有更好的方法吗? 这是