当前位置: 首页 > 知识库问答 >
问题:

Akka Streams按类型拆分流

景正文
2023-03-14

我有以下简单的case类层次结构:

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

我有一个流[Message,Message,NotUsed](来自一个基于Websocket的协议,已经有了编解码器)。

我想将此Flow[Message]解复用为Foo和Baz类型的单独流,因为它们由完全不同的路径处理。

最简单的方法是什么?应该很明显,但我错过了一些东西。。。

共有1个答案

章承
2023-03-14

一种方法是使用创建一个RunnableGraph,其中包含每种消息类型的流。

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

  val in = Source(...)  // Some message source
  val out = Sink.ignore

  val foo = builder.add(Flow[Message].map (x => x match { case f@Foo(_) => f }))
  val baz = builder.add(Flow[Message].map (x => x match { case b@Baz(_) => b }))
  val partition = builder.add(Partition[Message](2, {
    case Foo(_) => 0
    case Baz(_) => 1
  }))

  partition ~> foo ~> // other Flow[Foo] here ~> out
  partition ~> baz ~> // other Flow[Baz] here ~> out

  ClosedShape
}

g.run()
 类似资料:
  • 问题内容: 我目前正在使用MYSQL中的函数,我有另一个表中的逗号分隔字符串(1,22,344,55),如何在MYSQL中将其拆分为数组(不是temp_table)。另外,MYSQL中有类似的函数可以执行foreach()吗? 问题答案: MySQL不包含拆分定界字符串的函数。但是,创建自己的函数非常容易。 用法 从这里:http : //blog.fedecarg.com/2009/02/22/

  • 问题内容: 我有以下 我想将其拆分,以便我有一个字符串数组,例如 以便对象是数组的元素。重要的是包含封闭的[和]。我到目前为止: 但这给了我: 并不是我真正想要的。 问题答案: 我更喜欢使用并指定我 想要的内容, 而不是尝试描述以下内容的分隔符 火柴 [ 匹配任何东西,但] 火柴 ]

  • 我有列。 如何根据值将其拆分为2? 第一个将包含

  • 拆分成微服务疑问,按 controller 还是按照 project 拆? 先说个人不懂微服务,也没搞懂过 按照我的经验,通常就是拆分 controller 跟 service 由不同同事负责 不会刻意拆分不同 project,除非像是统一账号验证才会额外拆 但现在遇到一个顾问说,微服务要尽量拆分到不同 project 维护,各自有自己的 docker 这样才不会有严重依赖耦合 我不太能理解这样概

  • 我想用句子截断文本。 示例文本:'Lorem ipsum dolor坐在amet,奉献adipiscing elit!UT车辆laoreet urna, commodo,在马萨诸塞州。赛德volutpat nunc简历urna拍卖,在tempus enim rhoncus。马蒂斯康莫多的莫尔比交流电击器?Morbi在ornare Arcu,sagittis scelerisque risus。Ae

  • 我在csv文件中有一个列,其中包含此格式的人员详细信息: 实际csv格式: 我想将它们拆分为一个新的csv文件,如下所示: 拆分详细信息: 拆分行分隔符: