如何正确处理由期货构建的Monos?
我试着让我的头脑围绕着Spring Reactive(和Spring 5),观看所有的视频,阅读所有我能找到的博客,但他们似乎都没有做一些事情,而不仅仅是查询数据库或其他琐碎的事情。
我正在使用新的AWS 2.0开发工具包,它使用CompletableFuture
的用于大多数事情。使用服务创建新实例,我的方法如下所示
public Mono<RunInstancesResponse> create(Instance instance) {
RunInstancesRequest runInstancesRequest = RunInstancesRequest.builder()
.instanceType(instance.getInstanceType())
.imageId(instance.getImageId())
.securityGroupIds(instance.getSecurityGroupIds())
.keyName(instance.getKeyName())
.minCount(1)
.maxCount(1)
.tagSpecifications(createTags(instance))
.build();
CompletableFuture<RunInstancesResponse> future = client.runInstances(runInstancesRequest);
future.whenComplete((response, error) -> {
response.reservation().instances().stream().map(aws -> Instance.builder()
.imageId(aws.imageId())
.build()
).forEach(instanceRepository::save);
});
return Mono.fromFuture(future);
}
我在这里的理解是,我几乎立即返回RunInstancesACK
类型的Mono
,而future.whenComplete
将随时执行它的操作。
我从我的路由处理器调用它,它看起来像
public Mono<ServerResponse> create(ServerRequest request) {
return request.bodyToMono(Instance.class)
.flatMap(createService::create)
.flatMap(i -> ServerResponse.accepted().build());
}
现在这几乎像我预期的那样工作,但是有几个关键的事情是错误的,我不知道如何解决。
1.)当Complete
从未被调用过,我相信这是因为我没有订阅它。
2.)服务器在完成之前
不会响应客户端(大约2.5秒),这并不理想,因为我希望它立即响应,然后在调用当完成时
更新客户端。
我感觉我的整个服务和处理程序完全是错误的。
我想举一些例子,说明我应该如何处理从<code>Mono<code>或<code>Flux<code>类型的路由处理程序调用的服务中的未来。
我编写了一个包装SQS、SNS和DynamoDb的开源库,以使其更容易一些。为了避免仅链接的答案,您可以对其应用以下内容:
public Mono<ChangeMessageVisibilityResult> changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
return Mono.create(subscriber -> amazonClient.changeMessageVisibilityAsync(queueUrl, receiptHandle, visibilityTimeout, AmazonWebServiceRequestAsyncHandler.valueEmittingHandlerFor(subscriber)));
}
传递的处理程序在 2 个世界之间进行转换:
public class AmazonWebServiceRequestAsyncHandler<RQ extends AmazonWebServiceRequest, RS> implements AsyncHandler<RQ, RS> {
private final MonoSink<? super RS> subscriber;
private boolean shouldEmitValue;
private AmazonWebServiceRequestAsyncHandler(MonoSink<? super RS> subscriber, boolean shouldEmitValue) {
this.subscriber = subscriber;
this.shouldEmitValue = shouldEmitValue;
}
@Override
public void onError(Exception exception) {
subscriber.error(exception);
}
@Override
public void onSuccess(RQ request, RS response) {
if (shouldEmitValue) {
subscriber.success(response);
} else {
subscriber.success();
}
}
public static <RQ extends AmazonWebServiceRequest, RS> AsyncHandler<RQ, RS> valueEmittingHandlerFor(final MonoSink<? super RS> subscriber) {
return new AmazonWebServiceRequestAsyncHandler<>(subscriber, true);
}
public static <RQ extends AmazonWebServiceRequest> AsyncHandler<RQ, Void> voidHandlerFor(MonoSink<? super Void> subscriber) {
return new AmazonWebServiceRequestAsyncHandler<>(subscriber, false);
}
}
假设我有一个使用CustomObject列表的API操作。对于这些对象中的每一个,它都会调用一个创建Mono的服务方法。如何以一种惯用的无阻塞方式从这些单一对象创建流量? 我现在想到的是这个。我更改了方法名称,以更好地反映它们的预期目的。 此外,我需要订阅通量才能真正让它返回一些东西吗?
我使用泽西API进行我的REST服务。我的问题是:是否有更优雅的方式以JSON形式返回异常?最好是自己创建一个json对象并将其直接附加到响应中? 这是服务中一种方法的简化示例。如您所见,我使用HashMap只是因为该方法可能引发异常,在这种情况下,我需要返回有关它的信息。
我正在我的项目中使用spring webflux。我的controller类调用返回Mono或Flux的服务类方法。 我正在尝试为我的服务类编写单元测试。我不确定如何为返回mono/flux的方法编写单元测试。我查看的大多数文章都建议我使用WebClientTest。但重点是,我在这里测试我的服务类。当我通过模拟服务类方法测试我的web层(控制器类)时,我使用了WebclientTest。 对如何
下一个metod在TokenService类中。
我正在学习Spring WebFlux,在编写示例应用程序的过程中,我发现了一个与Spring Cache结合的反应类型(Mono/Flux)相关的问题。 考虑以下代码段(Kotlin格式): 下面的代码用于SimpleCacheResolver,但默认情况下,由于Mono不可序列化,在Redis中失败。为了使它们工作,例如,需要使用Kryo串行化器。