当前位置: 首页 > 知识库问答 >
问题:

Spring WebFlux&WebClient进行许多API调用

姚烨
2023-03-14

list urls=arrays.aslist(“www.example-rest.com/url1”、“www.example-rest.com/url2”、...、“www.example-rest.com/url600”);

  • 我希望以并行方式并使用无阻塞线程(如Python中的eventlet)
  • 我想用这样的线程创建单独的共享工作线程池,以便不为每个传入请求创建一个共享工作线程池。

以下是有关调度器的文档http://projectreactor.io/docs/core/release/reference/#schedulers

但是我不能为每个请求创建新的工作池,池中的线程仍然以阻塞的方式工作。

如果有人用Spring WebClient做类似的任务,请提供一个例子,解释什么是正确的方法

共有1个答案

卢黎昕
2023-03-14

我也做过类似的事...我的目标是创建一个带有如下签名的方法:

Flux<BasicIssue> getIssues(WebClient webClient);

因为我所调用的网站只提供了一个分页接口,所以我需要将多个REST调用的结果输入到一个流量中。下面是我的实现。请注意我对CachedThreadPool的使用。

Flux<BasicIssue> getIssues(WebClient webClient) {
    return Flux.generate(
        () -> new IssuesState(webClient),
        (state, sink) -> {
            BasicIssue ret = state.getNext();
            if (ret == null) {
                sink.complete();
            } else {
                sink.next(ret);
            }
            return state;
        }
}


class IssuesState {

    private final AtomicBoolean isDone = new AtomicBoolean(false);
    private final AtomicInteger threadCount = new AtomicInteger(1);
    private final Executor executor = Executors.newCachedThreadPool();
    private final LinkedBlockingQueue<BasicIssue> issueQueue = new LinkedBlockingQueue();

    public IssuesState(WebClient webClient) {
        executor.execute(() -> getNextBlock(webClient, 0));
    }

    private void getNextBlock(final WebClient webClient, final int startAt) {
        webClient
            .get()
            .uri(...)
            .header("Authorization", "Basic " + Base64Utils.encodeToString(("username:password".getBytes(UTF_8))))
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(PageableIssue.class)
            .subscribe(pageableIssue -> {
                int maxResults = pageableIssue.getMaxResults();
                int total = pageableIssue.getTotal();
                if (startAt == 0) {
                    for (int i = startAt + maxResults; i < total; i += maxResults) {
                        threadCount.incrementAndGet();
                        final int x = i;
                        executor.execute(() -> getNextBlock(webClient, x));
                    }
                }
                synchronized (issueQueue) {
                    for (BasicIssue issue : pageableIssue.getIssues()) {
                        issueQueue.add(issue);
                    }

                    if (threadCount.decrementAndGet() == 0) {
                        isDone.set(true);
                    }
                }
            });
    }

    public BasicIssue getNext() {
        synchronized (issueQueue) {
            if (isDone.get() && issueQueue.isEmpty()) {
                return null;
            }
        }
        try {
            return issueQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

使用上面的方法…

getIssues(webClient)
    .subscribe(basicIssue -> System.out.println(basicIssue.getName());
 类似资料:
  • 我终于学会了用Reactor进行函数式编程。所以我是新手。 我要做的第一件事是使用WebClient调用外部API。这个调用需要是递归的,因为响应提供了调用参数的下一个值,我需要在下一个调用中使用它,直到满足微不足道的情况。 下面是我的想法: 似乎我需要把我的想法调整到这种编程风格,所以请给我一些例子 谢谢

  • 我需要进行异步调用,并使用其中存在的一些值对同一服务进行多次调用。将这些调用的响应与第一个调用结合起来,然后返回。 例如,当我第一次调用时,我会在JSON下面看到一个ID列表。现在,我必须使用这些ID对一个服务进行多次调用,并列出它们的响应列表,然后通过在同一个JSON中添加它们将其发送到下游。 我试过使用zipWhen和 但是结果列表总是以空或空的形式出现。我们如何才能做到这一点?我是不是漏了什

  • 我希望从spring reactive WebClient进行SOAP调用。我找不到任何文件。想知道会有什么方法。现在我在想 null 缺点和其他方法是什么?

  • 我需要进行3个相关的WebClient API调用。最后,我想要一个最终响应对象的Mono。我需要使用第一个API响应中的值来调用第二个API(它将返回Purchase类的Mono)。Purchase类将包含2个成员变量 用户对象 现在,对于列表中的每个值,我需要进行第三次API调用。然后将最终的mono对象返回到控制器 目前,我一直在研究如何为列表中的每个值(由第二个API返回)异步调用第三个A

  • 我必须调用api使用生成。我将在中发送令牌。我通过网络获得了一些,但我不知道如何与API一起传递参数。有人能帮忙吗? 我需要调用docusign apihttps://account-d.docusign.com/oauth/token 此外,我需要发送和。我不知道如何收到这个。有人能举个例子吗?以下是供参考的图像。 在此处输入图像描述

  • 返回的流量 返回一个 如果您不能回答我的问题,请至少告诉我如何并行地执行多个API调用,并在WebClient中等待结果