当前位置: 首页 > 知识库问答 >
问题:

什么是最好的方法来使一个非阻塞HTTP请求使用ReacterWebClient和反序列化对象的响应?

杨起运
2023-03-14

我有使用Vert等异步库的经验。x,但对Reactor/WebFlux来说是新的。我想在一个web应用程序上公开一个endpoint,当被击中时,它会掉头调用另一个web服务,将响应解析为Java对象,然后访问对象中的字段并对其进行处理。我正在使用WebClient进行HTTP调用,并使用Jackson ObjectMapper对其进行反序列化。我的代码大致如下(注意:RequestUtil.safeDeserialize只使用Jackson将字符串体解析为一个对象,并返回Optional)

    public Mono<String> function(final String param) {
        final String encodedRequestBody = RequestUtil.encodeRequestBody(param);
        final Mono<Response> responseMono = webClient.post()
                .uri("/endpoint")
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("Authorization", "Basic " + basicAuthHeader)
                .accept(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromPublisher(Mono.just(encodedRequestBody), String.class))
                .exchange()
                .flatMap(clientResponseMono -> clientResponseMono.bodyToMono(String.class))
                .map(RequestUtil::safeDeserialize)
                .map(resp -> resp.orElseThrow(() -> new RuntimeException("Failed to deserialize Oscar response!")));

        responseMono.subscribe(response -> {
            // Pull out some of the fields from the `Response` type object and do something with them
        });

        return responseMono.map(Response::aStringField);
    }

在针对遵循完全相同逻辑的相同应用程序对该代码进行性能测试后,但通过阻塞Java11HttpClient类进行HTTP调用,我发现两者几乎没有什么区别——事实上,WebClient实现的性能略低于阻塞实现。

很明显,我在某个地方犯了一个错误,无论是代码还是我对这里发生的事情的心理模型,所以非常感谢您的帮助/建议。谢谢

编辑:根据@Toerktumlare回复中的建议,我已将函数更新为以下内容:

    public Mono<String> function(final String param) {
        final Mono<String> encodedRequestBody = RequestUtil.encodeRequestBodyToMono(param);
        final Mono<Response> responseMono = webClient.post()
                .uri("/endpoint")
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("Authorization", "Basic " + basicAuthHeader)
                .accept(MediaType.APPLICATION_JSON)
                .body(encodedRequestBody, String.class)
                .retrieve()
                .bodyToMono(Response.class);

        return responseMono.flatMap(response -> {
            final String field = response.field();
            // Use `field` to do something that would produce a log message
            logger.debug("Field is: {}", field);
            return Mono.just(field);
        });
}

运行此代码时,我看不到任何日志记录。这让我觉得HTTP调用实际上没有发生(或者没有及时完成?)因为当我使用subscribe和相同的WebClient代码时,我可以成功地打印出响应中的字段。我错过了什么?

Edit2:此函数用于提供对endpoint的响应(为简洁起见,省略了几行代码):

    @Bean
    public RouterFunction<ServerResponse> routerFunction(ResponseHandler handler) {
        return RouterFunctions.route(RequestPredicates.GET("/my/endpoint")
                .and(RequestPredicates.accept(MediaType.ALL)), handler::endpoint);
    }
 

    public Mono<ServerResponse> endpoint(ServerRequest request) {
        // Pull out a comma-separated list from the request
        final List<String> params = Arrays.asList(fieldFromRequest.split(","));

        // For each param, call function(param), roll into list
        List<Mono<String>> results = params.stream()
                .map(nonBlockingClient::function)
                .collect(Collectors.toList());

        // Check to see if any of the requests failed
        if (results.size() != params.size()) {
            return ServerResponse.status(500).build();
        }

        logger.debug("Success");
        return ServerResponse.ok().build();
    }

共有1个答案

韩阳成
2023-03-14

最有可能的问题是使用subscribe

消费者订阅制作人。您的后端应用程序是一个生产者,它使调用客户端成为消费者。也就是说,通常是呼叫客户机应该订阅,而不是你。

你现在做的基本上是消费你自己的生产。这在某种程度上是阻塞。

