我不熟悉反应式编程范式,但最近我决定在Spring WebClient上建立一个简单的Http客户端,因为旧的同步RestTemplate已经在维护中,可能会在即将发布的版本中被弃用。
因此,首先我查看了Spring文档,然后在web上搜索示例。
我必须说,(只是暂时)我已经有意识地决定不通过Reactor库留档,所以除了发布者-订阅者模式之外,我对Mono和Flux的了解很少。相反,我专注于让一些东西工作。
我的场景是一个简单的POST,用于向客户端只对响应状态代码感兴趣的服务器发送回调。没有返回正文。所以我终于想出了这个有效的代码片段:
private void notifyJobSuccess(final InternalJobData jobData) {
SuccessResult result = new SuccessResult();
result.setJobId(jobData.getJobId());
result.setStatus(Status.SUCCESS);
result.setInstanceId(jobData.getInstanceId());
log.info("Result to send back:" + System.lineSeparator() + "{}", result.toString());
this.webClient.post()
.uri(jobData.getCallbackUrl())
.body(Mono.just(result), ReplaySuccessResult.class)
.retrieve()
.onStatus(s -> s.equals(HttpStatus.OK), resp -> {
log.info("Expected CCDM response received with HttpStatus = {}", HttpStatus.OK);
return Mono.empty();
})
.onStatus(HttpStatus::is4xxClientError, resp -> {
log.error("CCDM response received with unexpected Client Error HttpStatus {}. "
+ "The POST request sent by EDA2 stub did not match CCDM OpenApi spec", resp.statusCode());
return Mono.empty();
})
.onStatus(HttpStatus::is5xxServerError, resp -> {
log.error("CCDM response received with unexpected Server Error HttpStatus {}", resp.statusCode());
return Mono.empty();
}).bodyToMono(Void.class).subscribe(Eda2StubHttpClient::handleResponseFromCcdm);
}
我对反应式WebClient的工作原理了解不足,从调用订阅开始。在编写客户端代码之前,我检查的数十个示例中没有一个包含这样的调用,但事实是,在我包含该调用之前,服务器一直在等待请求。
然后,我偶然发现了一句格言:“在你订阅之前什么都不会发生”。我知道模式Plublisher订阅服务器,但我(错误地)认为订阅是由WebClient API在任何exchage或BodyTomino方法中处理的。。。block()肯定必须订阅,因为当您阻止它时,请求会立即发出。
所以我的第一个问题是:真的需要调用subscribe()吗?
第二个问题是为什么StubHttpClient::HandlerResponse方法从未被回调。对于这一点,我发现的唯一解释是,由于Mono返回的是一个
最后,要求一个完整的例子来说明一个方法在单声道中接收一个身体,然后再使用它,这是否太过分了?我发现的所有示例都只关注于发出请求,但我现在无法理解Mono或Flux后来是如何使用的。。。我知道我必须尽快检查Reactor文件,但我希望能得到一些帮助,因为我遇到了异常和错误处理方面的问题。
谢谢
自从我在这里求助以来,已经有一段时间了。现在,我不想编辑,而是想为我的前一个问题添加一个答案,以便答案保持清晰,与原始问题和评论分开。这里有一个完整的例子。
上下文:作为客户端的应用程序,从OAuth2授权服务器请求访问令牌。异步请求访问令牌,以避免在另一端处理令牌请求并到达响应时阻塞应用程序的线程。
首先,这是一个为其客户端提供访问令牌的类(方法getAccessToken):如果访问令牌已经初始化并且有效,则返回存储的值;否则,将获取一个调用内部方法fetchAccessTokenAsync的新方法:
public class Oauth2ClientBroker {
private static final String OAUHT2_SRVR_TOKEN_PATH= "/auth/realms/oam/protocol/openid-connect/token";
private static final String GRANT_TYPE = "client_credentials";
@Qualifier("oAuth2Client")
private final WebClient oAuth2Client;
private final ConfigurationHolder CfgHolder;
@GuardedBy("this")
private String token = null;
@GuardedBy("this")
private Instant tokenExpireTime;
@GuardedBy("this")
private String tokenUrlEndPoint;
public void getAccessToken(final CompletableFuture<String> completableFuture) {
if (!isTokenInitialized() || isTokenExpired()) {
log.trace("Access Token not initialized or has exired: go fetch a new one...");
synchronized (this) {
this.token = null;
}
fetchAccessTokenAsync(completableFuture);
} else {
log.trace("Reusing Access Token (not expired)");
final String token;
synchronized (this) {
token = this.token;
}
completableFuture.complete(token);
}
}
...}
接下来,我们将看到fetchAccessTokenAsync执行以下操作:
private void fetchAccessTokenAsync(final CompletableFuture<String> tokenReceivedInFuture) {
Mono<String> accessTokenResponse = postAccessTokenRequest();
accessTokenResponse.subscribe(tr -> processResponseBodyInFuture(tr, tokenReceivedInFuture));
}
这里发生了两件事:
postAccessTokenRequest()
构建一个POST请求并声明如何使用响应(当WebFlux在收到后使其可用时),使用exchange eToMono
:private Mono postAccessTokenRequest() {
log.trace("Request Access Token for OAuth2 client {}", cfgHolder.getClientId());
final URI uri = URI.create(cfgHolder.getsecServiceHostAndPort().concat(OAUHT2_SRVR_TOKEN_PATH));
} else {
uri = URI.create(tokenUrlEndPoint);
}
}
log.debug("Access Token endpoint OAuth2 Authorization server: {}", uri.toString());
return oAuth2Client.post().uri(uri)
.body(BodyInserters.fromFormData("client_id", cfgHolder.getEdaClientId())
.with("client_secret", cfgHolder.getClientSecret())
.with("scope", cfgHolder.getClientScopes()).with("grant_type", GRANT_TYPE))
.exchangeToMono(resp -> {
if (resp.statusCode().equals(HttpStatus.OK)) {
log.info("Access Token successfully obtained");
return resp.bodyToMono(String.class);
} else if (resp.statusCode().equals(HttpStatus.BAD_REQUEST)) {
log.error("Bad request sent to Authorization Server!");
return resp.bodyToMono(String.class);
} else if (resp.statusCode().equals(HttpStatus.UNAUTHORIZED)) {
log.error("OAuth2 Credentials exchange with Authorization Server failed!");
return resp.bodyToMono(String.class);
} else if (resp.statusCode().is5xxServerError()) {
log.error("Authorization Server could not generate a token due to a server error");
return resp.bodyToMono(String.class);
} else {
log.error("Authorization Server returned an unexpected status code: {}",
resp.statusCode().toString());
return Mono.error(new Exception(
String.format("Authorization Server returned an unexpected status code: %s",
resp.statusCode().toString())));
}
}).onErrorResume(e -> {
log.error(
"Access Token could not be obtained. Process ends here");
return Mono.empty();
});
}
exchange eToMono
方法在这里发挥了大部分魔力:告诉WebFlux返回一个Mono,该Mono将在收到响应后立即异步接收信号,包装在ClientACK
中,参数resp
在lambda中消耗。但重要的是要记住,此时尚未发出任何请求;我们只是传入函数,它将在ClientACK
到达时接受它,并将返回Mono
到目前为止,一切都很好:我们已经发出了请求,释放了CPU,但每当响应到来时,处理将继续到哪里?subscribe()方法将在本例中使用字符串参数化的消费者作为参数,该字符串与我们正在等待的响应的主体一样,用Mono包装。当响应到来时,WebFlux将向我们的Mono通知事件,Mono将调用方法processResponseBodyInFuture,在这里我们最终收到响应正文:
private void processResponseBodyInFuture(final String body, final CompletableFuture<String> tokenReceivedInFuture) {
DocumentContext jsonContext = JsonPath.parse(body);
try {
log.info("Access Token response received: {}", body);
final String aTkn = jsonContext.read("$.access_token");
log.trace("Access Token parsed: {}", aTkn);
final int expiresIn = jsonContext.read("$.expires_in");
synchronized (this) {
this.token = aTkn;
this.tokenExpireTime = Instant.now().plusSeconds(expiresIn);
}
log.trace("Signal Access Token request completion. Processing will continue calling client...");
tokenReceivedInFuture.complete(aTkn);
} catch (PathNotFoundException e) {
try {
log.error(e.getMessage());
log.info(String.format(
"Could not extract Access Token. The response returned corresponds to the error %s: %s",
jsonContext.read("$.error"), jsonContext.read("$.error_description")));
} catch (PathNotFoundException e2) {
log.error(e2.getMessage().concat(" - Unexpected json content received from OAuth2 Server"));
}
}
}
当单声道收到接收响应的信号时,就会调用此方法。所以在这里,我们尝试用访问令牌解析json内容,并对其进行处理。。。在本例中,调用初始方法调用方传入的CompletableFuture,希望它知道如何处理它。我们的工作在这里完成了。。。异步!
总结:总结一下,在ise反应式WebClient时,发送请求和处理响应的基本考虑事项如下:
考虑使用WebClient fluent API(设置http方法、uri、头和正文)来负责准备请求的方法。记住:这样做,您还没有发送任何请求
- 考虑一下您将使用何种策略来获取将接收http客户端事件(响应或错误)的发布者
retrieve()
是最直接的方法,但它比exchangeToMono操纵响应的能力要小 - 订阅。。。否则什么都不会发生。你会发现很多例子会欺骗你:他们声称将WebClient用于Asynchrony,但他们“忘记”订阅发布服务器,而是调用
block()
。好吧,虽然这让事情变得更简单,而且看起来很有效(您将看到收到的响应并传递给应用程序),但问题是这不再是异步的:在响应到达之前,您的Mono(或Flux,无论您使用什么)都将被阻塞。不太好 - 有一个单独的方法(作为在subscribe()方法中传递的使用者),用于处理响应正文
问题内容: 我知道这个问题以前曾被问过,但是所有解决方案都不适合我。 我有一个将参数发送到API的函数,并以列表的形式返回数据。我有一个UITableView设置为使用该列表,但是它在列表分配给变量之前运行。 码: 如果不立即将其作为重复投票,我将不胜感激,这是我尝试的方法。 派遣组 信号量计时 运行变量 其中包括= self和= self 。 编辑:要求提取项目, 问题答案: 您不能-也不应该-
问题内容: 嗨,我的脚本中有2个Ajax调用,我需要它们运行asnyc以节省时间,但是我需要第二个才能等待第一个完成。 有什么想法吗?谢谢 问题答案: 如果使用jQuery 1.5+,则可以使用jQuery 完成。诸如此类的东西(缩短了ajax的简洁性,只需像上面那样传递对象) 您不知道它们将以什么顺序返回,因此,如果您手动滚动此请求,则需要检查另一个请求的状态并等待它返回。
我搜索了许多网站和文档,但异步调用使用的代码相同。但不确定为什么它不起作用。如果我错过了什么,有人能帮我吗?
我的完成处理程序有问题。下面是一个带有完成处理程序的函数,位于一个实用程序文件中: 我在视图控制器中调用它 输出清楚地表明该函数在运行该块之前没有等待完成: 我如何解决这个问题?
本文向大家介绍使用AJAX完成用户名是否存在异步校验,包括了使用AJAX完成用户名是否存在异步校验的使用技巧和注意事项,需要的朋友参考一下 使用AJAX完成用户名是否存在异步校验: 1.事件触发: * onblur 2.编写AJAX代码: * 项Action中提交:传递username参数 3.编写Action * 接收username:模型驱动接收. 4.* 编写实体类 * User * Use
我正在编写一个WinForms应用程序,它将数据传输到USB HID类设备。我的应用程序使用了优秀的通用HID库V6.0,可以在这里找到。简单来说,当我需要向设备写入数据时,这是被调用的代码: 当我的代码退出while循环时,我需要从设备中读取一些数据。但是,设备无法立即响应,因此我需要等待此呼叫返回后再继续。由于当前存在,RequestToGetInputReport()声明如下: GetInp