我一直在学习spring Webflux和reactive programming,并陷入了一个问题,我试图解决的重试逻辑使用spring WebClient。我已经创建了一个客户机,并成功地调用了一个外部Web服务GETendpoint,该endpoint返回一些JSON数据。
当外部服务以503-service unavailable
状态响应时,响应包括retry-after
标头,该标头具有一个值,指示在重试请求之前应等待多长时间。我想在spring WebFlux/Reactor中找到一种方法,告诉webClient在X周期之后重试它的请求,其中X是now和我从响应头解析出的DateTime之间的差值。
public <T> Mono<T> get(final String url, Class<T> clazz) {
return webClient
.get().uri(url)
.retrieve()
.bodyToMono(clazz);
}
我使用一个构建器来创建上述方法中使用的webclient
变量,并将其作为实例变量存储在类中。
webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
clientCodecConfigurer.defaultCodecs();
clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClient = webClientBuilder.build();
我试图理解RetryWhen
方法并将其与Retry
类一起使用,但无法确定是否可以访问或传递那里的响应头值。
public <T> Mono<T> get(final String url, Class<T> clazz) {
return webClient
.get().uri(url)
.retrieve()
.bodyToMono(clazz);
.retryWhen(new Retry() {
@Override
public Publisher<?> generateCompanion(final Flux<RetrySignal> retrySignals) {
// Can I use retrySignals or retryContext to find the response header somehow?
// If I can find the response header, how to return a "yes-retry" response?
}
})
}
我还尝试在WebClient.Builder中执行一些额外的逻辑并使用筛选器,但这只能使我停止一个新请求(对#get
)直到先前建立的Retry-After值过去。
webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
clientCodecConfigurer.defaultCodecs();
clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClientBuilder.filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
final Clock clock = Clock.systemUTC();
final int id = (int) clientRequest.attribute("id"); // id is saved as an attribute for the request, pull it out here
final long retryAfterEpochMillis = // get epoch millisecond from DB for id
if(epoch is in the past) {
return Mono.just(clientRequest);
} else { // have to wait until epoch passes to send request
return Mono.just(clientRequest).delayElement(Duration.between(clock.instant(), Instant.ofEpochMilli(retryAfterEpochMillis)));
}
})
);
webClient = webClientBuilder.build();
.onStatus(HttpStatus::isError, response -> {
final List<String> retryAfterHeaders = response.headers().header("Retry-After");
if(retryAfterHeaders.size() > 0) {
final long retryAfterEpochMillis = // parse millisecond epoch time from header
// Save millisecond time to DB associated to specific id
}
return response.bodyToMono(String.class).flatMap(body ->
Mono.error(new RuntimeException(
String.format("Request url {%s} failed with status {%s} and reason {%s}",
url,
response.rawStatusCode(),
body))));
})
任何帮助都是感激的,如果我能提供更多的上下文数据来帮助,我会的。
public class WebClientStatefulRetry3 {
public static void main(String[] args) {
WebClient webClient = WebClient.create();
call(webClient)
.retryWhen(Retry.indefinitely()
.filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable)
.doBeforeRetryAsync(signal -> Mono.delay(calculateDelay(signal.failure())).then()))
.block();
}
private static Mono<String> call(WebClient webClient) {
return webClient.get()
.uri("http://mockbin.org/bin/b2a26614-0219-4018-9446-c03bc1868ebf")
.retrieve()
.bodyToMono(String.class);
}
private static Duration calculateDelay(Throwable failure) {
String headerValue = ((WebClientResponseException.ServiceUnavailable) failure).getHeaders().get("Retry-After").get(0);
return // calculate delay here from header and current time;
}
}
public class WebClientRetryWithExpand {
public static void main(String[] args) {
WebClient webClient = WebClient.create();
call(webClient)
.expand(prevResponse -> {
List<String> header = prevResponse.headers.header("Retry-After");
if (header.isEmpty()) {
return Mono.empty();
}
long delayInMillis = // calculate delay from header and current time
return Mono.delay(Duration.ofMillis(delayInMillis))
.then(call(webClient));
})
.last()
.block();
}
private static Mono<ResponseWithHeaders> call(WebClient webClient) {
return webClient.get()
.uri("https://example.com")
.exchangeToMono(response -> response.bodyToMono(String.class)
.map(rawResponse -> new ResponseWithHeaders(rawResponse, response.headers())));
}
@Data
static class ResponseWithHeaders {
private final String rawResponse;
private final ClientResponse.Headers headers;
}
}
我想在java中对API的HTTP响应实现重试框架。 如果回答是: 400:将json中的参数设为null,然后重试 202:返回成功 429:请等待2分钟,然后重试 5XX:等待5分钟,然后重试 如果重试次数超过,则抛出异常。是否有任何可用的库支持重试响应类型并允许编辑请求对象?如果没有,我怎么能设计一个?有没有围绕它的教程?
我正在尝试使用Web客户端创建REST调用 我只想记录通话结果。成功时 - 使用响应正文记录成功消息,在 5XX 或超时或其他时 - 记录错误消息。日志应该在后台创建(而不是由创建调用的线程创建)但是每次都会执行,工作正常,但也有 记录在日志文件中。 我也在一些教程中看到过方法,但在我的设置中有这样的方法。 如何记录成功和失败消息?
我们看到一些行为,我们没有在OkHttp中缓存响应,最终每次都访问服务器。但是,响应在将来会有一个过期时间,所以理想情况下它将被缓存。 从我对CacheStrategy的观察来看,问题是它将日期+年龄加在一起,以查看它是否超过了过期时间。在本例中,,因此它最终不会被添加到缓存中。 因此,我认为理想情况下,要么响应日期等于last-modified(在本例中,时间为2021年1月16日00:40:3
我已经定义了: 在filter方法中,我记录了响应,但我还想添加执行请求的。我知道我可以在请求之前访问这些数据(使用),但有没有办法将这些数据添加到我的过滤器?
我有一个web服务,它接收对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都是在一个线程上执行的,我正在尝试实现publisher confirms,我正在努力解决如何设置它。我有它的工作,但我不喜欢我这样做。 我这样做的方式是: 在邮件上添加一些标题 拥有一个包含2个订阅者的发布-订阅频道 订户1)创建一个阻塞队列,使其准备就绪,并通过amqp发送消息 订户2)开始在该队列上拉动
我非常确定“Expires”是有效的HTTP响应头类型。但是当我尝试在代码中设置它时:(这是在ActionFilter.OnActionExecuted方法中) 我最后有一个例外: InvalidOperationException:错误使用的标头名称。确保请求头与HttpRequestMessage一起使用,响应头与HttpResponseMessage一起使用,内容头与HttpContent对