当前位置: 首页 > 知识库问答 >
问题:

通量-带webclient的并行平面图-固定批处理速率限制

林昱
2023-03-14

我的代码是这样的:

return Flux.fromIterable(new Generator()).log()
        .flatMap(
            s ->
                webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, s))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> s),
            6)  //only request 6 segments in parallel via webClient
        .take(6) //we need only 6 200 OK responses
        .sort();

它只请求HEAD,直到前6个请求成功。

并行化在这里起作用,但问题是在一个请求完成后,它会立即触发下一个请求(以保持并行化级别为6)。这里我需要的是并行化级别为6,但要分批进行。所以-触发6个请求,等待所有完成,再次触发6个请求。。。

这是上面的log()的输出:

: | request(6)
: | onNext(7)
: | onNext(17)
: | onNext(27)
: | onNext(37)
: | onNext(47)
: | onNext(57)
: | request(1) <---- from here NOT OK; wait until all complete and request(6)
: | onNext(8)
: | request(1)
: | onNext(18)
: | request(1)
: | onNext(28)
: | request(1)
: | onNext(38)
: | request(1)
: | onNext(48)
: | request(1)
: | onNext(58)
: | cancel()

使现代化

这是我用缓冲区尝试的:

return Flux.fromIterable(new Generator())
        .buffer(6)
        .flatMap(Flux::fromIterable)
        .log()
        .flatMap(
            s ->
                webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, s))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> s),
            6)  //only request 6 segments in parallel via webClient
        .take(6)
        .sort();

共有1个答案

丌官晔
2023-03-14

好的,看来我有代码了。这里我使用窗口

return Flux.fromIterable(new Generator())
        .window(6) //group 1,2,3,4,5,6,7... into [0,1,2,3,4,5],[6,7..,11],[12,..,17]
        .log()
        .flatMap(
            s -> s.log().flatMap(x -> webClient
                .head()
                .uri(
                    MessageFormat.format(
                        "/my-{2,number,#00}.xml",
                        channel, timestamp, x))
                .exchangeToMono(r -> Mono.just(r.statusCode()))
                .filter(HttpStatus::is2xxSuccessful)
                .map(r -> x), 6), 1)  //1 means take only 1 array ([0,1,2,3,4,5]). 6 means take in parallel all from array (0,1,2,3,4,5)
        .take(6, true) //pass through only 6 elements (cancel afterwards)
        .sort();

 类似资料:
  • 问题内容: API通常具有用户必须遵循的速率限制。举个例子,让我们50个请求/秒。连续的请求采取0.5-1秒,因此是来接近极限速度太慢。但是,使用aiohttp的并行请求超出了速率限制。 轮询API尽可能快地允许,需要限速并行调用。 例如,我发现到目前为止装饰,大约像这样: 这非常适用于连续通话。试图并行调用来实现这个按预期不起作用。 下面是一些代码示例: 这里的问题是,它会率限制 排队 的任务。

  • 我正在开发一个应用程序,它需要每x分钟上线一次,检查一些新数据。为了防止大量的网络和数据使用,任务应该以固定的速率运行,但是这种解决方案的最佳方法是什么?< code >处理程序或< code >计时器对象?

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 我正在尝试对我们的一些内部服务(网格内部)应用速率限制。 我使用了文档中的示例并生成了redis速率限制配置,其中包括(redis)处理程序、配额实例、配额规范、配额规范绑定和应用处理程序的规则。 此redis处理程序: 配额实例(目前我只对按目的地限制感兴趣): 配额规格,如果我理解正确,每个请求收费1: 所有参与服务预取的配额绑定规范。我还尝试了,但也没有任何效果。 应用处理程序的规则。目前在

  • 我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程: 给定类实体: 从DB读取实体(

  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(