这是文档中的(简化的)示例
val poolClientFlow = Http()
.cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew
)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
}
}
val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
文档指出使用source.single(request)
是一种反模式,应该避免。但是,它并没有阐明为什么使用source.queue
以及使用source.queue>会带来什么
在这里,我们前面展示了一个使用
source.single(request).via(pool).runwith(sink.head)
的示例。事实上,这是一个表现不佳的反模式。请使用队列或流式方式提供请求,如下所示。
null
这些是我们开始在应用程序中实现这种模式时出现的问题。
队列实现不是线程安全的。当我们在不同的路由/参与者中使用队列时,我们会遇到以下情况:
一个已排队的请求可以覆盖最近一个已排队的请求,从而导致一个未解决的未来。
AKKA/AKKA/Issues/23081号文件讨论了这一问题。队列实际上是线程安全的。
过滤请求时会发生什么?例如。当有人更改实现时
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
// only successful responses
.filter(_._1.isSuccess)
// failed won't arrive here
.to(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))
未来不会解决吗?对于单个请求流,这很简单:
Source.single(request).via(poolClientFlow).runWith(Sink.headOption)
queuesize
和max-open-requests
之间的区别并不清楚。最终,两者都是缓冲器。我们的实现最终使用queuesize==max-open-requests
到目前为止,我发现了使用
source.queue
而不是source.single
的两个原因
- 性能-仅物化一次流。然而,根据这个答案,这不应该是一个问题
- 显式配置背压和处理故障情况。在我看来,ConnectionPool对于过多的负载具有足够的处理能力。您可以在结果的将来进行映射并处理异常。
我会直接回答你的每一个问题,然后对整体问题做一个笼统的间接回答。
可能是业绩提升?
您认为每个incomingconnection
都有一个flow
是正确的,但如果连接有多个请求来自它,则仍然可以获得性能增益。
过滤请求时会发生什么?
通常,流在源元素和汇元素之间没有1:1的映射。可以是1:0(如您的示例所示),也可以是1:MANY(如果单个请求以某种方式产生多个响应)。
队列化vs max-open-request?
val queue = ??? //as in the example in your question
queue.offer(httpRequest1)
queue.offer(httpRequest2)
queue.offer(httpRequest3)
val allRequests = Iterable(httpRequest1, httpRequest2, httpRequest3)
//no queue necessary
val allResponses : Future[Seq[HttpResponse]] =
Source(allRequests)
.via(poolClientFlow)
.to(Sink.seq[HttpResponse])
.run()
现在就不需要担心队列、最大队列大小等问题了,所有的东西都捆绑在一个很好的紧凑流中。
即使请求的源是动态的,您通常仍然可以使用一个源。假设我们正在从控制台stdin获取请求路径,这仍然可以是一个完整的流:
import scala.io.{Source => ioSource}
val consoleLines : () => Iterator[String] =
() => ioSource.stdin.getLines()
Source
.fromIterator(consoleLines)
.map(consoleLine => HttpRequest(GET, uri = Uri(consoleLine)))
.via(poolClientFlow)
.to(Sink.foreach[HttpResponse](println))
.run()
现在,即使每一行都以随机的时间间隔输入到控制台中,流仍然可以在没有队列的情况下进行反应。
误差 你好,Gnana
问题内容: 我想让RasPi充当由三个Linux客户端组成的小型家庭网络中的小型打印和文件服务器。不幸的是,Brother只为x86提供二进制驱动程序,所以我不能在RasPi上运行Brother打印机。但是,我发现了一个博客条目,提议在RasPi的CUPS安装上创建一个原始队列,并使用客户端上安装的二进制驱动程序从客户端访问此队列。这是博客条目:http : //chemdroid.net/en/
ngrok客户端公开了一个REST API,它授予对以下各项的编程访问权限: 收集状态和指标信息 收集和重放捕获的请求 动态启动和停止隧道 基本URL和身份验证 Base URL http://127.0.0.1:4040/api Authentication None ngrok客户端API作为ngrok的本地Web检查接口的一部分公开。因为它在本地接口上提供,所以API没有身份验证。如果您覆盖
http://redis.cn/clients.html
Docusaurus 提供了一组客户端 API,对于构建网站很有帮助。 组件 <Head/> 这是一个可重用 React 组件,用于管理对 HTML 文档标头(即 <head> 中的标签)的修改。此组件接收纯 HTML 标签并输出纯 HTML 标签,对初学者很友好。此组件始对 React Helmet 的二次包装。 用法示例: import React from 'react'; import
问题内容: 是否可以在Node.js中获取 主机名 ? 这是我获取客户IP的方式: 那么,如何获得客户的主机名? 谢谢您的回复! 问题答案: 我认为唯一的方法是这样的: 但我建议您实际上并不需要它,这并不是您可以对信息做任何有用的事情。如果您只想让字符串标识用户的计算机,则可以进行一些处理。 如果您真正想要的是FQDN,那么我建议它仍然对您没有太大帮助,但为此您需要反向DNS查找。如果您使用的是V