我的计算图的一个阶段是flow[seq[Request],seq[Response],NotUsed]
类型的流。显然,这个阶段应该为每个请求分配一个响应,并在所有请求都被解决后发出seq。
现在,底层API有一个苛刻的速率限制策略,所以我每秒只能激发一个请求。如果我有一个flow
的单个request
,我可以使用每秒发出单个元素的zip
来zip
这个流(如何限制Akka流每秒只执行和发送一个消息一次?),但在这种情况下我没有看到类似的解决方案。
有什么好的表达方式吗?我想到的想法是使用低层图DSL并在那里使用一秒滴答流作为状态,并使用它来处理请求序列,但我怀疑它会变得好看。
就像维克多说的,你应该使用默认油门。但如果你想自己动手,它可能看起来像这样
private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, Unit)
val zip = builder.add(Zip[T, Unit.type])
val map = Flow[(T, Unit.type)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(zip.in0, messageExtractor.out)
})
// And it will be used in your flow as follows
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS)))
此外,由于您限制了对某些API的访问,您可能希望以集中的方式限制对它的调用。假设您的项目中有多个地方对同一个外部API进行调用,但是由于调用来自同一个IP,因此应该对所有这些地方应用节流。对于这种情况,请考虑使用mergehub.source
作为您(假定)的akka-http流。每个调用者将创建并执行新的图形来进行调用。
问题内容: 我正在用GRequests和lxml在Python 2.7.3中编写一个小脚本,这将允许我从各个网站收集一些可收集的卡价格并进行比较。问题是网站之一限制了请求的数量,如果我超过了它,则会发回HTTP错误429。 有没有一种方法可以限制GRequestes中的请求数量,以使我不超过我指定的每秒请求数量?另外-如果发生HTTP 429,如何让GRequestes在一段时间后重试? 附带说明
我正在用Python 2.7.3编写一个小脚本,其中包含GRequests和lxml,它将允许我从各种网站收集一些可收集的卡价格并进行比较。问题是其中一个网站限制了请求的数量,如果我超过它,就会发回HTTP错误429。 有没有办法在grequests中增加限制请求数,这样我就不会超过我指定的每秒请求数?还有——如果HTTP 429出现,我如何让GRequestes在一段时间后重试? 另一方面,他们
我正试图将mkv文件(见下面的属性)发送到Kinesis视频流。我想有10-15帧每秒的FPS。
速率限制配置参考 filter.http.RateLimit filter.http.RateLimit proto { "domain": "...", "stage": "...", "request_type": "...", "timeout": "{...}" } domain (string, REQUIRED) 需要调用速率限制服务时的域。 stage (uint3
速率限制配置参考。 filter.network.RateLimit filter.network.RateLimit proto { "stat_prefix": "...", "domain": "...", "descriptors": [], "timeout": "{...}" } stat_prefix (string, REQUIRED) 发布统计信息时使用的前缀。