我的代码是这样的:
return Flux.fromIterable(new Generator()).log()
.flatMap(
s ->
webClient
.head()
.uri(
MessageFormat.format(
"/my-{2,number,#00}.xml",
channel, timestamp, s))
.exchangeToMono(r -> Mono.just(r.statusCode()))
.filter(HttpStatus::is2xxSuccessful)
.map(r -> s),
6) //only request 6 segments in parallel via webClient
.take(6) //we need only 6 200 OK responses
.sort();
它只请求HEAD
,直到前6个请求成功。
并行化在这里起作用,但问题是在一个请求完成后,它会立即触发下一个请求(以保持并行化级别为6)。这里我需要的是并行化级别为6,但要分批进行。所以-触发6个请求,等待所有完成,再次触发6个请求。。。
这是上面的log()
的输出:
: | request(6)
: | onNext(7)
: | onNext(17)
: | onNext(27)
: | onNext(37)
: | onNext(47)
: | onNext(57)
: | request(1) <---- from here NOT OK; wait until all complete and request(6)
: | onNext(8)
: | request(1)
: | onNext(18)
: | request(1)
: | onNext(28)
: | request(1)
: | onNext(38)
: | request(1)
: | onNext(48)
: | request(1)
: | onNext(58)
: | cancel()
使现代化
这是我用缓冲区尝试的:
return Flux.fromIterable(new Generator())
.buffer(6)
.flatMap(Flux::fromIterable)
.log()
.flatMap(
s ->
webClient
.head()
.uri(
MessageFormat.format(
"/my-{2,number,#00}.xml",
channel, timestamp, s))
.exchangeToMono(r -> Mono.just(r.statusCode()))
.filter(HttpStatus::is2xxSuccessful)
.map(r -> s),
6) //only request 6 segments in parallel via webClient
.take(6)
.sort();
好的,看来我有代码了。这里我使用窗口
:
return Flux.fromIterable(new Generator())
.window(6) //group 1,2,3,4,5,6,7... into [0,1,2,3,4,5],[6,7..,11],[12,..,17]
.log()
.flatMap(
s -> s.log().flatMap(x -> webClient
.head()
.uri(
MessageFormat.format(
"/my-{2,number,#00}.xml",
channel, timestamp, x))
.exchangeToMono(r -> Mono.just(r.statusCode()))
.filter(HttpStatus::is2xxSuccessful)
.map(r -> x), 6), 1) //1 means take only 1 array ([0,1,2,3,4,5]). 6 means take in parallel all from array (0,1,2,3,4,5)
.take(6, true) //pass through only 6 elements (cancel afterwards)
.sort();
问题内容: API通常具有用户必须遵循的速率限制。举个例子,让我们50个请求/秒。连续的请求采取0.5-1秒,因此是来接近极限速度太慢。但是,使用aiohttp的并行请求超出了速率限制。 轮询API尽可能快地允许,需要限速并行调用。 例如,我发现到目前为止装饰,大约像这样: 这非常适用于连续通话。试图并行调用来实现这个按预期不起作用。 下面是一些代码示例: 这里的问题是,它会率限制 排队 的任务。
我正在开发一个应用程序,它需要每x分钟上线一次,检查一些新数据。为了防止大量的网络和数据使用,任务应该以固定的速率运行,但是这种解决方案的最佳方法是什么?< code >处理程序或< code >计时器对象?
当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?
我正在尝试对我们的一些内部服务(网格内部)应用速率限制。 我使用了文档中的示例并生成了redis速率限制配置,其中包括(redis)处理程序、配额实例、配额规范、配额规范绑定和应用处理程序的规则。 此redis处理程序: 配额实例(目前我只对按目的地限制感兴趣): 配额规格,如果我理解正确,每个请求收费1: 所有参与服务预取的配额绑定规范。我还尝试了,但也没有任何效果。 应用处理程序的规则。目前在
我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程: 给定类实体: 从DB读取实体(
我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(