一般来说,你永远不应该订阅一个网络流量应用程序,除非你的应用程序调用一个api,然后消耗响应(例如将其保存在数据库中等)。发起呼叫的人通常是订户

我会重写最后一部分,并删除订阅

return responseMono.flatMap(response -> {
        final string = doSomething(response);
        return OscarGuacResponse.aStringField(string);
    });

我还看到,在RequestUtil::safeDeserializez中,您返回了一个可选值

也可以考虑在许多地方使用flatMap而不是map。要理解这两者之间的差异,你可以看看这个答案。

此外,在以后研究性能时,您应该了解,在衡量webflux性能时,需要考虑内存占用和线程数等因素,而不是非阻塞应用程序。在速度方面,您可能看不到任何性能提升,但可以看到应用程序使用的线程更少,这反过来意味着内存占用更小,这本身就是一个提升。

更新:

在进行反应式编程时,您试图编写常规java代码,但这是行不通的。为什么你的代码不起作用是因为你正在“断链”。

我在没有IDE的情况下写了这篇文章,所以它可能无法编译,但你应该了解它。您总是需要在前面的基础上进行链接,在反应式编程中通常不需要java流等东西。

public Mono<ServerResponse> endpoint(ServerRequest request) {
    final List<String> params = Arrays.asList(fieldFromRequest.split(","));
    return Flux.fromIterable(params)
               .flatMap(param -> nonBlockingClient.function(param))
               .collectList()
               .flatMap(list -> {
                   if (list.size() != params.size()) {
                       return ServerResponse.status(500).build();
                   }
                   return ServerResponse.ok().build();
               })
}

这是基本的反应式编程,我强烈建议您阅读reactor文档的“getting started”部分,以便了解基本知识,因为如果您要在反应式应用程序中编写常规java代码,您会遇到麻烦。

 类似资料:
  • 问题内容: 我正在使用PHP从远程服务器下载一个(大)文件,并且此下载是通过单击网页上的下载按钮触发的。 因此,当我单击网页上的按钮时,就会向PHP函数发出请求(带有angulars )。该函数使用触发下载。 同时,我想使用Ajax向我的PHP网站提出其他请求。但是,只要下载正在进行,所有其他Ajax请求都会显示状态。 因此,基本上,下载阻止了对PHP的所有其他请求。有什么办法可以避免这种阻塞?

  • 我有一个顶点,它有一个处理程序,可以在事件循环线程中调用Vertx的Web客户端。实际的底层API调用是同步的还是异步的?它会阻塞我的事件循环线程吗?假设我的API调用需要30秒才能返回。 我是否需要用Vertx.execute阻塞(p-

  • 不允许使用任何导入。 输入: 所需输出: 我在找一个简洁的方法。我知道如何用多行for循环来解包它。

  • 问题内容: 我正在打开一个具有100,000个URL的文件。我需要向每个URL发送一个HTTP请求并打印状态代码。我正在使用Python 2.6,到目前为止,我们研究了Python实现线程/并发性的许多令人困惑的方式。我什至看过python 并发库,但无法弄清楚如何正确编写此程序。有没有人遇到过类似的问题?我想通常我需要知道如何尽快地在Python中执行数千个任务-我想这意味着“同时”。 问题答案

  • 我在尝试用netty 4.1写一个非阻塞代理。我有一个处理传入连接的“FrontHandler ”,还有一个处理传出连接的“BackHandler”。我在关注hexdumproxyhandler(https://github . com/netty/netty/blob/ed4a 89082 bb 29 b 9 e 7d 869 C5 d 25d 6 b 9 ea 8 fc 9d 25 b/exa

  • 我需要根据一些配置数据启动多个独立的周期性任务——总数事先不知道。例如,我想检查具有不同间隔和不同参数的不同目录的内容,其中列表和参数是从配置中读取的。 在我看来,夸克斯调度器只能调度固定的、预先已知的方法。动态/编程调度的最佳方法是什么?<代码>vertx.set周期 是正确的方法还是我应该以某种方式访问Quartz?