当前位置: 首页 > 知识库问答 >
问题:

akka http:从http路由将元素发送到akka接收器

柴衡
2023-03-14

如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应。

我想这需要一个支流/枢纽。正常的HTTP路由是来自HttpRequest的流-

下面是一个非常简单的单路由akka http应用程序。为了简单起见,我使用了一个简单的println水槽。我的生产用例显然将涉及一个不那么琐碎的水槽。

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val route: akka.http.scaladsl.server.Route =
    path("hello" / Segment) { p =>
      get {
        // I'd like to send a message to an Akka Sink as well as return an HTTP response.
        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }

  val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
  val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

编辑:或者在使用低级akka http API时,如何从特定的路由处理程序向接收器发送特定消息?

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<html><body>Hello world!</body></html>"))

    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "PONG!")

    case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
      sys.error("BOOM!")

    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

  val serverSource = Http().bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      println("Accepted new connection from " + connection.remoteAddress)

      connection handleWithSyncHandler requestHandler
      // this is equivalent to
      // connection handleWith { Flow[HttpRequest] map requestHandler }
    }).run()

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

共有2个答案

仰英发
2023-03-14

这是我使用的似乎很理想的解决方案。Akka Http似乎是为了让您的路由简单而设计的HttpRequest-

我没有将所有内容都构建到一个Akka流图中,而是有一个单独的队列源-

object HttpWithSinkTest {
  def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = {
    val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s")

    val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew)
    val sink: Sink[String, Future[Done]] = Sink.foreach(println)
    val annotatedSink = annotateMessage.toMat(sink)(Keep.right)
    val queueGraph = sourceQueue.toMat(annotatedSink)(Keep.both)

    queueGraph
  }

  def buildHttpFlow(queue: SourceQueueWithComplete[String],
                    actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = {
    implicit val actorSystemI = actorSystem
    implicit val materializerI = materializer

    val route: akka.http.scaladsl.server.Route =
      path("hello" / Segment) { p =>
        get {
          complete {
            queue.offer(s"got http event p=$p")

            s"<h1>Say hello to akka-http. p=$p</h1>"
          }
        }
      }

    val routeFlow = RouteResult.route2HandlerFlow(route)

    routeFlow
  }

  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("my-akka-http-test")
    val executor = actorSystem.dispatcher
    implicit val materializer = ActorMaterializer()(actorSystem)

    val (queue, _) = buildQueueSourceGraph().run()(materializer)

    val httpFlow = buildHttpFlow(queue, actorSystem, materializer)
    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
    val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080)

    println("Server online at http://localhost:8080/")
    println("Press RETURN to stop...")
    scala.io.StdIn.readLine()

    println("Shutting down...")

    val serverBinding = Await.result(bindingFuture, Duration.Inf)
    Await.result(serverBinding.unbind(), Duration.Inf)
    Await.result(actorSystem.terminate(), Duration.Inf)

    println("Done. Exiting")
  }
}
司徒博容
2023-03-14

如果您想将整个HttpRequest发送到您的接收器,我会说最简单的方法是使用alsoTo组合器。结果将大致如下

val mySink: Sink[HttpRequest, NotUsed] = ???

val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route))

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)

仅供参考:alsoTo实际上隐藏了一个广播舞台。

如果您需要有选择地从特定的子例程向接收器发送消息,那么除了为每个传入的请求具体化一个新的流之外,您别无选择。见下面的例子

val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val route: akka.http.scaladsl.server.Route =
  path("hello" / Segment) { p =>
    get {

      (extract(_.request) & extractMaterializer) { (req, mat) ⇒
        Source.single(req).runWith(sink)(mat)

        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }
  }

此外,请记住,您始终可以完全抛弃高级DSL,并使用低级DSL对整个路线进行建模。这将导致更详细的代码,但会让您完全控制流的具体化。

编辑:下面的例子

val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val handlerFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val partition = b.add(Partition[HttpRequest](2, {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0
      case _                                        ⇒ 1
    }))
    val merge = b.add(Merge[HttpResponse](2))

    val happyPath = Flow[HttpRequest].map{ req ⇒
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<html><body>Hello world!</body></html>"))
    }        

    val unhappyPath = Flow[HttpRequest].map{
      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "PONG!")

      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
      sys.error("BOOM!")

      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

    partition.out(0).alsoTo(sink) ~> happyPath   ~> merge
    partition.out(1)              ~> unhappyPath ~> merge

    FlowShape(partition.in, merge.out)
  })

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)
 类似资料:
  • 我正在开发一个使用Akka Http和Akka流的客户机-服务器应用程序。主要思想是服务器必须使用来自Akka Streams的源来提供http响应。 问题是服务器在向客户机发送第一条消息之前积累了一些元素。但是,我需要服务器在源生成新元素时立即发送元素到元素。 我同时获得所有元素,而不是每2秒接收一个元素。 有什么想法可以“强制”服务器发送每个元素,因为它是从源头出来的吗?

  • 主要的类别是: 路由的执行元类为:

  • 在以下React应用程序中,有两个路由URLhttp://myapp 正确布线到布局构件。但是,URLhttp://myapp/login 也路由到布局组件,而不是登录。如果我将path=“/login”更改为“/sign”,它将正确路由到登录组件。 React路由器中的“/login”路径将其路由到路由是否有特殊之处?或者我设置这个路由的方式有错误吗?

  • 我正在使用最新版本的angular 9和最新版本的nodejs。下面是示例代码 来自app.component From app.component.ts 来自 app-routing.module 来自app.module 类属性 状态-component.ts 这就是问题所在。 如何使用路由器将整个对象传递给组件?(注意对象在应用程序组件中初始化) 如何在状态组件的“ngOnInit”方法中接

  • 我正在通过一些示例学习Akka HTTP堆栈来创建一个新的REST项目(完全非UI)。我一直在使用和扩展Akka HTTP微服务示例,以通过一系列用例和配置来工作,并对Scala和Akka HTTP的良好工作感到惊喜。 目前我有一个这样的配置: 参数只是一个简单的值,其中包含使用、等的典型数据。 有没有什么方法可以在多个Scala文件或某个示例中设置路由? 这可能是我想得太多了,因为我是如何在Ja