当前位置: 首页 > 知识库问答 >
问题:

Akka溪流-可暂停图形舞台(Akka 2.5.7)

堵毅然
2023-03-14

我想写一个GraphStage,可以通过发送另一个演员的消息来暂停/取消暂停。

下面截取的代码显示了一个简单的GraphStage,它会生成随机数。当舞台具体化时,GraphStageLogic会向主管发送一条包含StageActor的消息(在preStart()中)。主管保留舞台的ActorRef,因此可用于控制舞台。

object RandomNumberSource {
  case object Pause
  case object UnPause
}

class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] {

  val out: Outlet[Int] = Outlet("rnd.out")

  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new RandomNumberSourceLogic(shape)
  }

  private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging {

    lazy val self: StageActor = getStageActor(onMessage)

    val numberGenerator: Random = Random
    var isPaused: Boolean = true

      override def preStart(): Unit = {
        supervisor ! AssignStageActor(self.ref)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (!isPaused) {
            push(out, numberGenerator.nextInt())
            Thread.sleep(1000)
          }
        }
      })

      private def onMessage(x: (ActorRef, Any)): Unit =
      {
        x._2 match {
          case Pause =>
            isPaused = true
            log.info("Stream paused")
          case UnPause =>
            isPaused = false
            getHandler(out).onPull()
            log.info("Stream unpaused!")
          case _ =>
        }
      }
    }
}

这是supervisor actor的一个非常简单的实现:

object Supervisor {
  case class AssignStageActor(ref: ActorRef)
}

class Supervisor extends Actor with ActorLogging {

  var stageActor: Option[ActorRef] = None

  override def receive: Receive = {

    case AssignStageActor(ref) =>
      log.info("Stage assigned!")
      stageActor = Some(ref)
      ref ! Done

    case Pause =>
      log.info("Pause stream!")
      stageActor match {
        case Some(ref) => ref ! Pause
        case _ =>
      }

    case UnPause =>
      log.info("UnPause stream!")
      stageActor match {
        case Some(ref) => ref ! UnPause
        case _ =>
      }
  }
}

我正在使用以下应用程序运行流:

object Application extends App {

  implicit val system = ActorSystem("my-actor-system")
  implicit val materializer = ActorMaterializer()

  val supervisor = system.actorOf(Props[Supervisor], "supervisor")

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor)
  val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  randomNumberSource.take(100).runForeach(println)

  println("Start stream by pressing any key")

  StdIn.readLine()

  supervisor ! UnPause

  StdIn.readLine()

  supervisor ! Pause

  StdIn.readLine()

  println("=== Terminating ===")
  system.terminate()
}

当应用程序以“暂停”状态启动阶段ia并且不产生任何数字时。当我按下键时,我的舞台开始发出数字。但我的问题是,在它启动后发送到舞台的所有消息都被忽略了。我无法暂停阶段。

我感兴趣的是根据从演员那里收到的消息来改变舞台的行为,但我发现的所有示例都将演员的消息传递到流中。

有人猜到我的代码为什么不起作用了吗?或者有人知道如何构建这样一个GraphStage

非常感谢你!

共有1个答案

祁承望
2023-03-14

Akka Stream Contrib项目有一个阶段,它具体化了一个可以暂停和恢复流量的值。从这个类的Scaladoc中:

具体化为未来的ValveSwitch,它提供了一个方法翻转,停止或重新启动通过舞台的元素流。只要阀门关闭,它就会产生背压。

例如:

val (switchFut, seqSink) = Source(1 to 10)
  .viaMat(new Valve(SwitchMode.Close))(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()

switchFut是一个未来的[Valves switch],由于开关最初是关闭的,因此阀门背压和任何东西都不会排放到下游。要打开阀门:

switchFut.onComplete {
  case Success(switch) =>  
    switch.flip(SwitchMode.Open) // Future[Boolean]
  case _ =>
    log.error("the valve failed")
}

更多示例见ValveSpec

 类似资料:
  • 我写了一个kafka流代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我的主题有50个分区 我的Kafka流代码有选择键()DSL操作,我有200万条记录使用相同的KEY。在流配置中,我已经完成了 因此,我能够使用完全相同的键使用不同的分区。如果我没有按预期使用轮循机制,我的所有消息都会转到同一分区。 直到现在一切都很好,但我意识到;当我使用RoundRobin分

  • 问题内容: 因此,我有一堂课,必须编写程序来制作Simon。我知道我的做法不一定是最好的方法,但是,他有一些晦涩的要求,所以这就是我这样做的原因。 我的程序即将完成,但是有一个主要问题。当我按下重设”按钮时,我调用了一种称为“重设”的方法,该方法又将计算机设置为 播放其第一步。 在此期间,将进行图形更新。 当我自己调用reset方法时,它可以按预期工作。当我按下reset按钮时,它会执行所有图形更

  • 我是Spring云流的新手。我使用的是我们团队成员之一写的活页夹。我使用执行器的/绑定endpoint暂停/恢复应用程序中的使用者。但我有个错误 问题1。我猜这是因为我使用的活页夹不支持暂停/恢复操作。有谁能给我举一些例子,在那里我可以找到如何将此功能添加到活页夹? Qn 2。我也使用执行器endpoint尝试启动/停止。停止工作正常,但在启动时,我得到了以下错误 是否有人可以提供一些关于此错误的

  • 问题内容: 我正在JavaFx中创建一个应用程序,如果要打开任何子阶段,则应在其中进行操作,然后应在父阶段的中心打开它。我正在尝试使用它来执行此操作,但是它将子级分配到屏幕的中心,而不是父级的中心。如何将子阶段分配给父阶段的中心? 问题答案: 您可以使用父级的X / Y / width / height属性来执行此操作。除了使用,您可以执行以下操作: