当前位置: 首页 > 知识库问答 >
问题:

http=>akka流=>http

王骏
2023-03-14
override def receive: Receive = {
   case GetTestData(p, id) =>
     // Get the data and pipes it to itself through a message as recommended
     // https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
     http.singleRequest(HttpRequest(uri = uri.format(p, id)))
       .pipeTo(self)

   case HttpResponse(StatusCodes.OK, _, entity, _) =>
     val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))

     // Forward the response to next job and pipes the request response to dedicated actor
     http.singleRequest(HttpRequest(
       method = HttpMethods.POST,
       uri = "googl.cm/flow",
       entity = HttpEntity.Chunked(ContentTypes.`application/json`, 
       initialRes)
     ))


   case resp @ HttpResponse(code, _, _, _) =>
     log.error("Request to test job failed, response code: " + code)
     // Discard the flow to avoid backpressure
     resp.discardEntityBytes()

   case _ => log.warning("Unexpected message in TestJobActor")
 }

共有1个答案

庄子平
2023-03-14

这应该是一个与receive等效的图形:

Http()
.cachedHostConnectionPool[Unit](uri.format(p, id))
.collect {
  case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
    val initialRes = entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(bStr => ChunkStreamPart(bStr.utf8String))
    Some(initialRes)

  case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
    log.error("Request to test job failed, response code: " + code)
    // Discard the flow to avoid backpressure
    resp.discardEntityBytes()
    None
}
.collect {
  case Some(initialRes) => initialRes
}
.map { initialRes =>
  (HttpRequest(
     method = HttpMethods.POST,
     uri = "googl.cm/flow",
     entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
   ),
   ())
}
.via(Http().superPool[Unit]())

它的类型是flow[(HttpRequest,Unit),(try[HttpReponse],Unit),HostConnectionPool],其中Unit是一个相关ID,如果您想知道哪个请求对应于到达的响应,则可以使用它,并且HostConnectionPool物化值可以用来关闭到主机的连接。只有cachedhostconnectionpool返回这个物化值,superpool可能会自己处理这个值(尽管我没有检查)。无论如何,我建议您在关闭应用程序时只使用http().ShutdownAllConnectionPools(),除非出于某种原因需要这样做。根据我的经验,它更不容易出错(例如忘记关机)。

您还可以使用图形DSL来表示相同的图形:

val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

    val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
    val host2Flow = b.add(Http().superPool[Unit]())

    val toInitialRes = b.add(
      Flow[(Try[HttpResponse], Unit)]
        .collect {
          case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
            val initialRes = entity.dataBytes
              .via(JsonFraming.objectScanner(Int.MaxValue))
              .map(bStr => ChunkStreamPart(bStr.utf8String))
            Some(initialRes)

          case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
            log.error("Request to test job failed, response code: " + code)
            // Discard the flow to avoid backpressure
            resp.discardEntityBytes()
            None
        }
    )

    val keepOkStatus = b.add(
      Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
        .collect {
          case Some(initialRes) => initialRes
        }
    )

    val toOtherHost = b.add(
      Flow[Source[HttpEntity.ChunkStreamPart, Any]]
        .map { initialRes =>
          (HttpRequest(
             method = HttpMethods.POST,
             uri = "googl.cm/flow",
             entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
           ),
           ())
        }
    )

    host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow

    FlowShape(host1Flow.in, host2Flow.out)
})
 类似资料:
  • 我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?

  • 我的计算图的一个阶段是类型的流。显然,这个阶段应该为每个请求分配一个响应,并在所有请求都被解决后发出seq。 现在,底层API有一个苛刻的速率限制策略,所以我每秒只能激发一个请求。如果我有一个的单个,我可以使用每秒发出单个元素的来这个流(如何限制Akka流每秒只执行和发送一个消息一次?),但在这种情况下我没有看到类似的解决方案。 有什么好的表达方式吗?我想到的想法是使用低层图DSL并在那里使用一秒

  • 我的服务代码如下所示, 在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示, 我不确定如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。 然而,我只得到最后一个未来的结果如下, 亲切地问候Meeraj

  • 这是我认为我应该用于这种方法的布局:并且为了适应404路由,还可以使用。现在,如果我的Akka流知识对我有用的话,我需要使用来处理这样的事情,然而,这就是我被困住的地方。 在中,我可以为不同的endpoint进行简单的映射和flatMap,但在流中,这意味着将流划分为多个流,我不太确定该如何进行。我想过使用UnzipWith和Options或通用广播。 如能在这方面提供任何帮助,将不胜感激。 如果

  • 我过去成功地使用过Akka Streams,但是,我现在很难理解为什么Akka HTTP中定义了客户端Websocket流,并按照文档中显示的方式工作。 由于WebSocket连接允许全双工通信,我希望这样的连接在Akka HTTP中由两个独立的流表示,一个用于传入流量,一个用于传出流量。事实上,文件说明了以下内容: WebSocket由两个消息流组成[…] 它还指出,传入消息由表示,传出消息由表

  • 我想使用akka-http-client作为流来链式http请求。链中的每个http请求都依赖于前一个请求的成功/响应,并使用它来构造一个新的请求。如果一个请求不成功,流应该返回不成功请求的响应。 如何在AKKA-HTTP中构造这样的流?我应该使用哪一个akka-http客户端级别的API?