当前位置: 首页 > 知识库问答 >
问题:

spring WebClient-如何基于响应头重试延迟

巫坚白
2023-03-14

我一直在学习spring Webflux和reactive programming,并陷入了一个问题,我试图解决的重试逻辑使用spring WebClient。我已经创建了一个客户机,并成功地调用了一个外部Web服务GETendpoint,该endpoint返回一些JSON数据。

当外部服务以503-service unavailable状态响应时,响应包括retry-after标头,该标头具有一个值,指示在重试请求之前应等待多长时间。我想在spring WebFlux/Reactor中找到一种方法,告诉webClient在X周期之后重试它的请求,其中X是now和我从响应头解析出的DateTime之间的差值。

public <T> Mono<T> get(final String url, Class<T> clazz) {
        return webClient
                .get().uri(url)
                .retrieve()
                .bodyToMono(clazz);
    }

我使用一个构建器来创建上述方法中使用的webclient变量,并将其作为实例变量存储在类中。

webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
    clientCodecConfigurer.defaultCodecs();
    clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
    clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClient = webClientBuilder.build();

我试图理解RetryWhen方法并将其与Retry类一起使用,但无法确定是否可以访问或传递那里的响应头值。

public <T> Mono<T> get(final String url, Class<T> clazz) {
        return webClient
                .get().uri(url)
                .retrieve()
                .bodyToMono(clazz);
                .retryWhen(new Retry() {
                    @Override
                    public Publisher<?> generateCompanion(final Flux<RetrySignal> retrySignals) {
                        // Can I use retrySignals or retryContext to find the response header somehow?
                        // If I can find the response header, how to return a "yes-retry" response?
                    }
                })
    }

我还尝试在WebClient.Builder中执行一些额外的逻辑并使用筛选器,但这只能使我停止一个新请求(对#get)直到先前建立的Retry-After值过去。

webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
    clientCodecConfigurer.defaultCodecs();
    clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
    clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClientBuilder.filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
            final Clock clock = Clock.systemUTC();
            final int id = (int) clientRequest.attribute("id"); // id is saved as an attribute for the request, pull it out here
            final long retryAfterEpochMillis = // get epoch millisecond from DB for id
            if(epoch is in the past) {
                return Mono.just(clientRequest);
            } else { // have to wait until epoch passes to send request
                return Mono.just(clientRequest).delayElement(Duration.between(clock.instant(), Instant.ofEpochMilli(retryAfterEpochMillis)));
            }
        })
);
webClient = webClientBuilder.build();
.onStatus(HttpStatus::isError, response -> {
                    final List<String> retryAfterHeaders = response.headers().header("Retry-After");
                    if(retryAfterHeaders.size() > 0) {
                        final long retryAfterEpochMillis = // parse millisecond epoch time from header
                        // Save millisecond time to DB associated to specific id
                    }
                    return response.bodyToMono(String.class).flatMap(body ->
                            Mono.error(new RuntimeException(
                                            String.format("Request url {%s} failed with status {%s} and reason {%s}",
                                                    url,
                                                    response.rawStatusCode(),
                                                    body))));
                })

任何帮助都是感激的,如果我能提供更多的上下文数据来帮助,我会的。

共有1个答案

弓智明
2023-03-14
public class WebClientStatefulRetry3 {
    public static void main(String[] args) {
        WebClient webClient = WebClient.create();

        call(webClient)
                .retryWhen(Retry.indefinitely()
                        .filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable)
                        .doBeforeRetryAsync(signal -> Mono.delay(calculateDelay(signal.failure())).then()))
                .block();
    }

    private static Mono<String> call(WebClient webClient) {
        return webClient.get()
                .uri("http://mockbin.org/bin/b2a26614-0219-4018-9446-c03bc1868ebf")
                .retrieve()
                .bodyToMono(String.class);
    }

    private static Duration calculateDelay(Throwable failure) {
        String headerValue = ((WebClientResponseException.ServiceUnavailable) failure).getHeaders().get("Retry-After").get(0);

        return // calculate delay here from header and current time;
    }
}
public class WebClientRetryWithExpand {
    public static void main(String[] args) {
        WebClient webClient = WebClient.create();

        call(webClient)
                .expand(prevResponse -> {
                    List<String> header = prevResponse.headers.header("Retry-After");
                    if (header.isEmpty()) {
                        return Mono.empty();
                    }
                    long delayInMillis = // calculate delay from header and current time

                    return Mono.delay(Duration.ofMillis(delayInMillis))
                            .then(call(webClient));
                })
                .last()
                .block();
    }

    private static Mono<ResponseWithHeaders> call(WebClient webClient) {
        return webClient.get()
                .uri("https://example.com")
                .exchangeToMono(response -> response.bodyToMono(String.class)
                        .map(rawResponse -> new ResponseWithHeaders(rawResponse, response.headers())));
    }

    @Data
    static class ResponseWithHeaders {
        private final String rawResponse;
        private final ClientResponse.Headers headers;
    }
}
 类似资料:
  • 我想在java中对API的HTTP响应实现重试框架。 如果回答是: 400:将json中的参数设为null,然后重试 202:返回成功 429:请等待2分钟,然后重试 5XX:等待5分钟,然后重试 如果重试次数超过,则抛出异常。是否有任何可用的库支持重试响应类型并允许编辑请求对象?如果没有,我怎么能设计一个?有没有围绕它的教程?

  • 我正在尝试使用Web客户端创建REST调用 我只想记录通话结果。成功时 - 使用响应正文记录成功消息,在 5XX 或超时或其他时 - 记录错误消息。日志应该在后台创建(而不是由创建调用的线程创建)但是每次都会执行,工作正常,但也有 记录在日志文件中。 我也在一些教程中看到过方法,但在我的设置中有这样的方法。 如何记录成功和失败消息?

  • 我们看到一些行为,我们没有在OkHttp中缓存响应,最终每次都访问服务器。但是,响应在将来会有一个过期时间,所以理想情况下它将被缓存。 从我对CacheStrategy的观察来看,问题是它将日期+年龄加在一起,以查看它是否超过了过期时间。在本例中,,因此它最终不会被添加到缓存中。 因此,我认为理想情况下,要么响应日期等于last-modified(在本例中,时间为2021年1月16日00:40:3

  • 我已经定义了: 在filter方法中,我记录了响应,但我还想添加执行请求的。我知道我可以在请求之前访问这些数据(使用),但有没有办法将这些数据添加到我的过滤器?

  • 我有一个web服务,它接收对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都是在一个线程上执行的,我正在尝试实现publisher confirms,我正在努力解决如何设置它。我有它的工作,但我不喜欢我这样做。 我这样做的方式是: 在邮件上添加一些标题 拥有一个包含2个订阅者的发布-订阅频道 订户1)创建一个阻塞队列,使其准备就绪,并通过amqp发送消息 订户2)开始在该队列上拉动

  • 我非常确定“Expires”是有效的HTTP响应头类型。但是当我尝试在代码中设置它时:(这是在ActionFilter.OnActionExecuted方法中) 我最后有一个例外: InvalidOperationException:错误使用的标头名称。确保请求头与HttpRequestMessage一起使用,响应头与HttpResponseMessage一起使用,内容头与HttpContent对