当前位置: 首页 > 知识库问答 >
问题:

使用Spring WebFlux完成WebClient异步示例

卢子民
2023-03-14

我不熟悉反应式编程范式,但最近我决定在Spring WebClient上建立一个简单的Http客户端,因为旧的同步RestTemplate已经在维护中,可能会在即将发布的版本中被弃用。

因此,首先我查看了Spring文档,然后在web上搜索示例。

我必须说,(只是暂时)我已经有意识地决定不通过Reactor库留档,所以除了发布者-订阅者模式之外,我对Mono和Flux的了解很少。相反,我专注于让一些东西工作。

我的场景是一个简单的POST,用于向客户端只对响应状态代码感兴趣的服务器发送回调。没有返回正文。所以我终于想出了这个有效的代码片段:

private void notifyJobSuccess(final InternalJobData jobData) {
        
        SuccessResult result = new SuccessResult();
        result.setJobId(jobData.getJobId());
        result.setStatus(Status.SUCCESS);
        result.setInstanceId(jobData.getInstanceId());
        
        log.info("Result to send back:" + System.lineSeparator() + "{}", result.toString());
        
        this.webClient.post()
            .uri(jobData.getCallbackUrl())
            .body(Mono.just(result), ReplaySuccessResult.class)
            .retrieve()
            .onStatus(s -> s.equals(HttpStatus.OK), resp -> {   
                log.info("Expected CCDM response received with HttpStatus = {}", HttpStatus.OK);
                return Mono.empty();
            })
            .onStatus(HttpStatus::is4xxClientError, resp -> {   
                log.error("CCDM response received with unexpected Client Error HttpStatus {}. "
                        + "The POST request sent by EDA2 stub did not match CCDM OpenApi spec", resp.statusCode());
                return Mono.empty();
            })
            .onStatus(HttpStatus::is5xxServerError, resp -> {   
                log.error("CCDM response received with unexpected Server Error HttpStatus {}", resp.statusCode());
                return Mono.empty();
            }).bodyToMono(Void.class).subscribe(Eda2StubHttpClient::handleResponseFromCcdm);
        
    }

我对反应式WebClient的工作原理了解不足,从调用订阅开始。在编写客户端代码之前,我检查的数十个示例中没有一个包含这样的调用,但事实是,在我包含该调用之前,服务器一直在等待请求。

然后,我偶然发现了一句格言:“在你订阅之前什么都不会发生”。我知道模式Plublisher订阅服务器,但我(错误地)认为订阅是由WebClient API在任何exchage或BodyTomino方法中处理的。。。block()肯定必须订阅,因为当您阻止它时,请求会立即发出。

所以我的第一个问题是:真的需要调用subscribe()吗?

第二个问题是为什么StubHttpClient::HandlerResponse方法从未被回调。对于这一点,我发现的唯一解释是,由于Mono返回的是一个

最后,要求一个完整的例子来说明一个方法在单声道中接收一个身体,然后再使用它,这是否太过分了?我发现的所有示例都只关注于发出请求,但我现在无法理解Mono或Flux后来是如何使用的。。。我知道我必须尽快检查Reactor文件,但我希望能得到一些帮助,因为我遇到了异常和错误处理方面的问题。

谢谢

共有1个答案

朱渝
2023-03-14

自从我在这里求助以来,已经有一段时间了。现在,我不想编辑,而是想为我的前一个问题添加一个答案,以便答案保持清晰,与原始问题和评论分开。这里有一个完整的例子。

上下文:作为客户端的应用程序,从OAuth2授权服务器请求访问令牌。异步请求访问令牌,以避免在另一端处理令牌请求并到达响应时阻塞应用程序的线程。

首先,这是一个为其客户端提供访问令牌的类(方法getAccessToken):如果访问令牌已经初始化并且有效,则返回存储的值;否则,将获取一个调用内部方法fetchAccessTokenAsync的新方法:

