当前位置: 首页 > 知识库问答 >
问题:

Akka流在通过连接池发出http请求时挂起

苏嘉志
2023-03-14

我正在使用Akka 2.4.4并尝试从Apache HttpAsyncClient转移(未成功)。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]) {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

还有代理支持是怎么回事?似乎对我也不起作用。

共有1个答案

薛滨海
2023-03-14

您需要完全使用响应的主体,以便连接可用于后续请求。如果您根本不关心响应实体,那么您可以将它排到sink.ignore,如下所示:

resp.entity.dataBytes.runWith(Sink.ignore)

根据默认配置,使用主机连接池时,最大连接数设置为4。每个池都有自己的队列,请求在其中等待,直到其中一个打开的连接变得可用。如果队列超过32(默认配置,可以更改,必须是2的幂),那么yo将开始看到失败。在您的情况下,您只执行了10个请求,所以您没有达到这个限制。但是如果不使用响应实体,就不会释放连接,其他的东西都排在后面,等待连接释放。

 类似资料:
  • 在客户机工厂内部,我定义了断路器策略,并尝试使用该策略执行上面的lambda,如下所示 作为一个整体,我对lambda是相当陌生的,将它作为函数传递会变得更加混乱。如何配置函数和第一行代码来执行客户机并返回HttpResponSemessage?我认为或不正确

  • 我想使用akka-http-client作为流来链式http请求。链中的每个http请求都依赖于前一个请求的成功/响应,并使用它来构造一个新的请求。如果一个请求不成功,流应该返回不成功请求的响应。 如何在AKKA-HTTP中构造这样的流?我应该使用哪一个akka-http客户端级别的API?

  • 第一种情况下的错误: java.net.ConnectException:连接超时:连接在java.net.DualStackPlainSockeTimpl.Connect0(本机方法)在java.net.DualStackPlainSockeTimpl.SocketConnect(未知源)在java.net.AbstractPlainSockeTimpl.SocketConnect(未知源)在j

  • 问题内容: 当我通过套接字发送正常的HTTP请求时,服务器不会以OK响应进行响应。我从Firefox复制了HTTP标头。这是代码: 但是,这是我收到的回复: 我知道我可以使用来做到这一点,但是当我手动发送HTTP请求时,服务器为什么不识别HTTP请求? 问题答案: 两件事情: 您应该使用而不是将条目打印到单独的行。 HTTP请求应以空白行(link)结尾。所以加

  • 我正在尝试让客户端/服务器程序通过ssl交换http消息。首先,我创建了使用DefaultHttpRequest成功交换http请求的客户端和服务器程序。发送请求的代码如下所示: 客户端管道工厂包含以下内容: 服务器管道工厂包含以下内容: .... 到目前为止一切顺利。客户端发送请求,服务器接收并解码请求。使用正确的数据调用我的处理程序上的messageReceived方法。 为了启用 SSL,我

  • 我正在尝试使用circe在Akka-Http应用程序中执行我的JSON(de)序列化,而不是Spray-JSON。因此,我想使用指令和来获取请求主体的字符串表示,然后执行自己的反序列化。但是似乎太聪明了,它拒绝任何内容类型不为的东西。 我现在明白了,我误解了,它只会缩小而不会扩大解封程序将接受的内容类型范围。这就解释了为什么我的解决方案不起作用。