当前位置: 首页 > 知识库问答 >
问题:

将Akka-http-client请求链接到流中

鲁明知
2023-03-14

我想使用akka-http-client作为流来链式http请求。链中的每个http请求都依赖于前一个请求的成功/响应,并使用它来构造一个新的请求。如果一个请求不成功,流应该返回不成功请求的响应。

如何在AKKA-HTTP中构造这样的流?我应该使用哪一个akka-http客户端级别的API?

共有1个答案

廖鸿达
2023-03-14

如果你正在做一个网络爬虫,看看这个帖子。这个答案处理一个更简单的情况,例如下载分页资源,其中到下一页的链接在当前页响应的一个标题中。

您可以使用source.UnfoldAsync方法创建一个链接源-其中一个项指向下一个项。这将接受一个函数,该函数接受元素S并返回future[option[(S,E)]]以确定流是否应继续发射E类型的元素,并将状态传递给下一次调用。

在你的例子中,这有点像:

  1. 获取初始httpRequest
  2. 生成将来[HttpResponse]
  3. 如果响应指向另一个URL,则返回some(请求->响应),否则返回none

但是,有一个问题,即如果流不包含指向下一个请求的指针,则不会从流中发出响应。

    null
import scala.concurrent.duration._

def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))

接下来,使用几个函数从httpResponse创建选项[HttpRequest]。此示例使用类似GitHub的分页链接的方案,其中links头包含,例如: rel=“next” :

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))

接下来,我们将传递给UnfoldAsync的真实函数。它使用Akka HTTPHTTP().SingleRequest()API获取HttpRequest并生成Future[HttpResponse]:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // handle the error case. Here we just return the errored response
      // with no next item.
      if (response.status.isFailure()) Future.successful(Some(None -> response))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }

注意,如果我们得到错误的响应,流将不会继续,因为我们在下一个状态中返回none

您可以调用此命令来获取httpresponse对象流,如下所示:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)

至于返回最后一个(或错误的)响应的值,您只需使用sink.last,因为流将在成功完成或在第一个错误的响应时结束。例如:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)
 类似资料:
  • 我正在尝试使用circe在Akka-Http应用程序中执行我的JSON(de)序列化,而不是Spray-JSON。因此,我想使用指令和来获取请求主体的字符串表示,然后执行自己的反序列化。但是似乎太聪明了,它拒绝任何内容类型不为的东西。 我现在明白了,我误解了,它只会缩小而不会扩大解封程序将接受的内容类型范围。这就解释了为什么我的解决方案不起作用。

  • 我正在使用Akka 2.4.4并尝试从Apache HttpAsyncClient转移(未成功)。 还有代理支持是怎么回事?似乎对我也不起作用。

  • 我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?

  • 如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应。 我想这需要一个支流/枢纽。正常的HTTP路由是来自HttpRequest的流- 下面是一个非常简单的单路由akka http应用程序。为了简单起见,我使用了一个简单的println水槽。我的生产用例显然将涉及一个不那么琐碎的水槽。 编辑:或者在使用低级akka http API时,如何

  • 先决条件 Apache Tomcat 7 Spring3.2.11。释放 Apache Camel 2.14.1 Camel HTTP4endpoint(Camel-HTTP4) 问题 我尝试通过http4组件调用https加密站点。位于我的服务器和Internet(目标服务器)之间的代理检查标头“User-Agent”并在请求为空时拒绝请求。 连接请求不包含http标头“User Agent”。