当前位置: 首页 > 知识库问答 >
问题:

akka流http速率限制

商麒
2023-03-14

我的计算图的一个阶段是flow[seq[Request],seq[Response],NotUsed]类型的流。显然,这个阶段应该为每个请求分配一个响应,并在所有请求都被解决后发出seq。

现在,底层API有一个苛刻的速率限制策略,所以我每秒只能激发一个请求。如果我有一个flow的单个request,我可以使用每秒发出单个元素的zipzip这个流(如何限制Akka流每秒只执行和发送一个消息一次?),但在这种情况下我没有看到类似的解决方案。

有什么好的表达方式吗?我想到的想法是使用低层图DSL并在那里使用一秒滴答流作为状态,并使用它来处理请求序列,但我怀疑它会变得好看。

共有1个答案

长孙泉
2023-03-14

就像维克多说的,你应该使用默认油门。但如果你想自己动手,它可能看起来像这样

private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val ticker = Source.tick(rate, rate, Unit)

  val zip = builder.add(Zip[T, Unit.type])
  val map = Flow[(T, Unit.type)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(zip.in0, messageExtractor.out)
})

// And it will be used in your flow as follows
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS)))

此外,由于您限制了对某些API的访问,您可能希望以集中的方式限制对它的调用。假设您的项目中有多个地方对同一个外部API进行调用,但是由于调用来自同一个IP,因此应该对所有这些地方应用节流。对于这种情况,请考虑使用mergehub.source作为您(假定)的akka-http流。每个调用者将创建并执行新的图形来进行调用。

 类似资料:
  • 问题内容: 我正在用GRequests和lxml在Python 2.7.3中编写一个小脚本,这将允许我从各个网站收集一些可收集的卡价格并进行比较。问题是网站之一限制了请求的数量,如果我超过了它,则会发回HTTP错误429。 有没有一种方法可以限制GRequestes中的请求数量,以使我不超过我指定的每秒请求数量?另外-如果发生HTTP 429,如何让GRequestes在一段时间后重试? 附带说明

  • 我正在用Python 2.7.3编写一个小脚本,其中包含GRequests和lxml,它将允许我从各种网站收集一些可收集的卡价格并进行比较。问题是其中一个网站限制了请求的数量,如果我超过它,就会发回HTTP错误429。 有没有办法在grequests中增加限制请求数,这样我就不会超过我指定的每秒请求数?还有——如果HTTP 429出现,我如何让GRequestes在一段时间后重试? 另一方面,他们

  • 我正试图将mkv文件(见下面的属性)发送到Kinesis视频流。我想有10-15帧每秒的FPS。

  • 速率限制配置参考 filter.http.RateLimit filter.http.RateLimit proto { "domain": "...", "stage": "...", "request_type": "...", "timeout": "{...}" } domain (string, REQUIRED) 需要调用速率限制服务时的域。 stage (uint3

  • 速率限制配置参考。 filter.network.RateLimit filter.network.RateLimit proto { "stat_prefix": "...", "domain": "...", "descriptors": [], "timeout": "{...}" } stat_prefix (string, REQUIRED) 发布统计信息时使用的前缀。