当前位置: 首页 > 知识库问答 >
问题:

akka-stream+akka-http生命周期

谈萧迟
2023-03-14
// example of stream per request

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
    Flow[HttpRequest]
      .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
      .via(connectionFlow)
      .map { case (response, _) => response }

val result = Range(1 to 5).foreach{ i => {
  Source.single(i)
    .map(HttpRequest(...))
    .via(httpFlow)
    .mapAsync(1) {
       // response handling logic
    }
    .runWith(Sink.last)
})


// example of stream per request with long running http stream

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

val result = Range(1 to 5).foreach{ i => {
  Source.single(i)
    .map(HttpRequest(...))
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
       // somehow reconcile request with response?
       // response handling logic
    }
    .runWith(Sink.last)
})

我确实尝试实现了这两种解决方案,但在实现的每个阶段都有许多设计选择,因此即使在一条“正确”的道路上,似乎也很容易搞砸。

1虽然我相信它是可以忽略不计的,而且是akka-http服务器运行的相同方式。

共有1个答案

司徒高寒
2023-03-14

通常,使用单个连接flow并通过该单个流分派所有请求要好得多。主要原因是,新的物化可能会导致每次形成新的连接(这取决于您的连接池设置)。

您认为这会导致一些并发症,这是正确的:

排序:通过提供一个随机的uuid作为传递给连接流的元组中的第二个值,您将消除将请求与响应相关联的能力。元组中额外的t值可以用作“相关ID”,以了解您从流中获得的httpresponse。在您的特定示例中,您可以使用创建的范围中的初始int:

val responseSource : Source[(Try[HttpResponse], Int), _] = 
  Source
    .fromIterator( () => Iterator range (0,5) )
    .map(i => HttpRequest(...) -> i)
    .via(connectionFlow)
 类似资料:
  • 关于如何解决此错误的建议,以便我可以使用最新版本的akka、akka streams和akka HTTP?谢了!

  • 我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?

  • 我们有一个Akka HTTP服务器在AWS上提供一些scala JS内容。我们注意到,过了一段时间后,服务器开始在日志中抛出下面的错误,尽管某些URL有效,但有些文件无法正确下载,浏览器中出现错误。服务器日志如下所示(在所有实例中都是相同的错误): [ERROR][09/29/2016 21:29:22.150][designer-actor-system-akka.actor.default-d

  • 我试图用akka-http测试一个TypedActor,但在尝试创建测试用例时遇到了一些问题。为了测试TypedActor,我将编写以下规范... 但是,当我必须编写一个与HTTP/+WS路由一起使用的TypedActor时,我无法编写... 我如何才能编写一个同时使用这两种测试呢? 请指教。