override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
// https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))
// Forward the response to next job and pipes the request response to dedicated actor
http.singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`,
initialRes)
))
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
这应该是一个与receive
等效的图形:
Http()
.cachedHostConnectionPool[Unit](uri.format(p, id))
.collect {
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
val initialRes = entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(bStr => ChunkStreamPart(bStr.utf8String))
Some(initialRes)
case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
None
}
.collect {
case Some(initialRes) => initialRes
}
.map { initialRes =>
(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
),
())
}
.via(Http().superPool[Unit]())
它的类型是flow[(HttpRequest,Unit),(try[HttpReponse],Unit),HostConnectionPool]
,其中Unit
是一个相关ID,如果您想知道哪个请求对应于到达的响应,则可以使用它,并且HostConnectionPool
物化值可以用来关闭到主机的连接。只有cachedhostconnectionpool
返回这个物化值,superpool
可能会自己处理这个值(尽管我没有检查)。无论如何,我建议您在关闭应用程序时只使用http().ShutdownAllConnectionPools()
,除非出于某种原因需要这样做。根据我的经验,它更不容易出错(例如忘记关机)。
您还可以使用图形DSL来表示相同的图形:
val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
val host2Flow = b.add(Http().superPool[Unit]())
val toInitialRes = b.add(
Flow[(Try[HttpResponse], Unit)]
.collect {
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
val initialRes = entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(bStr => ChunkStreamPart(bStr.utf8String))
Some(initialRes)
case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
None
}
)
val keepOkStatus = b.add(
Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
.collect {
case Some(initialRes) => initialRes
}
)
val toOtherHost = b.add(
Flow[Source[HttpEntity.ChunkStreamPart, Any]]
.map { initialRes =>
(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
),
())
}
)
host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow
FlowShape(host1Flow.in, host2Flow.out)
})
我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?
我的计算图的一个阶段是类型的流。显然,这个阶段应该为每个请求分配一个响应,并在所有请求都被解决后发出seq。 现在,底层API有一个苛刻的速率限制策略,所以我每秒只能激发一个请求。如果我有一个的单个,我可以使用每秒发出单个元素的来这个流(如何限制Akka流每秒只执行和发送一个消息一次?),但在这种情况下我没有看到类似的解决方案。 有什么好的表达方式吗?我想到的想法是使用低层图DSL并在那里使用一秒
我的服务代码如下所示, 在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示, 我不确定如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。 然而,我只得到最后一个未来的结果如下, 亲切地问候Meeraj
这是我认为我应该用于这种方法的布局:并且为了适应404路由,还可以使用。现在,如果我的Akka流知识对我有用的话,我需要使用来处理这样的事情,然而,这就是我被困住的地方。 在中,我可以为不同的endpoint进行简单的映射和flatMap,但在流中,这意味着将流划分为多个流,我不太确定该如何进行。我想过使用UnzipWith和Options或通用广播。 如能在这方面提供任何帮助,将不胜感激。 如果
我过去成功地使用过Akka Streams,但是,我现在很难理解为什么Akka HTTP中定义了客户端Websocket流,并按照文档中显示的方式工作。 由于WebSocket连接允许全双工通信,我希望这样的连接在Akka HTTP中由两个独立的流表示,一个用于传入流量,一个用于传出流量。事实上,文件说明了以下内容: WebSocket由两个消息流组成[…] 它还指出,传入消息由表示,传出消息由表
我想使用akka-http-client作为流来链式http请求。链中的每个http请求都依赖于前一个请求的成功/响应,并使用它来构造一个新的请求。如果一个请求不成功,流应该返回不成功请求的响应。 如何在AKKA-HTTP中构造这样的流?我应该使用哪一个akka-http客户端级别的API?