如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应。
我想这需要一个支流/枢纽。正常的HTTP路由是来自HttpRequest的流-
下面是一个非常简单的单路由akka http应用程序。为了简单起见,我使用了一个简单的println水槽。我的生产用例显然将涉及一个不那么琐碎的水槽。
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val route: akka.http.scaladsl.server.Route =
path("hello" / Segment) { p =>
get {
// I'd like to send a message to an Akka Sink as well as return an HTTP response.
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
编辑:或者在使用低级akka http API时,如何从特定的路由处理程序向接收器发送特定消息?
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
val serverSource = Http().bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
// this is equivalent to
// connection handleWith { Flow[HttpRequest] map requestHandler }
}).run()
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
这是我使用的似乎很理想的解决方案。Akka Http似乎是为了让您的路由简单而设计的HttpRequest-
我没有将所有内容都构建到一个Akka流图中,而是有一个单独的队列源-
object HttpWithSinkTest {
def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = {
val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s")
val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew)
val sink: Sink[String, Future[Done]] = Sink.foreach(println)
val annotatedSink = annotateMessage.toMat(sink)(Keep.right)
val queueGraph = sourceQueue.toMat(annotatedSink)(Keep.both)
queueGraph
}
def buildHttpFlow(queue: SourceQueueWithComplete[String],
actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = {
implicit val actorSystemI = actorSystem
implicit val materializerI = materializer
val route: akka.http.scaladsl.server.Route =
path("hello" / Segment) { p =>
get {
complete {
queue.offer(s"got http event p=$p")
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
val routeFlow = RouteResult.route2HandlerFlow(route)
routeFlow
}
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
val (queue, _) = buildQueueSourceGraph().run()(materializer)
val httpFlow = buildHttpFlow(queue, actorSystem, materializer)
val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
println("Shutting down...")
val serverBinding = Await.result(bindingFuture, Duration.Inf)
Await.result(serverBinding.unbind(), Duration.Inf)
Await.result(actorSystem.terminate(), Duration.Inf)
println("Done. Exiting")
}
}
如果您想将整个HttpRequest
发送到您的接收器,我会说最简单的方法是使用alsoTo
组合器。结果将大致如下
val mySink: Sink[HttpRequest, NotUsed] = ???
val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route))
val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)
仅供参考:alsoTo
实际上隐藏了一个广播
舞台。
如果您需要有选择地从特定的子例程向接收器发送消息,那么除了为每个传入的请求具体化一个新的流之外,您别无选择。见下面的例子
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val route: akka.http.scaladsl.server.Route =
path("hello" / Segment) { p =>
get {
(extract(_.request) & extractMaterializer) { (req, mat) ⇒
Source.single(req).runWith(sink)(mat)
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
}
此外,请记住,您始终可以完全抛弃高级DSL,并使用低级DSL对整个路线进行建模。这将导致更详细的代码,但会让您完全控制流的具体化。
编辑:下面的例子
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val handlerFlow =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val partition = b.add(Partition[HttpRequest](2, {
case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0
case _ ⇒ 1
}))
val merge = b.add(Merge[HttpResponse](2))
val happyPath = Flow[HttpRequest].map{ req ⇒
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
}
val unhappyPath = Flow[HttpRequest].map{
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
partition.out(0).alsoTo(sink) ~> happyPath ~> merge
partition.out(1) ~> unhappyPath ~> merge
FlowShape(partition.in, merge.out)
})
val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)
我正在开发一个使用Akka Http和Akka流的客户机-服务器应用程序。主要思想是服务器必须使用来自Akka Streams的源来提供http响应。 问题是服务器在向客户机发送第一条消息之前积累了一些元素。但是,我需要服务器在源生成新元素时立即发送元素到元素。 我同时获得所有元素,而不是每2秒接收一个元素。 有什么想法可以“强制”服务器发送每个元素,因为它是从源头出来的吗?
主要的类别是: 路由的执行元类为:
在以下React应用程序中,有两个路由URLhttp://myapp 正确布线到布局构件。但是,URLhttp://myapp/login 也路由到布局组件,而不是登录。如果我将path=“/login”更改为“/sign”,它将正确路由到登录组件。 React路由器中的“/login”路径将其路由到路由是否有特殊之处?或者我设置这个路由的方式有错误吗?
我正在使用最新版本的angular 9和最新版本的nodejs。下面是示例代码 来自app.component From app.component.ts 来自 app-routing.module 来自app.module 类属性 状态-component.ts 这就是问题所在。 如何使用路由器将整个对象传递给组件?(注意对象在应用程序组件中初始化) 如何在状态组件的“ngOnInit”方法中接
我正在通过一些示例学习Akka HTTP堆栈来创建一个新的REST项目(完全非UI)。我一直在使用和扩展Akka HTTP微服务示例,以通过一系列用例和配置来工作,并对Scala和Akka HTTP的良好工作感到惊喜。 目前我有一个这样的配置: 参数只是一个简单的值,其中包含使用、等的典型数据。 有没有什么方法可以在多个Scala文件或某个示例中设置路由? 这可能是我想得太多了,因为我是如何在Ja