当前位置: 首页 > 知识库问答 >
问题:

Akka流选项输出

端木宏才
2023-03-14

我创建了一个Akka流,它有一个简单的。有了这个,我可以很容易地通过它发送元素。现在我想更改这个流,以便返回一个选项。根据选项的结果我想更改流的输出

有可能创造出这样的建筑吗?

共有3个答案

宰父跃
2023-03-14

您可以将两个水槽的水流视为一个水槽。为了构造更复杂的图,我们可以使用GraphDSL中提供的函数。

考虑一下在一般情况下

def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
    val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
        (sink1, sink2) ⇒ {
            import GraphDSL.Implicits._

            //Here we broadcast the Some[T] values to 2 flows,
            // each filtering to the correct type for each sink
            val bcast = builder.add(Broadcast[Option[T]](2))
            bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
            bcast.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in

            //The flow that maps T => Some[T]
            val mapper = builder.add(Flow.fromFunction(f))
            mapper.out ~> bcast.in

            //The whole thing is a Sink[T]
            SinkShape(mapper.in)
        }
    }
    Sink.fromGraph(graph)
}

这将返回一个Sink[T,Mat],该函数使用提供的函数将传入的T元素映射到一个选项[T],然后将其定向到提供的一个Sink。

用法示例:

val sink = splittingSink(
    (s: String) ⇒ if (s.length % 2 == 0) Some(s) else None,
    Sink.foreach[String](s),
    Sink.foreach[None.type](_ ⇒ println("None")),
    (f1: Future[_], f2: Future[_]) ⇒ Future.sequence(Seq(f1, f2)).map(_ ⇒ Done)
)

Source(List("One", "Two", "Three", "Four", "Five", "Six"))
        .runWith(sink)
        .onComplete(_ ⇒ println("----\nDone"))

输出:

None
None
None
Four
Five
None
----
Done

关于流图的文档部分将进一步讨论GraphDSL的使用。

祁刚毅
2023-03-14

假设你有这样的东西

val source = Source(1 to 100)
val flow = Flow[Int].map {
  case x if x % 2 == 0 ⇒ Some(x.toString)
  case _ ⇒ None
}
val sink1 = Sink.foreach[String](println)
val sink2 = Sink.foreach[None.type](x ⇒ println("dropped element"))

您可以制作具有所需结构的可运行图,如下所示:

val runnable = source
  .via(flow)
  .alsoTo(Flow[Option[String]].collect { case None ⇒ None }.to(sink2))
  .to(Flow[Option[String]].collect { case Some(x) ⇒ x }.to(sink1))
上官自明
2023-03-14

此时给出的两个答案都涉及广播。请注意,它可能在这个特定的示例中有效,但在更复杂的图中,广播可能不是一个明智的选择。原因是,如果至少有一个下游背压,则广播总是背压。最好的背压感知解决方案是分区,它能够有选择地从分区器函数选择的分支传播背压。

下面的例子(详细说明T-Foll的一个答案)

  def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
    val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
      (sink1, sink2) ⇒ {
        import GraphDSL.Implicits._

        def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
        val partition = builder.add(Partition[Option[T]](2, partitioner))
        partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
        partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in

        val mapper = builder.add(Flow.fromFunction(f))
        mapper.out ~> partition.in

        SinkShape(mapper.in)
      }
    }
    Sink.fromGraph(graph)
  }
 类似资料:
  • 学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。 实例 如果传入流是 我想把它转换成 到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。 更新:我发现了,但它看起来更关心背压,而不仅仅是一直批处理。

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 我不知道为什么,但是在scrren尺寸1024 x 768上,我从select中选择的选项太大了。我怎样才能让这个选项像select一样宽?我想只在bootstrap 4上这样做,只使用bootstrap中的类,但我不知道或者这是可能的,我不知道为什么这个选择会这样。在更大的屏幕尺寸上,一切都可以。如何修复它?这是我的代码:

  • 根据文档[1],我一直试图在Akka stream中并行化一个流,但由于某些原因,我没有得到预期的结果。 我遵循了留档中列出的步骤,我不认为我错过了什么。然而,我的流的计算都是按顺序一个接一个地发生的。 我错过了什么? [1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html 示例输出 我希望看到两个计算同时进