当前位置: 首页 > 知识库问答 >
问题:

如何用WebClient限制请求/秒?

琴修为
2023-03-14
@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

WebClient服务类:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}

共有1个答案

陆耀
2023-03-14

Resilience4j对项目Reactor的非阻塞速率限制有出色的支持。

必需的依赖关系(Spring WebFlux旁边):

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.6.1</version>
</dependency>

示例:

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class WebClientRateLimit
{
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private final WebClient webClient;
    private final RateLimiter rateLimiter;

    public WebClientRateLimit()
    {
        this.webClient = WebClient.create();

        // enables 3 requests every 5 seconds
        this.rateLimiter = RateLimiter.of("my-rate-limiter",
                RateLimiterConfig.custom()
                                 .limitRefreshPeriod(Duration.ofSeconds(5))
                                 .limitForPeriod(3)
                                 .timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
                                 .build());
    }

    public Mono<?> call()
    {
        return webClient.get()
                        .uri("https://jsonplaceholder.typicode.com/todos/1")
                        .retrieve()
                        .bodyToMono(String.class)
                        .doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
                                + " - call triggered"))
                        .transformDeferred(RateLimiterOperator.of(rateLimiter));
    }

    public static void main(String[] args)
    {
        WebClientRateLimit webClientRateLimit = new WebClientRateLimit();

        long start = System.currentTimeMillis();

        Flux.range(1, 16)
            .flatMap(x -> webClientRateLimit.call())
            .blockLast();

        System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
    }
}
1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096

 类似资料:
  • Spring留档声明,即使要执行同步超文本传输协议调用,我们也必须从RestTemboard切换到。 目前,我有以下代码: 当然,我可以在这里使用CountdownLatch,但它看起来像是API滥用。 如何执行同步请求?

  • 我正在尝试使用<code>网络客户端 谢谢 -斯里尼

  • 如何将转换为请求正文的表示形式?或者,如何正确地记录整个请求/响应,同时能够在其中散列潜在的凭据?

  • 我有一个要求,即我使用Spring批处理从SQL DB中读取大量行(数千行),并在编写Kafka主题之前调用REST服务来丰富内容。 使用Spring Reactive webClient时,如何限制活动非阻塞服务调用的数量?在我使用Spring Batch读取数据后,我应该在循环中引入通量吗? (我理解delayElements的用法,它有不同的目的,比如当一个获取服务调用带来大量数据,你希望服

  • 问题内容: 我正在用GRequests和lxml在Python 2.7.3中编写一个小脚本,这将允许我从各个网站收集一些可收集的卡价格并进行比较。问题是网站之一限制了请求的数量,如果我超过了它,则会发回HTTP错误429。 有没有一种方法可以限制GRequestes中的请求数量,以使我不超过我指定的每秒请求数量?另外-如果发生HTTP 429,如何让GRequestes在一段时间后重试? 附带说明

  • 我正在用Python 2.7.3编写一个小脚本,其中包含GRequests和lxml,它将允许我从各种网站收集一些可收集的卡价格并进行比较。问题是其中一个网站限制了请求的数量,如果我超过它,就会发回HTTP错误429。 有没有办法在grequests中增加限制请求数,这样我就不会超过我指定的每秒请求数?还有——如果HTTP 429出现,我如何让GRequestes在一段时间后重试? 另一方面,他们