当前位置: 首页 > 知识库问答 >
问题:

基于输入将WebClient并行调用的批处理结果转换为HashMap条目,而不阻止每个单独的调用

易雅畅
2023-03-14

我想将Mono从WebClient响应转换/添加到以输入为键的映射中

我正在使用WebClient并行执行一批REST调用,但是我不想返回用户列表,而是想返回一个ID的HashMap作为键,从REST调用返回的用户作为值。

我不想在添加到HashMap之前阻止每个单独的调用来获取值。

有没有一种方法可以将结果从WebClient转换为HashMap条目,而不会影响REST调用的并行执行?

我尝试了Mono的doOnSuccesscallback,但不确定这样做是否正确。

当前执行情况

public List<<User> fetchUsers(List<Integer> ids) {
        List<Mono<User>> userMonos = new ArrayList();
        
        for (int id : ids) {
            userMonos.add(webClient.get()
                .uri("/otheruser/{id}", id)
                .retrieve()
                .bodyToMono(User.class));
        }

       List<User> list = Flux.merge(userMonos).collectList().block();
       return list; 
    }

因此,预期产出为:

HashMap<Integer, User>()

如果我不能恰当地表达预期的结果,我道歉。请随时让我知道,如果我需要添加更多的细节或添加更多的清晰的问题。

我真的很感激你的帮助。与此同时,我也在努力寻找解决办法。

共有2个答案

段超
2023-03-14

您正在混合命令式代码和反应式代码。你必须选择一种方式,并坚持下去。

如果你想要的实际值,而不是MonoFlux必须块。把它想象成一个Future,在我们等待值出现之前,那里没有“值”。所以阻塞是ONLY方式。

如果我正确理解您的代码,我将执行以下操作。

public HashMap<Integer, User> fetchUsers(List<Integer> ids) {
    final Map<Integer, User> userMap = new HashMap();
    return Flux.fromIterable(ids)
                .flatMap(id -> webClient.get()
                    .uri("/otheruser/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
                    .doOnSuccess(user -> {
                        userMap.put(id, user);
                    })
                .thenReturn(userMap)
                .block()
}

那么这段代码做什么呢?

它获取一个id的列表,并将其放入一个Flux中。通量将async同时启动所有请求,因为我们使用的是flatMap。当所有的请求完成后,我们将通过将值添加到hashmap来执行副作用。因为我们不关心返回类型,我们使用然后来忽略返回。我们告诉它返回hashmap。最后,我们调用block使代码实际运行,并等待所有的请求等完成并产生最终的hashmap。

我已经在手机上写了这篇文章,所以我不能检查编译器,但类似的东西应该可以让你开始。如果有人看到任何错误,请随意编辑。

戚均
2023-03-14

如果可能,最好避免修改副作用操作符(如doOnSuccess)的外部状态。例如,在这种特殊情况下,如果外部Map不是线程安全的,则可能会导致并发问题。

作为更好的选择,您可以使用Reactor操作符收集到地图

public Map<Integer, User> fetchUsers(List<Integer> ids) {
    return Flux.fromIterable(ids)
               .flatMap(id -> webClient.get()
                                       .uri("/otheruser/{id}", id)
                                       .retrieve()
                                       .bodyToMono(User.class)
                                       .map(user -> Tuples.of(id, user)))
               .collectMap(Tuple2::getT1, Tuple2::getT2)
               .block();
}

您可以创建一个小类来提高可读性,而不是Tuple。或者更好的是,如果用户知道它的ID,那么您可以完全省略Tuple,您可以执行类似的操作。collectMap(用户::getId,用户-

 类似资料:
  • 我有一种情况,我可以使用阻塞I/O库访问外部服务上的RESTendpoint,或者在这种情况下,我可以直接使用HttpClient(如WebClient)调用REST。现在,我想知道在包装对该库的调用和将其发布到弹性线程或使用WebClient访问endpoint之间是否存在性能差异。 如何准确地处理调用这两个选项。所以我们假设网络流量使用单线程来处理请求。然后,请求将由WebClient处理。这

  • 返回的流量 返回一个 如果您不能回答我的问题,请至少告诉我如何并行地执行多个API调用,并在WebClient中等待结果

  • 问题内容: 我正在使用的库使用回调对象发出一系列对象。 使用某些调用添加回调,并使用非阻塞方法调用启动该过程。 创建将发射这些对象的的最佳方法是什么? 如果阻止了该怎么办? 问题答案: 我认为您需要这样的东西(scala中给出的示例) 至于阻塞/非阻塞:通常,基于回调的体系结构将回调订阅与进程启动分开。在这种情况下,您可以完全独立于进程的时间创建任意数量的。另外,是否分叉的决定完全取决于您。您的体

  • 我正在使用spring批处理,我需要实现以下内容 读取包含日期和金额等详细信息的csv文件 将同一日期的所有金额的总和合计 保留一个带有日期和总和的条目 我在过去使用过批处理,我想到了下面的方法。用2个步骤创建批处理。 步骤1: 读取器:使用FlatFileItemReader遍历整个文件 处理器:用键作为日期,值作为金额填充映射。如果存在条目,则获取该值并将其添加到新值 编写器:没有操作编写器,

  • 这是我的源对象: 这是我的目标对象: 我的地图绘制程序如下所示: 目前它运行良好,但我想知道是否有一种更“优雅”的方法将地图条目设置为源。因为我没有使用“qualifiedByName”属性添加转换函数,所以看起来它只能在指定“源”时工作。我误解了什么吗? 我尝试了以下方法,但没有取得令人满意的结果: > 覆盖我的记录类中特定字段的getter 添加一个具有“qualifiedByName”属性的

  • 我从这个链接中阅读并应用了一些内容:如何将OnPostExecute()的结果获取到main activity,因为AsyncTask是一个单独的类?但是我在行委托上得到一个错误NullPointerException onPostExecute。ProcessFinish(result);我的代码有什么问题?代码如下: