我终于学会了用Reactor进行函数式编程。所以我是新手。
我要做的第一件事是使用WebClient调用外部API。这个调用需要是递归的,因为响应提供了调用参数的下一个值,我需要在下一个调用中使用它,直到满足微不足道的情况。
下面是我的想法:
Flux.from(p -> queryUntilNow())
.flatMap(res -> // res is object )
.subscribe( // process )
private Flux<ApiResp> queryUntilNow() {
return Flux.from(p -> {
queryAPI(since)
.doOnError(System.out::println)
.subscribe(apiResp -> {
if (since == apiResp.last) return;
since = apiResp.last;
queryUntilNow();
});
});
}
private Flux<ApiResp> queryAPI(int last) {
Flux<ApiResp> resp = kapi.get()
.uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
.retrieve()
.bodyToFlux(ApiResp.class);
return resp;
}
似乎我需要把我的想法调整到这种编程风格,所以请给我一些例子
谢谢
如果只需要循环成批返回的线性结果(与递归树相反),可以使用一个重复的通量,其起点在每次重复时都会发生变化。
下面是一个模拟api调用的完整示例。您可以在您的WebClient调用中替换queryfrom
:
public class Main {
private static class ApiResp {
private final int last;
private ApiResp(int last) {
this.last = last;
}
}
public static void main(String[] args) {
queryBetween(0, 15)
.doOnNext(apiResp -> System.out.println(apiResp.last))
.blockLast();
}
public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
// The starting point of the next iteration
final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
return Flux
// defer will cause a new Flux with a new starting point to be created for each subscription
.defer(() -> queryFrom(nextIterationStart.get()))
// update the starting point of the next iteration
.doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
// repeat with a new subscription if we haven't reached the end yet
.repeat(() -> nextIterationStart.get() < endExclusive)
// make sure we didn't go past the end if queryFrom returned more results than we need
.takeWhile(apiResp -> apiResp.last < endExclusive);
}
public static Flux<ApiResp> queryFrom(int start) {
// simulates an api call that always returns 10 results from the starting point
return Flux.range(start, 10)
.map(ApiResp::new);
}
}
我希望以并行方式并使用无阻塞线程(如Python中的eventlet) 我想用这样的线程创建单独的共享工作线程池,以便不为每个传入请求创建一个共享工作线程池。 以下是有关调度器的文档http://projectreactor.io/docs/core/release/reference/#schedulers 但是我不能为每个请求创建新的工作池,池中的线程仍然以阻塞的方式工作。 如果有人用Spri
需要从单声道递归调用单声道以获得完整的项目。我有一个Pojo项目,在这里我将传递根ID,并尝试从另一个服务获得项目。我写我的服务使用sprignwebFlow。所以我正在使用webClient调用服务并返回Mono 另一项服务将提供该项目及其直接子项。因此,我的要求是,当我传递根id时,我将获得根项及其直接子项,根将LM类型项作为子项。 获得Root项目后,我需要收集所有的LM id,并再次调用每
我必须调用api使用生成。我将在中发送令牌。我通过网络获得了一些,但我不知道如何与API一起传递参数。有人能帮忙吗? 我需要调用docusign apihttps://account-d.docusign.com/oauth/token 此外,我需要发送和。我不知道如何收到这个。有人能举个例子吗?以下是供参考的图像。 在此处输入图像描述
RestTemplate文档中的Spring注意: 注意:从5.0开始,这个类处于维护模式,只接受微小的更改请求和bug。请考虑使用org.springframework.web.reactive.client.WebClient,它具有更现代的API,支持同步、异步和流场景。 https://docs.spring.io/spring-framework/docs/current/javadoc
我搜索了许多网站和文档,但异步调用使用的代码相同。但不确定为什么它不起作用。如果我错过了什么,有人能帮我吗?
我希望从spring reactive WebClient进行SOAP调用。我找不到任何文件。想知道会有什么方法。现在我在想 null 缺点和其他方法是什么?