当前位置: 首页 > 知识库问答 >
问题:

将Akka流源拆分为两个

张丰
2023-03-14

我有一个Akka StreamsSource,我想根据谓词将其分成两个源。

例如。有一个源(类型被有意简化):

val source: Source[Either[Throwable, String], NotUsed] = ???

还有两种方法:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

我希望能够拆分source根据_. isright谓词,并将右侧部分传递给handle成功方法,左侧部分传递给handleFailure方法。

我尝试使用广播拆分器,但它需要接收器s结尾。

共有3个答案

汪甫
2023-03-14

为此,您可以使用广播,然后过滤并映射GraphDSL中的流:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))


val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

       import GraphDSL.Implicits._

       val broadcast = b.add(Broadcast[Either[Throwable,String]](2))


       s ~> broadcast.in
       broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in
       broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in


       ClosedShape
  })


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

我希望您能够在地图中运行您想要的功能。

丁曦哲
2023-03-14

编辑:在我看来,另一个带有diverto的答案比我的答案更好。我将把我的答案留给后代。

原始答案:

这在akka stream contrib中实现为PartitionWith。将此依赖项添加到SBT以将其拉入项目:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"```

`PartitionWith` is shaped like a `Broadcast(2)`, but with potentially different types for each of the two outlets. You provide it with a predicate to apply to each element, and depending on the outcome, they get routed to the applicable outlet. You can then attach a `Sink` or `Flow` to each of these outlets independently as appropriate. Building on [cessationoftime's example](https://stackoverflow.com/a/39744355/147806), with the `Broadcast` replaced with a `PartitionWith`:

    val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
    val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
    val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

    val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
                                      ((_, _, _)) { implicit b => (s, l, r) =>

      import GraphDSL.Implicits._

      val pw = b.add(
        PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
      )

      eitherSource ~> pw.in
      pw.out0 ~> leftSink
      pw.out1 ~> rightSink

      ClosedShape
    })

    val r = flow.run()
    Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
公良泰宁
2023-03-14

虽然您可以选择要从的哪一侧检索项目,但不可能创建一个,从而产生两个输出,这似乎是您最终想要的。

给定下面的GraphStage,它基本上将左右值拆分为两个输出...

/**
  * Fans out left and right values of an either
  * @tparam L left value type
  * @tparam R right value type
  */
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  import akka.stream.{Attributes, Outlet}
  import akka.stream.stage.GraphStageLogic

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    var out0demand = false
    var out1demand = false

    setHandler(shape.in, new InHandler {
      override def onPush(): Unit = {

        if (out0demand && out1demand) {
          grab(shape.in) match {
            case Left(l) =>
              out0demand = false
              push(shape.out0, l)
            case Right(r) =>
              out1demand = false
              push(shape.out1, r)
          }
        }
      }
    })

    setHandler(shape.out0, new OutHandler {
      @scala.throws[Exception](classOf[Exception])
      override def onPull(): Unit = {
        if (!out0demand) {
          out0demand = true
        }

        if (out0demand && out1demand) {
          pull(shape.in)
        }
      }
    })

    setHandler(shape.out1, new OutHandler {
      @scala.throws[Exception](classOf[Exception])
      override def onPull(): Unit = {
        if (!out1demand) {
          out1demand = true
        }

        if (out0demand && out1demand) {
          pull(shape.in)
        }
      }
    })
  }
}

…您可以将其路由到仅接收一侧:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> Sink.ignore

  SourceShape(eitherFanOut.out1)
})

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)

... 或者可能更可取的方法是,将它们路由到两个独立的Sinks:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> l.in
  eitherFanOut.out1 ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

(导入和初始设置)

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()

val values: List[Either[Throwable, String]] = List(
  Right("B"),
  Left(new Throwable),
  Left(new RuntimeException),
  Right("B"),
  Right("C"),
  Right("G"),
  Right("I"),
  Right("F"),
  Right("T"),
  Right("A")
)

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)
 类似资料:
  • 我试图将一个流媒体源缓存到磁盘上,同时也将其作为发送出去,也就是说,我有一个,我想把它交给,但我还想将相同的数据运行到sink。 似乎我应该使用进行扇出,但从描述来看,它写入两个接收器,而是一个接收器,需要一个。 还有,它看起来像是从GraphStage创建源,例如阶段,但我不太清楚如何将接收器放在那里。

  • 我有以下简单的case类层次结构: 我有一个(来自一个基于Websocket的协议,已经有了编解码器)。 我想将此解复用为Foo和Baz类型的单独流,因为它们由完全不同的路径处理。 最简单的方法是什么?应该很明显,但我错过了一些东西。。。

  • 我尝试了一些代码片段将wordpress子菜单拆分为两个Colum,我获得了一些成功。但它们并没有完全分开。奇数菜单项下面有空白,偶数菜单项上面有空白。菜单链接 在上面的网页中,菜单-->纳维什塔黄金时段-->第二季 菜单项似乎不在一条线上,造成了白色的空隙。下面是我使用的CSS代码: 我在wordpress菜单设置中的菜单项“第2季”上应用了这个类“子菜单-列”。

  • 我正在使用来自我无法控制的java库的数据发布者。发布者库使用典型的回调设置;在库代码的某个地方(库是java,但我将在scala中描述简洁): 库的用户需要编写一个实现方法的类,并将其传递给,库代码如下所示: 有自己的内部线程我无法控制,以及随附的数据缓冲区,即每当有另一个对象要使用时调用。 所以,我的问题是:如何编写一个层,将原始库模式转换/转换为akka流源对象? 提前谢谢你。

  • 问题内容: 我有一个表字段,其中包含用户的姓氏和名字。是否有可能分裂成那些2场,? 所有记录的格式均为“名字的姓氏”(不带引号,中间还有空格)。 问题答案: 不幸的是,MySQL没有分割字符串功能。但是,您可以为此创建一个用户定义的函数,例如以下文章中描述的函数: Federico Cargnelutti撰写的MySQL Split String Function 使用该功能: 您将可以按照以下方

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。