我想写一个GraphStage,可以通过发送另一个演员的消息来暂停/取消暂停。
下面截取的代码显示了一个简单的GraphStage
,它会生成随机数。当舞台具体化时,GraphStageLogic
会向主管发送一条包含StageActor
的消息(在preStart()
中)。主管保留舞台的ActorRef
,因此可用于控制舞台。
object RandomNumberSource {
case object Pause
case object UnPause
}
class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("rnd.out")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new RandomNumberSourceLogic(shape)
}
private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging {
lazy val self: StageActor = getStageActor(onMessage)
val numberGenerator: Random = Random
var isPaused: Boolean = true
override def preStart(): Unit = {
supervisor ! AssignStageActor(self.ref)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!isPaused) {
push(out, numberGenerator.nextInt())
Thread.sleep(1000)
}
}
})
private def onMessage(x: (ActorRef, Any)): Unit =
{
x._2 match {
case Pause =>
isPaused = true
log.info("Stream paused")
case UnPause =>
isPaused = false
getHandler(out).onPull()
log.info("Stream unpaused!")
case _ =>
}
}
}
}
这是supervisor actor的一个非常简单的实现:
object Supervisor {
case class AssignStageActor(ref: ActorRef)
}
class Supervisor extends Actor with ActorLogging {
var stageActor: Option[ActorRef] = None
override def receive: Receive = {
case AssignStageActor(ref) =>
log.info("Stage assigned!")
stageActor = Some(ref)
ref ! Done
case Pause =>
log.info("Pause stream!")
stageActor match {
case Some(ref) => ref ! Pause
case _ =>
}
case UnPause =>
log.info("UnPause stream!")
stageActor match {
case Some(ref) => ref ! UnPause
case _ =>
}
}
}
我正在使用以下应用程序运行流:
object Application extends App {
implicit val system = ActorSystem("my-actor-system")
implicit val materializer = ActorMaterializer()
val supervisor = system.actorOf(Props[Supervisor], "supervisor")
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor)
val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
randomNumberSource.take(100).runForeach(println)
println("Start stream by pressing any key")
StdIn.readLine()
supervisor ! UnPause
StdIn.readLine()
supervisor ! Pause
StdIn.readLine()
println("=== Terminating ===")
system.terminate()
}
当应用程序以“暂停”状态启动阶段ia并且不产生任何数字时。当我按下键时,我的舞台开始发出数字。但我的问题是,在它启动后发送到舞台的所有消息都被忽略了。我无法暂停阶段。
我感兴趣的是根据从演员那里收到的消息来改变舞台的行为,但我发现的所有示例都将演员的消息传递到流中。
有人猜到我的代码为什么不起作用了吗?或者有人知道如何构建这样一个GraphStage
?
非常感谢你!
Akka Stream Contrib项目有一个阀
阶段,它具体化了一个可以暂停和恢复流量的值。从这个类的Scaladoc中:
具体化为未来的ValveSwitch,它提供了一个方法翻转,停止或重新启动通过舞台的元素流。只要阀门关闭,它就会产生背压。
例如:
val (switchFut, seqSink) = Source(1 to 10)
.viaMat(new Valve(SwitchMode.Close))(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run()
switchFut
是一个未来的[Valves switch]
,由于开关最初是关闭的,因此阀门背压和任何东西都不会排放到下游。要打开阀门:
switchFut.onComplete {
case Success(switch) =>
switch.flip(SwitchMode.Open) // Future[Boolean]
case _ =>
log.error("the valve failed")
}
更多示例见ValveSpec
。
我写了一个kafka流代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我的主题有50个分区 我的Kafka流代码有选择键()DSL操作,我有200万条记录使用相同的KEY。在流配置中,我已经完成了 因此,我能够使用完全相同的键使用不同的分区。如果我没有按预期使用轮循机制,我的所有消息都会转到同一分区。 直到现在一切都很好,但我意识到;当我使用RoundRobin分
问题内容: 因此,我有一堂课,必须编写程序来制作Simon。我知道我的做法不一定是最好的方法,但是,他有一些晦涩的要求,所以这就是我这样做的原因。 我的程序即将完成,但是有一个主要问题。当我按下重设”按钮时,我调用了一种称为“重设”的方法,该方法又将计算机设置为 播放其第一步。 在此期间,将进行图形更新。 当我自己调用reset方法时,它可以按预期工作。当我按下reset按钮时,它会执行所有图形更
我是Spring云流的新手。我使用的是我们团队成员之一写的活页夹。我使用执行器的/绑定endpoint暂停/恢复应用程序中的使用者。但我有个错误 问题1。我猜这是因为我使用的活页夹不支持暂停/恢复操作。有谁能给我举一些例子,在那里我可以找到如何将此功能添加到活页夹? Qn 2。我也使用执行器endpoint尝试启动/停止。停止工作正常,但在启动时,我得到了以下错误 是否有人可以提供一些关于此错误的
null
问题内容: 我正在JavaFx中创建一个应用程序,如果要打开任何子阶段,则应在其中进行操作,然后应在父阶段的中心打开它。我正在尝试使用它来执行此操作,但是它将子级分配到屏幕的中心,而不是父级的中心。如何将子阶段分配给父阶段的中心? 问题答案: 您可以使用父级的X / Y / width / height属性来执行此操作。除了使用,您可以执行以下操作: