list
以下是有关调度器的文档http://projectreactor.io/docs/core/release/reference/#schedulers
但是我不能为每个请求创建新的工作池,池中的线程仍然以阻塞的方式工作。
如果有人用Spring WebClient做类似的任务,请提供一个例子,解释什么是正确的方法。
我也做过类似的事...我的目标是创建一个带有如下签名的方法:
Flux<BasicIssue> getIssues(WebClient webClient);
因为我所调用的网站只提供了一个分页接口,所以我需要将多个REST调用的结果输入到一个流量中。下面是我的实现。请注意我对CachedThreadPool的使用。
Flux<BasicIssue> getIssues(WebClient webClient) {
return Flux.generate(
() -> new IssuesState(webClient),
(state, sink) -> {
BasicIssue ret = state.getNext();
if (ret == null) {
sink.complete();
} else {
sink.next(ret);
}
return state;
}
}
class IssuesState {
private final AtomicBoolean isDone = new AtomicBoolean(false);
private final AtomicInteger threadCount = new AtomicInteger(1);
private final Executor executor = Executors.newCachedThreadPool();
private final LinkedBlockingQueue<BasicIssue> issueQueue = new LinkedBlockingQueue();
public IssuesState(WebClient webClient) {
executor.execute(() -> getNextBlock(webClient, 0));
}
private void getNextBlock(final WebClient webClient, final int startAt) {
webClient
.get()
.uri(...)
.header("Authorization", "Basic " + Base64Utils.encodeToString(("username:password".getBytes(UTF_8))))
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(PageableIssue.class)
.subscribe(pageableIssue -> {
int maxResults = pageableIssue.getMaxResults();
int total = pageableIssue.getTotal();
if (startAt == 0) {
for (int i = startAt + maxResults; i < total; i += maxResults) {
threadCount.incrementAndGet();
final int x = i;
executor.execute(() -> getNextBlock(webClient, x));
}
}
synchronized (issueQueue) {
for (BasicIssue issue : pageableIssue.getIssues()) {
issueQueue.add(issue);
}
if (threadCount.decrementAndGet() == 0) {
isDone.set(true);
}
}
});
}
public BasicIssue getNext() {
synchronized (issueQueue) {
if (isDone.get() && issueQueue.isEmpty()) {
return null;
}
}
try {
return issueQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
使用上面的方法…
getIssues(webClient)
.subscribe(basicIssue -> System.out.println(basicIssue.getName());
我终于学会了用Reactor进行函数式编程。所以我是新手。 我要做的第一件事是使用WebClient调用外部API。这个调用需要是递归的,因为响应提供了调用参数的下一个值,我需要在下一个调用中使用它,直到满足微不足道的情况。 下面是我的想法: 似乎我需要把我的想法调整到这种编程风格,所以请给我一些例子 谢谢
我需要进行异步调用,并使用其中存在的一些值对同一服务进行多次调用。将这些调用的响应与第一个调用结合起来,然后返回。 例如,当我第一次调用时,我会在JSON下面看到一个ID列表。现在,我必须使用这些ID对一个服务进行多次调用,并列出它们的响应列表,然后通过在同一个JSON中添加它们将其发送到下游。 我试过使用zipWhen和 但是结果列表总是以空或空的形式出现。我们如何才能做到这一点?我是不是漏了什
我希望从spring reactive WebClient进行SOAP调用。我找不到任何文件。想知道会有什么方法。现在我在想 null 缺点和其他方法是什么?
我需要进行3个相关的WebClient API调用。最后,我想要一个最终响应对象的Mono。我需要使用第一个API响应中的值来调用第二个API(它将返回Purchase类的Mono)。Purchase类将包含2个成员变量 用户对象 现在,对于列表中的每个值,我需要进行第三次API调用。然后将最终的mono对象返回到控制器 目前,我一直在研究如何为列表中的每个值(由第二个API返回)异步调用第三个A
我必须调用api使用生成。我将在中发送令牌。我通过网络获得了一些,但我不知道如何与API一起传递参数。有人能帮忙吗? 我需要调用docusign apihttps://account-d.docusign.com/oauth/token 此外,我需要发送和。我不知道如何收到这个。有人能举个例子吗?以下是供参考的图像。 在此处输入图像描述
返回的流量 返回一个 如果您不能回答我的问题,请至少告诉我如何并行地执行多个API调用,并在WebClient中等待结果