当前位置: 首页 > 知识库问答 >
问题:

使用WebClient和Reactor 3.0进行递归API调用

周瀚
2023-03-14

我终于学会了用Reactor进行函数式编程。所以我是新手。

我要做的第一件事是使用WebClient调用外部API。这个调用需要是递归的,因为响应提供了调用参数的下一个值,我需要在下一个调用中使用它,直到满足微不足道的情况。

下面是我的想法:

    Flux.from(p -> queryUntilNow())
            .flatMap(res -> // res is object )
            .subscribe( // process )



private Flux<ApiResp> queryUntilNow() {
    return Flux.from(p -> {
        queryAPI(since)
                .doOnError(System.out::println)
                .subscribe(apiResp -> {
                    if (since == apiResp.last) return;

                    since = apiResp.last;
                    queryUntilNow();
                });
    });
}

private Flux<ApiResp> queryAPI(int last) {
    Flux<ApiResp> resp = kapi.get()
            .uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
            .retrieve()
            .bodyToFlux(ApiResp.class);

    return resp;
}

似乎我需要把我的想法调整到这种编程风格,所以请给我一些例子

谢谢

共有1个答案

韩弘阔
2023-03-14

如果只需要循环成批返回的线性结果(与递归树相反),可以使用一个重复的通量,其起点在每次重复时都会发生变化。

下面是一个模拟api调用的完整示例。您可以在您的WebClient调用中替换queryfrom

public class Main {

    private static class ApiResp {
        private final int last;
        private ApiResp(int last) {
            this.last = last;
        }
    }

    public static void main(String[] args) {
        queryBetween(0, 15)
                .doOnNext(apiResp -> System.out.println(apiResp.last))
                .blockLast();
    }

    public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
        // The starting point of the next iteration
        final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
        return Flux
                // defer will cause a new Flux with a new starting point to be created for each subscription
                .defer(() -> queryFrom(nextIterationStart.get()))
                // update the starting point of the next iteration
                .doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
                // repeat with a new subscription if we haven't reached the end yet
                .repeat(() -> nextIterationStart.get() < endExclusive)
                // make sure we didn't go past the end if queryFrom returned more results than we need
                .takeWhile(apiResp -> apiResp.last < endExclusive);
    }

    public static Flux<ApiResp> queryFrom(int start) {
        // simulates an api call that always returns 10 results from the starting point
        return Flux.range(start, 10)
                .map(ApiResp::new);
    }
}
 类似资料:
  • 我希望以并行方式并使用无阻塞线程(如Python中的eventlet) 我想用这样的线程创建单独的共享工作线程池,以便不为每个传入请求创建一个共享工作线程池。 以下是有关调度器的文档http://projectreactor.io/docs/core/release/reference/#schedulers 但是我不能为每个请求创建新的工作池,池中的线程仍然以阻塞的方式工作。 如果有人用Spri

  • 需要从单声道递归调用单声道以获得完整的项目。我有一个Pojo项目,在这里我将传递根ID,并尝试从另一个服务获得项目。我写我的服务使用sprignwebFlow。所以我正在使用webClient调用服务并返回Mono 另一项服务将提供该项目及其直接子项。因此,我的要求是,当我传递根id时,我将获得根项及其直接子项,根将LM类型项作为子项。 获得Root项目后,我需要收集所有的LM id,并再次调用每

  • 我必须调用api使用生成。我将在中发送令牌。我通过网络获得了一些,但我不知道如何与API一起传递参数。有人能帮忙吗? 我需要调用docusign apihttps://account-d.docusign.com/oauth/token 此外,我需要发送和。我不知道如何收到这个。有人能举个例子吗?以下是供参考的图像。 在此处输入图像描述

  • RestTemplate文档中的Spring注意: 注意:从5.0开始,这个类处于维护模式,只接受微小的更改请求和bug。请考虑使用org.springframework.web.reactive.client.WebClient,它具有更现代的API,支持同步、异步和流场景。 https://docs.spring.io/spring-framework/docs/current/javadoc

  • 我搜索了许多网站和文档,但异步调用使用的代码相同。但不确定为什么它不起作用。如果我错过了什么,有人能帮我吗?

  • 我希望从spring reactive WebClient进行SOAP调用。我找不到任何文件。想知道会有什么方法。现在我在想 null 缺点和其他方法是什么?