public class Oauth2ClientBroker {
private static final String OAUHT2_SRVR_TOKEN_PATH= "/auth/realms/oam/protocol/openid-connect/token";
private static final String GRANT_TYPE = "client_credentials";

@Qualifier("oAuth2Client")
private final WebClient oAuth2Client;

private final ConfigurationHolder CfgHolder;

@GuardedBy("this")
private String token = null;

@GuardedBy("this")
private Instant tokenExpireTime;

@GuardedBy("this")
private String tokenUrlEndPoint;

public void getAccessToken(final CompletableFuture<String> completableFuture) {

    if (!isTokenInitialized() || isTokenExpired()) {
        log.trace("Access Token not initialized or has exired: go fetch a new one...");
        synchronized (this) {
            this.token = null;
        }
        fetchAccessTokenAsync(completableFuture);
    } else {
        log.trace("Reusing Access Token (not expired)");
        final String token;
        synchronized (this) {
            token = this.token;
        }
        completableFuture.complete(token);
    }
}

...}

接下来,我们将看到fetchAccessTokenAsync执行以下操作:

private void fetchAccessTokenAsync(final CompletableFuture<String> tokenReceivedInFuture) {

    Mono<String> accessTokenResponse = postAccessTokenRequest();
    accessTokenResponse.subscribe(tr -> processResponseBodyInFuture(tr, tokenReceivedInFuture));

}

这里发生了两件事:

  1. 方法postAccessTokenRequest()构建一个POST请求并声明如何使用响应(当WebFlux在收到后使其可用时),使用exchange eToMono
private Mono postAccessTokenRequest() {

        log.trace("Request Access Token for OAuth2 client {}", cfgHolder.getClientId());

        final URI uri = URI.create(cfgHolder.getsecServiceHostAndPort().concat(OAUHT2_SRVR_TOKEN_PATH));
            } else {
                uri = URI.create(tokenUrlEndPoint);
            }

        }
        log.debug("Access Token endpoint OAuth2 Authorization server: {}", uri.toString());

        return oAuth2Client.post().uri(uri)
                .body(BodyInserters.fromFormData("client_id", cfgHolder.getEdaClientId())
                        .with("client_secret", cfgHolder.getClientSecret())
                        .with("scope", cfgHolder.getClientScopes()).with("grant_type", GRANT_TYPE))
                .exchangeToMono(resp -> {
                    if (resp.statusCode().equals(HttpStatus.OK)) {
                        log.info("Access Token successfully obtained");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().equals(HttpStatus.BAD_REQUEST)) {
                        log.error("Bad request sent to Authorization Server!");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().equals(HttpStatus.UNAUTHORIZED)) {
                        log.error("OAuth2 Credentials exchange with Authorization Server failed!");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().is5xxServerError()) {
                        log.error("Authorization Server could not generate a token due to a server error");
                        return resp.bodyToMono(String.class);
                    } else {
                        log.error("Authorization Server returned an unexpected status code: {}",
                                resp.statusCode().toString());
                        return Mono.error(new Exception(
                                String.format("Authorization Server returned an unexpected status code: %s",
                                        resp.statusCode().toString())));
                    }
                }).onErrorResume(e -> {
                    log.error(
                            "Access Token could not be obtained. Process ends here");
                    return Mono.empty();
                });
    }

exchange eToMono方法在这里发挥了大部分魔力:告诉WebFlux返回一个Mono,该Mono将在收到响应后立即异步接收信号,包装在ClientACK中,参数resp在lambda中消耗。但重要的是要记住,此时尚未发出任何请求;我们只是传入函数,它将在ClientACK到达时接受它,并将返回Mono

到目前为止,一切都很好:我们已经发出了请求,释放了CPU,但每当响应到来时,处理将继续到哪里?subscribe()方法将在本例中使用字符串参数化的消费者作为参数,该字符串与我们正在等待的响应的主体一样,用Mono包装。当响应到来时,WebFlux将向我们的Mono通知事件,Mono将调用方法processResponseBodyInFuture,在这里我们最终收到响应正文:

