当前位置: 首页 > 知识库问答 >
问题:

Akka Http-主机级客户端API源.队列模式

淳于凯
2023-03-14

这是文档中的(简化的)示例

val poolClientFlow = Http()
  .cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")

val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](
     QueueSize, OverflowStrategy.dropNew
  )
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued    => responsePromise.future
    case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    case QueueOfferResult.Failure(ex) => Future.failed(ex)
    case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
  }
}

val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))

文档指出使用source.single(request)是一种反模式,应该避免。但是,它并没有阐明为什么使用source.queue以及使用source.queue>会带来什么

在这里,我们前面展示了一个使用source.single(request).via(pool).runwith(sink.head)示例。事实上,这是一个表现不佳的反模式。请使用队列或流式方式提供请求,如下所示。

    null

这些是我们开始在应用程序中实现这种模式时出现的问题。

队列实现不是线程安全的。当我们在不同的路由/参与者中使用队列时,我们会遇到以下情况:

一个已排队的请求可以覆盖最近一个已排队的请求,从而导致一个未解决的未来。

AKKA/AKKA/Issues/23081号文件讨论了这一问题。队列实际上是线程安全的。

过滤请求时会发生什么?例如。当有人更改实现时

Source.queue[(HttpRequest, Promise[HttpResponse])](
    QueueSize, OverflowStrategy.dropNew)
  .via(poolClientFlow)
  // only successful responses
  .filter(_._1.isSuccess)
  // failed won't arrive here
  .to(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))

未来不会解决吗?对于单个请求流,这很简单:

Source.single(request).via(poolClientFlow).runWith(Sink.headOption)

queuesizemax-open-requests之间的区别并不清楚。最终,两者都是缓冲器。我们的实现最终使用queuesize==max-open-requests

到目前为止,我发现了使用source.queue而不是source.single的两个原因

  1. 性能-仅物化一次流。然而,根据这个答案,这不应该是一个问题
  2. 显式配置背压和处理故障情况。在我看来,ConnectionPool对于过多的负载具有足够的处理能力。您可以在结果的将来进行映射并处理异常。

共有1个答案

翁文康
2023-03-14

我会直接回答你的每一个问题,然后对整体问题做一个笼统的间接回答。

可能是业绩提升?

您认为每个incomingconnection都有一个flow是正确的,但如果连接有多个请求来自它,则仍然可以获得性能增益。

过滤请求时会发生什么?

通常,流在源元素和汇元素之间没有1:1的映射。可以是1:0(如您的示例所示),也可以是1:MANY(如果单个请求以某种方式产生多个响应)。

队列化vs max-open-request?

val queue = ??? //as in the example in your question

queue.offer(httpRequest1)
queue.offer(httpRequest2)
queue.offer(httpRequest3)
val allRequests = Iterable(httpRequest1, httpRequest2, httpRequest3)

//no queue necessary
val allResponses : Future[Seq[HttpResponse]] = 
  Source(allRequests)
    .via(poolClientFlow)
    .to(Sink.seq[HttpResponse])
    .run()

现在就不需要担心队列、最大队列大小等问题了,所有的东西都捆绑在一个很好的紧凑流中。

即使请求的源是动态的,您通常仍然可以使用一个源。假设我们正在从控制台stdin获取请求路径,这仍然可以是一个完整的流:

import scala.io.{Source => ioSource}

val consoleLines : () => Iterator[String] = 
  () => ioSource.stdin.getLines()

Source
  .fromIterator(consoleLines)
  .map(consoleLine => HttpRequest(GET, uri = Uri(consoleLine)))
  .via(poolClientFlow)
  .to(Sink.foreach[HttpResponse](println))
  .run()

现在,即使每一行都以随机的时间间隔输入到控制台中,流仍然可以在没有队列的情况下进行反应。

 类似资料:
  • 问题内容: 我想让RasPi充当由三个Linux客户端组成的小型家庭网络中的小型打印和文件服务器。不幸的是,Brother只为x86提供二进制驱动程序,所以我不能在RasPi上运行Brother打印机。但是,我发现了一个博客条目,提议在RasPi的CUPS安装上创建一个原始队列,并使用客户端上安装的二进制驱动程序从客户端访问此队列。这是博客条目:http : //chemdroid.net/en/

  • ngrok客户端公开了一个REST API,它授予对以下各项的编程访问权限: 收集状态和指标信息 收集和重放捕获的请求 动态启动和停止隧道 基本URL和身份验证 Base URL http://127.0.0.1:4040/api Authentication None ngrok客户端API作为ngrok的本地Web检查接口的一部分公开。因为它在本地接口上提供,所以API没有身份验证。如果您覆盖

  • http://redis.cn/clients.html

  • Docusaurus 提供了一组客户端 API,对于构建网站很有帮助。 组件 <Head/> 这是一个可重用 React 组件,用于管理对 HTML 文档标头(即 <head> 中的标签)的修改。此组件接收纯 HTML 标签并输出纯 HTML 标签,对初学者很友好。此组件始对 React Helmet 的二次包装。 用法示例: import React from 'react'; import

  • 问题内容: 是否可以在Node.js中获取 主机名 ? 这是我获取客户IP的方式: 那么,如何获得客户的主机名? 谢谢您的回复! 问题答案: 我认为唯一的方法是这样的: 但我建议您实际上并不需要它,这并不是您可以对信息做任何有用的事情。如果您只想让字符串标识用户的计算机,则可以进行一些处理。 如果您真正想要的是FQDN,那么我建议它仍然对您没有太大帮助,但为此您需要反向DNS查找。如果您使用的是V