当前位置: 首页 > 知识库问答 >
问题:

java - Java Spring WebFlux怎么重试另外的服务?

丌官承
2024-04-01

Spring WebFlux怎么重试另一个url?
我正在做一个LLM gateway的需求,要求用webFlux,逐字吐出数据。同时支持容灾。
调用链路:clientA -> gateway -> serverB
如果gateway -> serverB这一步失败,则采用备选机serverC,即在gateway做重试,gateway -> serverC.
请注意,gateway -> serverB(serverC)这一步是采用webFlux的。
现在的问题是,我如何拿到gateway -> serverB的错误返回码,并且去重试serverC?

gateway -> serverB(serverC)的代码如下(取名为sseHttp):

Flux<Response> responseFlux = WebClient.create(url)                .post().headers(httpHeaders -> setHeaders(httpHeaders, headers))                .contentType(MediaType.APPLICATION_JSON).bodyValue(jsonBody)                .retrieve().onStatus(status -> status != HttpStatus.OK, response -> {                    String message = String.valueOf(response.bodyToMono(String.class));                    aimlGatewayFptiEvent.put(INT_ERROR_CODE, String.valueOf(response.statusCode()));                    aimlGatewayFptiEvent.put(STATUS_CODE, String.valueOf(response.statusCode()));                    aimlGatewayFptiEvent.put(INT_ERROR_DESC, message);                    logger.error("url:{}, response.statusCode:{}, message:{}",                            url, response.statusCode(), message);                    throw new GatewayException("call upstream seldon error, code:" + response.statusCode(),                            INTERNAL_SERVER_ERROR);                })                .bodyToFlux(typeRef)                .onErrorResume(WebClientResponseException.class, err -> {                    aimlGatewayFptiEvent.put(INT_ERROR_CODE, String.valueOf(err.getStatusCode()));                    aimlGatewayFptiEvent.put(INT_ERROR_DESC, err.getMessage());                    throw new GatewayException(err.getMessage(), INTERNAL_SERVER_ERROR);                })                .doOnNext(event -> {                    if(chunksCount.get() == 0) {                        firstChunkReceivedTime.set(System.currentTimeMillis());                    }                    chunksCount.getAndIncrement();                    if(null != event && null != event.data() && (null == bypassPayload || !bypassPayload)) {                        try {                            aimlGatewayFptiEvent.putResp(compressStringAndBase64Encode(event.data()));                        } catch (IOException e) {                            logger.error("Failed to compress and encode predict result.", e);                        }                    }                })                .map(event -> Response.ok().entity(                        buildData2PredictResponse(event.data(), request.getEndpoint())                ).build())                .onErrorReturn(GatewayExceptionHandler.toStreamErrorResponse(                        new GatewayException("Error predict from upstream seldon.", INTERNAL_SERVER_ERROR)))                .doFinally((signalType) -> {                    if (chunksCount.get() > 0) {                        aimlGatewayFptiEvent.putIfAbsent(STATUS_CODE, "200");                    }                    aimlGatewayFptiEvent.putResponseTs(Long.toString(System.currentTimeMillis()));                    aimlGatewayFptiEvent.putApiDuration(Long.toString(System.currentTimeMillis() - beginTs));                    if(!Objects.equals(aimlGatewayFptiEvent.get(STATUS_CODE), "200")) {                        calTransaction.setStatus(EXCEPTION);                        calTransaction.addData(ERR_MSG, aimlGatewayFptiEvent.get(INT_ERROR_DESC));                        CalLogHelper.logException(calTransaction.getType() + EXCEPTION_POSTFIX,                                new GatewayException(aimlGatewayFptiEvent.get(INT_ERROR_DESC), INTERNAL_SERVER_ERROR));                    }                    calTransaction.completed();                    // write to fpti                    trackingUtil.send2Fpti(aimlGatewayFptiEvent, httpParams);                })

尝试1:
尝试抓取gateway -> serverB抛出的异常,然后重试。

try {   sseHttp(serverB.url); // call serverB} catch(Exception e) {   sseHttp(serverC.url); // call serverC}

但问题是,call serverB即使失败了,也会先返回给clientA,而不是抛出异常。当serverB不可用,但是serverC可用时候,在clientA仍然是出错的。

尝试2:
使用subscribe获取gateway -> serverB的返回码。

Flux<Response> responseFlux = sseHttp(serverB.url);responseFlux.subscribe(response -> {            //设置map标记            if (response.getStatus() != 200) {                aimlGatewayFptiEvent.put(STATUS_CODE, String.valueOf(response.getStatus()));            }        });//检查标记if(null != aimlGatewayFptiEvent.get(STATUS_CODE) && !"200".equals(aimlGatewayFptiEvent.get(STATUS_CODE))) {            responseFlux = sseHttp(serverC.url);        }

很遗憾,由于subscribe是非阻塞的,因此在aimlGatewayFptiEvent被设置前,代码已经走到下面,并且返回了失败。
我如何能在clientA中得到成功的返回,当serverB不可用但是serverC可用?

共有1个答案

温翔宇
2024-04-01

在WebFlux中,你可以使用retryWhen操作符来重试失败的请求。retryWhen接受一个Publisher,当原始Flux发出错误信号时,它将这个错误信号发送给这个Publisher。你可以在这个Publisher中定义你的重试逻辑。

在你的情况下,你可以在retryWhen中检查错误的HTTP状态码,如果状态码表示请求失败(例如500错误),那么你可以重试另一个URL。如果新的URL也失败,你可以继续重试,直到达到最大重试次数。

以下是一个可能的实现:

import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.reactive.function.client.WebClientResponseException;// ...Flux<Response> responseFlux = Flux.just(serverB.url, serverC.url) // 创建一个包含两个URL的Flux        .concatMap(url -> WebClient.create(url)                .post()                .headers(httpHeaders -> setHeaders(httpHeaders, headers))                .contentType(MediaType.APPLICATION_JSON)                .bodyValue(jsonBody)                .retrieve()                .onStatus(HttpStatus::isError, clientResponse -> {                    // 在这里处理错误状态码                    String message = String.valueOf(clientResponse.bodyToMono(String.class));                    // 记录错误日志                    logger.error("url:{}, response.statusCode:{}, message:{}",                            url, clientResponse.statusCode(), message);                    throw new GatewayException("call upstream error, code:" + clientResponse.statusCode(),                            INTERNAL_SERVER_ERROR);                })                .bodyToFlux(typeRef)                .onErrorResume(throwable -> {                    // 在这里处理错误并返回一个新的Flux,或者抛出一个异常                    if (throwable instanceof WebClientResponseException) {                        WebClientResponseException exception = (WebClientResponseException) throwable;                        if (exception.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {                            // 如果是500错误,重试下一个URL                            return Flux.error(new RuntimeException("Retry with another URL"));                        }                    }                    // 如果不是500错误,直接抛出异常                    return Flux.error(throwable);                })        )        .retryWhen(retries -> retries.delayElements(Duration.ofMillis(1000)) // 每次重试间隔1秒                .take(2) // 最大重试次数为2(即重试serverB和serverC各一次)        );// 订阅并处理响应responseFlux.subscribe(response -> {    // 处理响应}, throwable -> {    // 处理所有重试失败后的错误    logger.error("Failed to call both servers", throwable);    throw new GatewayException("Failed to call both servers", INTERNAL_SERVER_ERROR);});

在这个例子中,我们创建了一个包含两个URL的Flux,然后使用concatMap对每个URL发起请求。如果请求失败并抛出WebClientResponseException,我们检查错误状态码是否为500。如果是500错误,我们返回一个包含错误消息的新Flux,这将触发retryWhen操作符进行重试。如果不是500错误,我们直接抛出异常。

retryWhen中,我们使用delayElements来设置每次重试之间的延迟,并使用take来限制最大重试次数。最后,我们订阅responseFlux并处理响应或错误。

这种方式可以确保在serverB不可用但serverC可用时,客户端A能够收到成功的返回。

 类似资料:
  • 本文向大家介绍外边距重叠是什么?重叠的结果是什么?怎么防止外边距重叠?相关面试题,主要包含被问及外边距重叠是什么?重叠的结果是什么?怎么防止外边距重叠?时的应答技巧和注意事项,需要的朋友参考一下 外边距重叠是什么? 外边距重叠指的是,当两个垂直外边距相遇时,它们将形成一个外边距。 重叠后的外边距的高度等于两个发生重叠的外边距的高度中的较大者。 发生的条件:属于同一个BFC的两个相邻元素上下marg

  • 服务订阅,一般按年订阅。 订阅表字段:订阅状态、订阅开始日期、订阅结束日期; 订阅到期后需要将状态更新为“未订阅”,怎么检查订阅是否到期并更新状态? 定时任务:每天0时,那当前日期和到期日期去比较 监听机制 有什么更好的处理方式?哪种方式最优

  • 网络通讯天然是不稳定的,因此在分布式系统中,需要有良好的容错设计。无差别重试是非常危险的。当通讯出现问题时,每个请求都重试一次,相当于系统 IO 负载增加了 100%,容易诱发雪崩事故。重试还要考虑错误的原因,如果是无法通过重试解决的问题,那么重试只是浪费资源而已。除此之外,如果重试的接口不具备幂等性,还可能造成数据不一致等问题。 本组件提供了丰富的重试机制,可以满足多种场景的重试需求。 安装 c

  • Outlook Calendar有开发文档吗 我想通过java代码在outlook创建一个重复的日历,创建一个每天、每周重复的日历,要怎么实现 我尝试过这种,但是只能创建连续的日历,没有办法创建周期重复的日历

  • 创建两个表: 课程(Course_id(主键),Course_name) Student(Roll_no(主键),Name,Course_id(外键))并检索“BSC”课程录取的所有学生的姓名。 设BSC的course_id为105。 其查询将是:从Student中选择Name(其中Course_id=105) 在不知道Course_id(仅使用Course_name)的情况下,我可以查询学生的姓

  • 本文向大家介绍你对微服务是怎么理解的?相关面试题,主要包含被问及你对微服务是怎么理解的?时的应答技巧和注意事项,需要的朋友参考一下 微服务,又名微服务架构,是一种架构风格,它将应用构建为一个小型自治服务的集合,以业务领域为模型。 通俗地说,就像蜜蜂通过对蜡制的等边六角形单元来构建它们的蜂巢。 他们最初从使用各种材料的小单元开始,一点点的搭建出一个大型蜂巢。 这些小单元组成坚固的结构,将蜂窝的特定部