当前位置: 首页 > 知识库问答 >
问题:

akka流的子类型分区

谷善
2023-03-14

我有一个akka流,我有一个ADT的形式。

sealed trait Message
sealed trait ThisMessage extends Message
sealed trait ThatMessage extends Message

现在我有一个这个消息处理程序流和一个那个消息处理程序流。我有一个接受消息类型的入口流。

为了创建拆分,我有以下分区器。我有以下分区器函数的定义。

 /**
  * Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
  *
  * @tparam A type of input
  * @tparam B type of output on the first outlet.
  * @tparam C type of output on the second outlet.
  *
  * @return A partition stage
  */
  def binaryPartitionByType[A, B <: A, C <: A](): Graph[FanOutShape2[A, B, C], NotUsed] =
GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
  import GraphDSL.Implicits._

  // This is wrong, but I have no idea how to write this.
  val partitioner: UniformFanOutShape[A, A] = builder.add(Partition[A](2, {
    case _: B => 0
    case _: C => 1
  }))

  new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
}

我希望使用上述方法,并使用类型params中的ADT来初始化分区程序。

编译器抛出这个错误。

Error:(63, 7) type mismatch;
 found   : akka.stream.FanOutShape2[A,A,A]
 required: akka.stream.FanOutShape2[A,B,C]
      new FanOutShape2(partitioner.in, partitioner.out(0).outlet, 
partitioner.out(1).outlet)

据我所知,分区对象只有Inlet(在本例中为A,参数化类型。

有人知道我该怎么解决这个问题吗?

共有2个答案

危飞跃
2023-03-14

问题是你试图颠覆类型系统UniformFanOutShape被命名为“uniform”,因为它的所有输出都是同一类型的。如果不是这样,你就不需要再创建一个额外的FanOutShape2。如果你要插入类型系统,你应该始终如一地这样做,因此你应该更改出口的类型

new FanOutShape2(partitioner.in, partitioner.out(0).outlet.as[B], partitioner.out(1).outlet.as[C])

仰经武
2023-03-14

下面是一种实例化aFanOutShape2[a,B]的方法

import akka.stream.scaladsl._
import akka.stream.{Graph, FanOutShape2}
import akka.NotUsed
import scala.reflect.ClassTag

def binaryPartitionByType[A, B <: A : ClassTag, C <: A : ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
  GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
    import GraphDSL.Implicits._

    val partitioner = builder.add(Partition[A](2, {
      case _: B => 0
      case _: C => 1
    }))

    val partitionB = builder.add(Flow[A].collect{ case b: B => b })
    val partitionC = builder.add(Flow[A].collect{ case c: C => c })

    partitioner.out(0) ~> partitionB
    partitioner.out(1) ~> partitionC

    new FanOutShape2(partitioner.in, partitionB.out, partitionC.out)
}

// binaryPartitionByType: [A, B <: A, C <: A]()(
//   implicit evidence$1: scala.reflect.ClassTag[B], implicit evidence$2: scala.reflect.ClassTag[C]
// ) akka.stream.Graph[akka.stream.FanOutShape2[A,B,C],akka.NotUsed]

请注意,需要ClassTag来避免类型擦除。

 类似资料:
  • 我有以下简单的case类层次结构: 我有一个(来自一个基于Websocket的协议,已经有了编解码器)。 我想将此解复用为Foo和Baz类型的单独流,因为它们由完全不同的路径处理。 最简单的方法是什么?应该很明显,但我错过了一些东西。。。

  • 我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt

  • 我试图用akka-http测试一个TypedActor,但在尝试创建测试用例时遇到了一些问题。为了测试TypedActor,我将编写以下规范... 但是,当我必须编写一个与HTTP/+WS路由一起使用的TypedActor时,我无法编写... 我如何才能编写一个同时使用这两种测试呢? 请指教。

  • 我如何测试给定的行为是否发送了我期望的消息? 比如说,三条某种类型的信息,一条接一条... 对于常规参与者(非类型化),有来自常规Akka的TestProbe,方法如: http://doc.akka.io/api/akka/current/index.html#akka.testkit.testprobe 有了Akka型的我还在挠头。有一个叫做EffectfulActorContext的东西,但

  • 我有一个Akka Streams,我想根据谓词将其分成两个源。 例如。有一个源(类型被有意简化): 还有两种方法: 我希望能够拆分根据谓词,并将右侧部分传递给方法,左侧部分传递给方法。 我尝试使用拆分器,但它需要s结尾。