当前位置: 首页 > 知识库问答 >
问题:

Spring webflux webclient在评估第一次调用的响应时再次调用

潘国源
2023-03-14

我正在使用webClient调用endpoint并希望将我得到的响应映射到另一个对象。在映射该对象时,我想对响应的某些参数进行额外调用。

我的第一个调用返回以下对象

{
  "type": "Collection",
  "key": "some.key",
  "settings": [
    {
      "type": "Struct",
      "key": "steps",
      "value": [
        {
          "type": "Struct",
          "key": "1",
          "value": [
            {
              "type": "String",
              "key": "headline",
              "value": "someheadline"
            },
            {
              "type": "String",
              "key": "subheadline",
              "value": "somesubheadline"
            },
            {
              "type": "Link",
              "key": "link.to.another.object",
              "value": {
                "linkType": "Boilerplate",
                "key": "configurabletextkey"
              }
            }
          ]
        }
      ]
    },
    {
      "type": "Struct",
      "key": "commons",
      "value": [
        {
          "type": "String",
          "key": "mandatory.fields.text",
          "value": "Pflichtfelder"
        }
      ]
    }
  ]
}

我把这种反应描绘成这样:

webClient
        .get()
        .uri(
            uriBuilder ->
                uriBuilder
                    .path(headlessConfig.getEndpoint())
                    .pathSegment(contentId)
                    .build())
        .retrieve()
        .bodyToMono(Collection.class)
        .map(response -> {
                  return getCollectionContent(response);
                })

在getCollectionContent方法中,我迭代设置数组,从响应中提取数据并将其映射到PageContent对象。

public class PageContent {

  private String pageId;

  private List<Message> messages;
}
public class Message {

  @NonNull private String key;

  @NonNull private String text;

  private Boolean containsHtml = false;
}

如果响应包含类型“String”,我只需将数据添加到消息对象中,并将其添加到页面内容列表中。

现在到问题了。如果类型是“链接”,我想像上面一样使用webClient对同一个endpoint进行另一次调用以获取该对象的键和文本,从中创建一个消息对象并将其添加到我现有的列表中。

相应的代码如下所示:

webClient
        .get()
        .uri(
            uriBuilder ->
                uriBuilder
                    .path(headlessConfig.getEndpoint())
                    .pathSegment(contentKey)
                    .build())
        .retrieve()
        .bodyToMono(ConfigurableText.class)
        .map(
                configurableTextResponse -> {
                  messages.add(
                      new Message(
                          prefix + configurableTextResponse.getKey(),
                          configurableTextResponse.getText(),
                          true));
                  return Mono.empty();
                })

现在,当我尝试这样做时,什么都没有发生,我只收到PageContent对象,没有链接的消息。

以resttemplate的阻塞方式,这种逻辑应该可以工作,但我想让它与webclient一起工作。

编辑:

迭代列表并提取消息数据的代码:

private PageContent getCollectionContent(Collection response) {

    PageContent pageContent = new PageContent();
    pageContent.setPageId(response.getKey());

    List<Message> messages = new ArrayList<>();

    response
        .getSettings()
        .forEach(
            settingsItemsArray -> {
              var settingsItemList = (List<?>) settingsItemsArray.getValue();
              String prefix = settingsItemsArray.getKey() + ".";

              extractMessageText(prefix, (LinkedHashMap<?, ?>) settingsItemList.get(0), messages);
            });

    pageContent.setMessages(messages);

    return pageContent;
  }

代码以提取MessageText、进一步迭代或获取链接类型的缺失文本。

private void extractMessageText(
      String prefix, LinkedHashMap<?, ?> settingsItem, List<Message> messages) {

    String itemKey = (String) settingsItem.get(KEY);
    String itemType = (String) settingsItem.get(TYPE);

    switch (itemType) {
      case "String":
        messages.add(new Message(prefix + itemKey, (String) settingsItem.get(VALUE)));
        break;
      case "Struct":
        ((List<?>) settingsItem.get(VALUE))
            .forEach(
                structItems ->
                    extractMessageText(
                        prefix + settingsItem.get(KEY) + ".",
                        (LinkedHashMap<?, ?>) structItems,
                        messages));
        break;
      case "Link":
        webClient
        .get()
        .uri(
            uriBuilder ->
                uriBuilder
                    .path(headlessConfig.getEndpoint())
                    .pathSegment(contentKey)
                    .build())
        .retrieve()
        .bodyToMono(ConfigurableText.class)
        .map(
                configurableTextResponse -> {
                  messages.add(
                      new Message(
                          prefix + configurableTextResponse.getKey(),
                          configurableTextResponse.getText(),
                          true));
                  return Mono.empty();
                })
        break;
      default:
        break;
    }
  }

共有1个答案

阮俊弼
2023-03-14

