当前位置: 首页 > 知识库问答 >
问题:

Akka流-源的背压.展开异步

亢嘉茂
2023-03-14
def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      if (response.status.isFailure()) Some((None, response))
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None)
}

因此,一般的想法是使用source.unfoldAsync来爬取页面并执行HTTP请求(想法和实现与本答案中描述的非常接近。这将创建一个源[HttpResponse,_],然后可以使用它(解封为多部分,拆分为各个部分,...)。

我现在的问题是HttpResponse的消耗可能需要一段时间(如果页面很大,解组需要一些时间,可能最后会有一些数据库请求来持久化一些数据,...)。因此,如果下游速度较慢,我希望source.unfoldAsync为背压。默认情况下,下一个HTTP请求将在上一个请求完成后立即启动。

所以我的问题是:有什么方法可以使source.unfoldAsync对一个缓慢的下游产生背压吗?如果没有,是否有一种替代方法使背压成为可能?

我可以想象一个利用akka-http提供的主机级客户端API的解决方案,正如这里所描述的那样,其中第一个请求的响应将被用作生成第二个请求的输入,但我还没有尝试过,我不确定这是否可行。

编辑:在玩了几天,阅读了一些文档和博客之后,我不确定我是否正确地假设source.unfoldAsync的背压行为是根本原因。要添加更多的观察:

  • 当流启动时,我看到几个请求发出。首先,这是没有问题的,只要得到的httpresponse被及时使用(请参阅此处的描述)
  • 如果不更改默认的响应-实体-订阅-超时,将出现以下错误(我删除了URL):
    [WARN][03/30/2019 13:44:58.984][default-akka.actor.default-dispatcher-16][default/pool(共享->http://....)][1(WaitingForResponseEntitySubscription)]响应实体在%1秒后未被订阅。确保读取响应实体正文或对其调用discardBytes()。GET...Empty->200 OK Chunked
    这将导致IllegalStateException终止流:java.lang.IllegalStateException:子流源不能多次物化
  • 我注意到响应的解组是流中最慢的部分,这可能是有意义的,因为响应主体是多部分文档,因此相对较大。但是,我希望流的这一部分向上游(在我的示例中是source.UnfoldAsync部分)发出较少的需求信号。这将导致发出的请求较少。
  • 一些谷歌搜索将我引向了一个似乎描述了一个类似问题的问题的讨论。他们还讨论了响应处理不够快时出现的问题。关联的合并请求将带来文档更改,这些更改建议在继续处理流之前完全使用httpresponse。在对这个问题的讨论中,也有人怀疑将Akka Http与Akka流结合是否是一个好主意。因此,我可能不得不更改实现,以便直接在UnfoldAsync调用的函数内部执行解组操作。

共有1个答案

许彦
2023-03-14

根据source.UnfoldAsync的实现,只在拉出源时调用传入函数:

def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

因此,如果下游没有拉(背压),传递给源的函数就不会被调用。

在您的要旨中,您使用了runforeach(它与runwith(sink.foreach))在println完成后立即拉动上游。所以很难注意到这里的背压。

 类似资料:
  • 我需要创建一个具有以下接口的函数: 我的问题是,我不知道如何定义符合上述接口的流。 当我做这种事的时候 结果类型为Flow[Item,OtherItem,NotUsed]。到目前为止,我还没有在Akka文档中找到任何东西。还有akka上的功能。流动scaladsl。流只提供“未使用”而不是控制。如果有人能给我指出正确的方向那就太好了。 一些背景:我需要设置几个只在转换部分区分的管道。这些管道是主流

  • 如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 我想要一个以给定的时间间隔计算函数并发出其输出的源。作为一种变通方法,我可以通过,但还没有找到更干净的方法。理想情况下,我会有 有什么想法吗?

  • 这看起来不可思议,但我找不到源代码存储库。主github repo包含一个akka stream dir,但不包含当前的发布源。 目前,我设法通过发布:http://search.maven.org/remotecontent?filepath=com/typesafe/akka/akka-stream-experimental_2.11/2.0.1/akka-stream-experimenta

  • 如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一