当前位置: 首页 > 知识库问答 >
问题:

使用Reactor限制请求的速率

戚浩淼
2023-03-14

我正在使用ProjectReactor使用rest从web服务加载数据。这是与多个线程并行完成的。我开始达到web服务的速率限制,因此我希望每秒最多发送10个请求,以避免出现这些错误。用Reactor我该怎么做?

使用zipWith(Mono.delayMillis(100))?还是有更好的办法?

非常感谢。

共有2个答案

范彭亮
2023-03-14

下面的代码将以每秒10次请求的速度在https://www.google.com/上执行GET。您必须进行额外的更改,以支持服务器无法在1秒内处理所有10个请求的情况;当服务器仍在处理前一秒询问的请求时,您可以跳过发送请求

@Test
void parallelHttpRequests() {
    // this is just for limiting the test running period otherwise you don't need it
    int COUNT = 2;

    // use whatever (blocking) http client you desire;
    // when using e.g. WebClient (Spring, non blocking client)
    // the example will slightly change for no longer use
    // subscribeOn(Schedulers.elastic())
    RestTemplate client = new RestTemplate();
    
    var exit = new AtomicBoolean(false);
    var lock = new ReentrantLock();
    var condition = lock.newCondition();

    MessageFormat message = new MessageFormat("#batch: {0}, #req: {1}, resultLength: {2}");
    Flux.interval(Duration.ofSeconds(1L))
            .take(COUNT) // this is just for limiting the test running period otherwise you don't need it
            .doOnNext(batch -> debug("#batch", batch)) // just for debugging
            .flatMap(batch -> Flux.range(1, 10) // 10 requests per 1 second
                            .flatMap(i -> Mono.fromSupplier(() ->
                                    client.getForEntity("https://www.google.com/", String.class).getBody()) // your request goes here (1 of 10)
                                    .map(s -> message.format(new Object[]{batch, i, s.length()})) // here the request's result will be the output of message.format(...)
                                    .doOnSubscribe(s -> debug("doOnSubscribe: #batch = " + batch + ", i = " + i)) // just for debugging
                                    .subscribeOn(Schedulers.elastic()) // one I/O thread per request
                            )
            )
            .subscribe(
                    s -> debug("received", s) // do something with the above request's result
                    e -> {
                        debug("error", e.getMessage());
                        signalAll(exit, condition, lock);
                    },
                    () -> {
                        debug("done");
                        signalAll(exit, condition, lock);
                    }
            );

    await(exit, condition, lock);
}

// most probably you won't need the "await" and "signalAll" methods below but
// I created them anyway just to be easier for one to run this in a test class

private void await(AtomicBoolean exit, Condition condition, Lock lock) {
    lock.lock();
    while (!exit.get()) {
        try {
            condition.await();
        } catch (InterruptedException e) {
            // maybe spurious wakeup
            e.printStackTrace();
        }
    }
    lock.unlock();
    debug("exit");
}

private void signalAll(AtomicBoolean exit, Condition condition, Lock lock) {
    exit.set(true);
    try {
        lock.lock();
        condition.signalAll();
    } finally {
        lock.unlock();
    }
}
刘绍晖
2023-03-14

您可以使用delayElements代替整个zipwith

 类似资料:
  • 问题内容: 我正在用GRequests和lxml在Python 2.7.3中编写一个小脚本,这将允许我从各个网站收集一些可收集的卡价格并进行比较。问题是网站之一限制了请求的数量,如果我超过了它,则会发回HTTP错误429。 有没有一种方法可以限制GRequestes中的请求数量,以使我不超过我指定的每秒请求数量?另外-如果发生HTTP 429,如何让GRequestes在一段时间后重试? 附带说明

  • 我正在用Python 2.7.3编写一个小脚本,其中包含GRequests和lxml,它将允许我从各种网站收集一些可收集的卡价格并进行比较。问题是其中一个网站限制了请求的数量,如果我超过它,就会发回HTTP错误429。 有没有办法在grequests中增加限制请求数,这样我就不会超过我指定的每秒请求数?还有——如果HTTP 429出现,我如何让GRequestes在一段时间后重试? 另一方面,他们

  • 问题内容: API通常具有用户必须遵循的速率限制。举个例子,让我们50个请求/秒。连续的请求采取0.5-1秒,因此是来接近极限速度太慢。但是,使用aiohttp的并行请求超出了速率限制。 轮询API尽可能快地允许,需要限速并行调用。 例如,我发现到目前为止装饰,大约像这样: 这非常适用于连续通话。试图并行调用来实现这个按预期不起作用。 下面是一些代码示例: 这里的问题是,它会率限制 排队 的任务。

  • 我正在写一个网络爬虫,运行并行抓取许多不同的域。我想限制每秒向每个域发出的请求数量,但我不关心打开的连接总数,也不关心跨所有域发出的每秒请求总数。我想最大限度地增加打开连接和每秒请求的总数,同时限制对单个域的每秒请求数。 我可以找到的所有现有示例(1)限制打开连接的数量,或(2)限制在fetch循环中每秒发出的请求总数。例子包括: aiohttp:速率限制并行请求 它们都不做我要求的事情,即限制每

  • 同步-向API发出一批请求,并定期将响应保存到我的数据库。 客户端-从我的客户端的用户向API请求的传递。 服务的文档指定了在给定时间段内可以发出的最大请求数的以下规则: 在一天中: null null 超过这些限制不会导致立即锁定-不会抛出任何异常。但供应商可能会感到恼火,联系我们,然后禁止我们使用他的服务。因此,我需要有一些请求延迟机制,在适当的地方,以防止这一点。我是这样看的: 最安全和最简

  • 以下是文档中此任务的Istio利率限制部分:https://istio.io/docs/tasks/policy-enforcement/rate-limiting/ 我已经正确设置了bookinfo应用程序,我有一个productpage的虚拟服务(以及bookinfo的所有其他组件),我正在运行他们的代码,但速率限制不适合我。 每次我点击productpage的url时,它都能正常工作,没有任