def read(request: HttpRequest): Source[HttpResponse, _] =
Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)
val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
.flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
.flatMapConcat(_.parts)
....
def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
case Some(req) =>
Http().singleRequest(req).map { response =>
if (response.status.isFailure()) Some((None, response))
else nextRequest(response, HttpMethods.GET)
}
case None => Future.successful(None)
}
因此,一般的想法是使用source.unfoldAsync
来爬取页面并执行HTTP请求(想法和实现与本答案中描述的非常接近。这将创建一个源[HttpResponse,_]
,然后可以使用它(解封为多部分,拆分为各个部分,...)。
我现在的问题是HttpResponse
的消耗可能需要一段时间(如果页面很大,解组需要一些时间,可能最后会有一些数据库请求来持久化一些数据,...)。因此,如果下游速度较慢,我希望source.unfoldAsync
为背压。默认情况下,下一个HTTP请求将在上一个请求完成后立即启动。
所以我的问题是:有什么方法可以使source.unfoldAsync
对一个缓慢的下游产生背压吗?如果没有,是否有一种替代方法使背压成为可能?
我可以想象一个利用akka-http提供的主机级客户端API的解决方案,正如这里所描述的那样,其中第一个请求的响应将被用作生成第二个请求的输入,但我还没有尝试过,我不确定这是否可行。
编辑:在玩了几天,阅读了一些文档和博客之后,我不确定我是否正确地假设source.unfoldAsync
的背压行为是根本原因。要添加更多的观察:
httpresponse
被及时使用(请参阅此处的描述)响应-实体-订阅-超时
,将出现以下错误(我删除了URL):[WARN][03/30/2019 13:44:58.984][default-akka.actor.default-dispatcher-16][default/pool(共享->http://....)][1(WaitingForResponseEntitySubscription)]响应实体在%1秒后未被订阅。确保读取响应实体正文或对其调用discardBytes()。GET...Empty->200 OK Chunked
IllegalStateException
终止流:java.lang.IllegalStateException:子流源不能多次物化
source.UnfoldAsync
部分)发出较少的需求信号。这将导致发出的请求较少。httpresponse
。在对这个问题的讨论中,也有人怀疑将Akka Http与Akka流结合是否是一个好主意。因此,我可能不得不更改实现,以便直接在UnfoldAsync
调用的函数内部执行解组操作。根据source.UnfoldAsync
的实现,只在拉出源时调用传入函数:
def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
因此,如果下游没有拉(背压),传递给源的函数就不会被调用。
在您的要旨中,您使用了runforeach
(它与runwith(sink.foreach)
)在println
完成后立即拉动上游。所以很难注意到这里的背压。
我需要创建一个具有以下接口的函数: 我的问题是,我不知道如何定义符合上述接口的流。 当我做这种事的时候 结果类型为Flow[Item,OtherItem,NotUsed]。到目前为止,我还没有在Akka文档中找到任何东西。还有akka上的功能。流动scaladsl。流只提供“未使用”而不是控制。如果有人能给我指出正确的方向那就太好了。 一些背景:我需要设置几个只在转换部分区分的管道。这些管道是主流
如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
我想要一个以给定的时间间隔计算函数并发出其输出的源。作为一种变通方法,我可以通过,但还没有找到更干净的方法。理想情况下,我会有 有什么想法吗?
这看起来不可思议,但我找不到源代码存储库。主github repo包含一个akka stream dir,但不包含当前的发布源。 目前,我设法通过发布:http://search.maven.org/remotecontent?filepath=com/typesafe/akka/akka-stream-experimental_2.11/2.0.1/akka-stream-experimenta
如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一