当前位置: 首页 > 知识库问答 >
问题:

从返回未来的服务创建Mono/Flux的正确方法

白哲茂
2023-03-14

如何正确处理由期货构建的Monos?

我试着让我的头脑围绕着Spring Reactive(和Spring 5),观看所有的视频,阅读所有我能找到的博客,但他们似乎都没有做一些事情,而不仅仅是查询数据库或其他琐碎的事情。

我正在使用新的AWS 2.0开发工具包,它使用CompletableFuture的用于大多数事情。使用服务创建新实例,我的方法如下所示

public Mono<RunInstancesResponse> create(Instance instance) {
    RunInstancesRequest runInstancesRequest = RunInstancesRequest.builder()
            .instanceType(instance.getInstanceType())
            .imageId(instance.getImageId())
            .securityGroupIds(instance.getSecurityGroupIds())
            .keyName(instance.getKeyName())
            .minCount(1)
            .maxCount(1)
            .tagSpecifications(createTags(instance))
            .build();

    CompletableFuture<RunInstancesResponse> future = client.runInstances(runInstancesRequest);

    future.whenComplete((response, error) -> {
        response.reservation().instances().stream().map(aws -> Instance.builder()
                .imageId(aws.imageId())
                .build()
        ).forEach(instanceRepository::save);

    });

    return Mono.fromFuture(future);
}

我在这里的理解是,我几乎立即返回RunInstancesACK类型的Mono,而future.whenComplete将随时执行它的操作。

我从我的路由处理器调用它,它看起来像

public Mono<ServerResponse> create(ServerRequest request) {
    return request.bodyToMono(Instance.class)
            .flatMap(createService::create)
            .flatMap(i -> ServerResponse.accepted().build());
}

现在这几乎像我预期的那样工作,但是有几个关键的事情是错误的,我不知道如何解决。

1.)当Complete从未被调用过,我相信这是因为我没有订阅它。

2.)服务器在完成之前不会响应客户端(大约2.5秒),这并不理想,因为我希望它立即响应,然后在调用当完成时更新客户端。

我感觉我的整个服务和处理程序完全是错误的。

我想举一些例子,说明我应该如何处理从<code>Mono<code>或<code>Flux<code>类型的路由处理程序调用的服务中的未来。

共有1个答案

刘海
2023-03-14

我编写了一个包装SQS、SNS和DynamoDb的开源库,以使其更容易一些。为了避免仅链接的答案,您可以对其应用以下内容:

public Mono<ChangeMessageVisibilityResult> changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
    return Mono.create(subscriber -> amazonClient.changeMessageVisibilityAsync(queueUrl, receiptHandle, visibilityTimeout, AmazonWebServiceRequestAsyncHandler.valueEmittingHandlerFor(subscriber)));
  }

传递的处理程序在 2 个世界之间进行转换:

public class AmazonWebServiceRequestAsyncHandler<RQ extends AmazonWebServiceRequest, RS> implements AsyncHandler<RQ, RS> {
    private final MonoSink<? super RS> subscriber;
    private boolean shouldEmitValue;

    private AmazonWebServiceRequestAsyncHandler(MonoSink<? super RS> subscriber, boolean shouldEmitValue) {
        this.subscriber = subscriber;
        this.shouldEmitValue = shouldEmitValue;
    }

    @Override
    public void onError(Exception exception) {
        subscriber.error(exception);
    }

    @Override
    public void onSuccess(RQ request, RS response) {
        if (shouldEmitValue) {
            subscriber.success(response);
        } else {
            subscriber.success();
        }
    }

    public static <RQ extends AmazonWebServiceRequest, RS> AsyncHandler<RQ, RS> valueEmittingHandlerFor(final MonoSink<? super RS> subscriber) {
        return new AmazonWebServiceRequestAsyncHandler<>(subscriber, true);
    }

    public static <RQ extends AmazonWebServiceRequest> AsyncHandler<RQ, Void> voidHandlerFor(MonoSink<? super Void> subscriber) {
        return new AmazonWebServiceRequestAsyncHandler<>(subscriber, false);
    }
}
 类似资料:
  • 假设我有一个使用CustomObject列表的API操作。对于这些对象中的每一个,它都会调用一个创建Mono的服务方法。如何以一种惯用的无阻塞方式从这些单一对象创建流量? 我现在想到的是这个。我更改了方法名称,以更好地反映它们的预期目的。 此外,我需要订阅通量才能真正让它返回一些东西吗?

  • 我使用泽西API进行我的REST服务。我的问题是:是否有更优雅的方式以JSON形式返回异常?最好是自己创建一个json对象并将其直接附加到响应中? 这是服务中一种方法的简化示例。如您所见,我使用HashMap只是因为该方法可能引发异常,在这种情况下,我需要返回有关它的信息。

  • 我正在我的项目中使用spring webflux。我的controller类调用返回Mono或Flux的服务类方法。 我正在尝试为我的服务类编写单元测试。我不确定如何为返回mono/flux的方法编写单元测试。我查看的大多数文章都建议我使用WebClientTest。但重点是,我在这里测试我的服务类。当我通过模拟服务类方法测试我的web层(控制器类)时,我使用了WebclientTest。 对如何

  • 下一个metod在TokenService类中。

  • 我正在学习Spring WebFlux,在编写示例应用程序的过程中,我发现了一个与Spring Cache结合的反应类型(Mono/Flux)相关的问题。 考虑以下代码段(Kotlin格式): 下面的代码用于SimpleCacheResolver,但默认情况下,由于Mono不可序列化,在Redis中失败。为了使它们工作,例如,需要使用Kryo串行化器。