我已经更改了你的一些代码,使其与Reactor模式更兼容。我已经将递归更改为excandDeep,并且还使用Jackson解析JSON。我希望这能给你一些想法如何解决你的问题。

List<Message> messages = Flux
                .fromIterable(jsonNode.get("settings"))
                //expand the graph into a stream of flat data and track the address of the node with 'prefix'
                //expand/exapndDeep operators are alternatives of recursion in project reactor
                .expandDeep(parent -> {
                    String parentPrefix = Optional.ofNullable(parent.get("prefix")).map(JsonNode::asText)
                            .orElse(parent.get("key").asText());
                    String type = parent.get("type").asText();
                    if (type.equals("Struct")) {
                        return Flux.fromIterable(parent.get("value"))
                                .cast(ObjectNode.class)
                                .map(child -> child.put("prefix", parentPrefix + ":" + child.get("key").asText()));
                    }
                    return Mono.empty();
                })
                //we have to choose only leaf nodes aka String and Link nodes
                .filter(node -> Arrays.asList("String", "Link").contains(node.get("type").asText()))
                //now process expanded leaf nodes
                .flatMap(leaf -> {
                    if ("String".equals(leaf.get("type").asText())) {
                        return Mono.just(new Message(leaf.get("prefix").asText(), leaf.get("value").asText(), true));
                    }
                    if ("Link".equals(leaf.get("type").asText())) {
                        return webClient
                                .get()
                                .uri(
                                        uriBuilder ->
                                                uriBuilder
                                                        .pathSegment(leaf.get("key").asText())
                                                        .build())
                                .retrieve()
                                .bodyToMono(JsonNode.class)
                                .map(configurableTextResponse -> new Message(
                                        leaf.get("prefix") + configurableTextResponse.get("key").asText(),
                                        configurableTextResponse.get("text").asText(),
                                        true));
                    }
                    return Mono.empty();
                })
                // at this point we are getting stream of the Message objects from the Link/String nodes
                //collect them into a list
                .collectList()
                //we have to subscribe()/block() the mono to actually invoke the pipline.
                .block();

代码没有执行任何操作的主要原因是您没有订阅WebClient管道。

编辑:

改变

  .map(response -> {
                  return getCollectionContent(response);
                })

.flatMap(response -> {
                      return getCollectionContent(response);
                    })

并从getCollection Content(响应)Mono返回

类似于:

     // at this point we are getting stream of the Message objects from the Link/String nodes
                    //collect them into a list
                    .collectList()
                    .map(messages -> {
                        PageContent pageContent = new PageContent();
                        pageContent.setPageId(response.get("pageId").asText());
pageContent.setMessages(messages);
                        return pageContent;
                    });

在这些更改之后,您的getCollectionContent()将返回一个发布者

 类似资料:
  • 我有一个项目使用以下代码调用componentDidMount()上的API服务 我的组件将过滤器从redux存储区传递到API调用。 但是,如果过滤器通过另一个组件发生变化怎么办?我似乎找不到正确的生命周期方法来在发生这种情况时重新调用api。

  • 我尝试做什么: 我希望HashSet中充满程序不知道的新词。用户按下主机上的“转换”按钮。带有单词的文件的路径在主框架上给出。 如果单词是新单词,则会打开一个J对话框并要求插入新单词(因此您可以更改拼写,例如第一个字母大…)。 如果用户按下JDialog上的“写入”按钮,该单词将添加到HashSet中。 但是如果我在那之后打印我的HashSet,则只显示“旧”值。当我第二次按下主框架上的“转换”按

  • 我刚认识科特林·科鲁廷。我刚刚创建了测试livedata的新项目,但我无法观察到数据的变化。我不明白LiveData的概念。什么时候会触发?因为当我观察ROOM数据库时(不是coroutines方式,我使用的是MutableLiveData),它工作得非常好。Observer总是在数据更改时触发。 我的存储库: 我的ViewModel: 我的主要活动:

  • 我试图将jBPM 5.4集成到现有的servlet中。 servlet在JBoss 7.1中运行良好,但在我添加一个与jBPM相关的API时,由于抛出异常而失败。(我使用了安装中的评估样本作为模板。)我向servlet的doPost()中添加了: 执行此语句后,将引发以下异常: 20:51:06394错误[org.apache.catalina.core.ContainerBase.[jboss.

  • 本文向大家介绍ajax的定时调用每5秒调用一次,包括了ajax的定时调用每5秒调用一次的使用技巧和注意事项,需要的朋友参考一下

  • 我在windows 8.1电脑上使用SFML 1.6,代码为::Blocks 12.11。我一直有问题,所以我做了一个非常简单的测试程序,看起来像这样: 当我尝试运行此程序时,它会打印“开始”,然后崩溃。我已经检查了我的链接器设置,我认为它们是正确的,因为它编译得很好,没有错误或警告。我有什么遗漏吗?