// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
我确实尝试实现了这两种解决方案,但在实现的每个阶段都有许多设计选择,因此即使在一条“正确”的道路上,似乎也很容易搞砸。
1虽然我相信它是可以忽略不计的,而且是akka-http服务器运行的相同方式。
通常,使用单个连接flow
并通过该单个流分派所有请求要好得多。主要原因是,新的物化可能会导致每次形成新的连接
(这取决于您的连接池设置)。
您认为这会导致一些并发症,这是正确的:
排序:通过提供一个随机的uuid
作为传递给连接流的元组中的第二个值,您将消除将请求与响应相关联的能力。元组中额外的t
值可以用作“相关ID”,以了解您从流中获得的httpresponse
。在您的特定示例中,您可以使用创建的范围
中的初始int
:
val responseSource : Source[(Try[HttpResponse], Int), _] =
Source
.fromIterator( () => Iterator range (0,5) )
.map(i => HttpRequest(...) -> i)
.via(connectionFlow)
关于如何解决此错误的建议,以便我可以使用最新版本的akka、akka streams和akka HTTP?谢了!
我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?
我们有一个Akka HTTP服务器在AWS上提供一些scala JS内容。我们注意到,过了一段时间后,服务器开始在日志中抛出下面的错误,尽管某些URL有效,但有些文件无法正确下载,浏览器中出现错误。服务器日志如下所示(在所有实例中都是相同的错误): [ERROR][09/29/2016 21:29:22.150][designer-actor-system-akka.actor.default-d
我试图用akka-http测试一个TypedActor,但在尝试创建测试用例时遇到了一些问题。为了测试TypedActor,我将编写以下规范... 但是,当我必须编写一个与HTTP/+WS路由一起使用的TypedActor时,我无法编写... 我如何才能编写一个同时使用这两种测试呢? 请指教。