当前位置: 首页 > 知识库问答 >
问题:

Reactor通量平面图操作员吞吐量/并发控制和实现背压

呼延鹏云
2023-03-14

我正在使用Flux构建我的反应式管道。在管道中,我需要调用3个不同的外部系统REST API,它们对访问速率非常严格。如果我超过每秒速率阈值,我将会受到指数限制。每个系统都有自己的阈值。

我正在使用Spring WebClient进行REST API调用;在3个API中,2个是GET,1个是POST。

在我的反应器管道中,WebClient被包装在平面图中以执行API调用,如下代码所示:

WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string

    Flux.generator(generator) // Generator pushes the elements from source 1 at a time

    // make call to 1st API Service
    .flatMap(data -> getApiCall1)
    .map(api1Response -> api1ResponseModified)

    // make call to 2nd API Service
    .flatMap(api1ResponseModified -> getApiCall2)
    .map(api2Response -> api2ResponseModified)

// make call to 3rd API Service
.flatMap(api2ResponseModified -> getApiCall3)
.map(api3Response -> api3ResponseModified)

// rest of the pipeline operators

//end
.subscriber();

问题是,如果我没有将并发性值设置为flatMap,那么在服务启动的几秒钟内,管道执行就会突破阈值。如果我将并发性的值设置为1、2、5、10,那么吞吐量就会变得非常低。

问题是,在不对并发性设置任何值的情况下,如何实现应遵守外部系统速率限制的背压?

共有2个答案

堵鸿光
2023-03-14

Resilience4j支持Reactor的速率限制。请参见:

https://resilience4j.readme.io/docs/ratelimiter

https://resilience4j.readme.io/docs/examples-1#section-decorate-mono-or-flux-with-a-ratelimiter

咸正平
2023-03-14

如果您有“每秒速率”的要求,我将显式地设置流量窗口,并将每个窗口限制在所选的时间段内。这将在不受限制的情况下为您提供最大吞吐量。

我将使用类似于以下内容的助手函数:

public static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
    return flux
            .window(ratePerInterval)
            .zipWith(Flux.interval(Duration.ZERO, interval))
            .flatMap(Tuple2::getT1);
}

它允许您执行:

sourceFlux
        .transform(f -> limitIntervalRate(f, 2, Duration.ofSeconds(1))) //Limit to a rate of 2 per second

然后,您可以根据需要将此映射到您的WebClient调用,同时遵守每个API的现有限制:

sourceFlux
        //...assume API 1 has a limit of 10 calls per second
        .transform(f -> limitIntervalRate(f, 10, Duration.ofSeconds(1)))
        .flatMap(data -> getApiCall1)
        .map(api1Response -> api1ResponseModified)

        //...assume API 2 has a limit of 20 calls per second
        .transform(f -> limitIntervalRate(f, 20, Duration.ofSeconds(1))) 
        .flatMap(api1ResponseModified -> getApiCall2)
        .map(api2Response -> api2ResponseModified)

。。。等等

 类似资料:
  • 我有一个类女巫负责向客户端发送数据,所有其他类在需要发送数据时都使用这个。让我们称之为“数据ender.class”。 现在客户端要求我们将吞吐量控制在每秒最多50次调用。 我需要在这个类上创建一个algoritm(如果可能的话),以保持当前秒的调用次数,如果它达到50的最大值,保持进程要么睡眠或某事,并继续而不丢失数据。也许我必须实现一个队列或比简单的睡眠更好的东西。我需要建议或遵循的方向。 为

  • 在我的测试计划中,我有24个吞吐量控制器,它们的执行率不同,最小的是1%。10个不同的吞吐量控制器有1%的执行率。每个吞吐量控制器下面都有许多事务控制器。当我运行一个测试1小时时,在某些最小百分比吞吐量控制器下定义的采样器甚至不会执行一次。我已经确保所有24个吞吐量控制器的总数增加到100%。如何确保在所有吞吐量控制器上定义的所有采样器至少执行一次? 对于吞吐量最少的控制器,我将其更改为“Tota

  • 我需要一些帮助来使用JMeter。我想记录两个不同的场景,比如单击两个不同的按钮。如果我尝试为两个用户运行,一个用户应该点击第一个按钮,另一个用户应该同时点击另一个按钮。 我知道这是一个基本问题。但我对这一点还不熟悉。这就是为什么在这里问。我试过如下方法: 当我使用CSV数据集配置与两个用户一起运行时,两个登录使用相同的用户。 我还需要测试并发性。我不知道如何正确地执行这个。有人能帮我吗?

  • 本文向大家介绍springboot高并发下提高吞吐量的实现,包括了springboot高并发下提高吞吐量的实现的使用技巧和注意事项,需要的朋友参考一下 公司让做一个全文检索的项目,我使用的是elasticsearch。但是对性能有很高的要求,为了解决性能问题,我简直是寝食难安。 es(elasticsearch)没有使用分布式,单台的。 开发完测试的时候,查询慢,吞吐量低。 网友们建议用异步--使

  • 问题内容: 我知道这是一个非常笼统的问题。但是,我想了解使Redis(或诸如MemCached,Cassandra之类的缓存)在惊人的性能极限下工作的主要架构决策是什么。 如何维护连接? 连接是TCP还是HTTP? 我知道它完全用C编写。如何管理内存? 尽管存在竞争的读/写,但用于实现高吞吐量的同步技术有哪些? 基本上,具有内存高速缓存的计算机和可以响应命令的服务器的普通香草实现和Redis框之间

  • 无论从什么角度来看,它都不是。 假设我有两个消费者,它们以每秒“10”条消息的速度从给定主题中消耗数据。现在,不管它们是从单个分区还是从两个不同的分区进行消耗;我的吞吐量将保持不变,每秒20条消息。 我觉得我一定漏了一些内部工作的细节,你能帮我解释一下kafka分区(多个)是如何帮助提高固定用户数量的吞吐量的,而不是单个kafka分区。