private void processResponseBodyInFuture(final String body, final CompletableFuture<String> tokenReceivedInFuture) {

    DocumentContext jsonContext = JsonPath.parse(body);

    try {
        log.info("Access Token response received: {}", body);
        final String aTkn = jsonContext.read("$.access_token");
        log.trace("Access Token parsed: {}", aTkn);
        final int expiresIn = jsonContext.read("$.expires_in");
        synchronized (this) {
            this.token = aTkn;
            this.tokenExpireTime = Instant.now().plusSeconds(expiresIn);
        }
        log.trace("Signal Access Token request completion. Processing will continue calling client...");
        tokenReceivedInFuture.complete(aTkn);
    } catch (PathNotFoundException e) {
        try {
            log.error(e.getMessage());
            log.info(String.format(
                    "Could not extract Access Token. The response returned corresponds to the error %s: %s",
                    jsonContext.read("$.error"), jsonContext.read("$.error_description")));
        } catch (PathNotFoundException e2) {
            log.error(e2.getMessage().concat(" - Unexpected json content received from OAuth2 Server"));
        }
    }

}

当单声道收到接收响应的信号时,就会调用此方法。所以在这里,我们尝试用访问令牌解析json内容,并对其进行处理。。。在本例中,调用初始方法调用方传入的CompletableFuture,希望它知道如何处理它。我们的工作在这里完成了。。。异步!

总结:总结一下,在ise反应式WebClient时,发送请求和处理响应的基本考虑事项如下:

  1. 考虑使用WebClient fluent API(设置http方法、uri、头和正文)来负责准备请求的方法。记住:这样做,您还没有发送任何请求
  2. 考虑一下您将使用何种策略来获取将接收http客户端事件(响应或错误)的发布者retrieve()是最直接的方法,但它比exchangeToMono操纵响应的能力要小
  3. 订阅。。。否则什么都不会发生。你会发现很多例子会欺骗你:他们声称将WebClient用于Asynchrony,但他们“忘记”订阅发布服务器,而是调用block()。好吧,虽然这让事情变得更简单,而且看起来很有效(您将看到收到的响应并传递给应用程序),但问题是这不再是异步的:在响应到达之前,您的Mono(或Flux,无论您使用什么)都将被阻塞。不太好
  4. 有一个单独的方法(作为在subscribe()方法中传递的使用者),用于处理响应正文

 类似资料:
  • 问题内容: 我知道这个问题以前曾被问过,但是所有解决方案都不适合我。 我有一个将参数发送到API的函数,并以列表的形式返回数据。我有一个UITableView设置为使用该列表,但是它在列表分配给变量之前运行。 码: 如果不立即将其作为重复投票,我将不胜感激,这是我尝试的方法。 派遣组 信号量计时 运行变量 其中包括= self和= self 。 编辑:要求提取项目, 问题答案: 您不能-也不应该-

  • 问题内容: 嗨,我的脚本中有2个Ajax调用,我需要它们运行asnyc以节省时间,但是我需要第二个才能等待第一个完成。 有什么想法吗?谢谢 问题答案: 如果使用jQuery 1.5+,则可以使用jQuery 完成。诸如此类的东西(缩短了ajax的简洁性,只需像上面那样传递对象) 您不知道它们将以什么顺序返回,因此,如果您手动滚动此请求,则需要检查另一个请求的状态并等待它返回。

  • 我的完成处理程序有问题。下面是一个带有完成处理程序的函数,位于一个实用程序文件中: 我在视图控制器中调用它 输出清楚地表明该函数在运行该块之前没有等待完成: 我如何解决这个问题?

  • 我搜索了许多网站和文档,但异步调用使用的代码相同。但不确定为什么它不起作用。如果我错过了什么,有人能帮我吗?

  • 本文向大家介绍使用AJAX完成用户名是否存在异步校验,包括了使用AJAX完成用户名是否存在异步校验的使用技巧和注意事项,需要的朋友参考一下 使用AJAX完成用户名是否存在异步校验: 1.事件触发: * onblur 2.编写AJAX代码: * 项Action中提交:传递username参数 3.编写Action * 接收username:模型驱动接收. 4.* 编写实体类 * User * Use

  • 我正在编写一个WinForms应用程序,它将数据传输到USB HID类设备。我的应用程序使用了优秀的通用HID库V6.0,可以在这里找到。简单来说,当我需要向设备写入数据时,这是被调用的代码: 当我的代码退出while循环时,我需要从设备中读取一些数据。但是,设备无法立即响应,因此我需要等待此呼叫返回后再继续。由于当前存在,RequestToGetInputReport()声明如下: GetInp