我正在使用BackoffSupervisor策略来创建一个必须处理某些消息的子参与者。我想实现一个非常简单的重启策略,其中在发生异常时:
>
supervisor重新启动子程序并再次发送失败消息。
主管重试3次后放弃
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = cmd.hashCode.toString,
minBackoff = 1.seconds,
maxBackoff = 2.seconds,
randomFactor = 0.2
)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException => {
println("caught specific message!")
SupervisorStrategy.Restart
}
case _: Exception => SupervisorStrategy.Restart
case _ ⇒ SupervisorStrategy.Escalate
})
)
val sup = context.actorOf(supervisor)
sup ! cmd
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
throw new Exception("surprising exception")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
// println("mail sent!")
throw new Exception("surprising exception")
}
override def preStart(): Unit = {
println("child starting")
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
reason match {
case m: MessageException => {
println("aaaaa")
message.foreach(self ! _)
}
case _ => println("bbbb")
}
}
override def postStop(): Unit = {
println("child stopping")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
这给出了类似于以下输出的内容:
new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting
但是没有来自prerestart
钩子的日志
没有调用子级的preRestart
钩子的原因是backoff.onfailure
在封面下面使用backoffonrestartsupervisor
,这将默认的重新启动行为替换为与退避策略一致的停止和延迟启动行为。换句话说,当使用backoff.onfailure
时,当重新启动子程序时,不会调用该子程序的prerestart
方法,因为基础主管实际上停止了该子程序,然后在以后再次启动它。(使用backoff.onstop
可以触发子级的prerestart
钩子,但这与当前的讨论无关。)
backoffsupervisor
API不支持在supervisor的子程序重新启动时自动重发消息:您必须自己实现此行为。重试消息的一个方法是让backoffsupervisor
的supervisor处理它。例如:
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withReplyWhileStopped(ChildIsStopped)
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd) // replace cmd with whatever the property name is
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
def receive = {
case cmd: NewMail =>
sup ! cmd
case Error(cmd) =>
timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
// We assume that NewMail has an id field. Also, adjust the time as needed.
case Replay(cmd) =>
sup ! cmd
case ChildIsStopped =>
println("child is stopped")
}
在上面的代码中,嵌入在MessageException
中的newmail
消息被包装在一个自定义case类中(以便于将其与“普通”/newnewmail
消息区分开来),并发送给self
。在此上下文中,self
是创建backoffsupervisor
的参与者。然后,这个封闭的参与者使用一个计时器在某个时刻重播原始消息。将来这个时间点应该足够长,这样backoffsupervisor
可能会耗尽senderactor
的重新启动尝试,以便子节点在接收到resent消息之前有足够的机会进入“良好”状态。显然,这个示例只涉及一个消息重发,而不管子重启的次数如何。
另一种方法是为每个newmail
邮件创建一个backoffsupervisor
-senderactor
对,并让senderactor
在prestart
钩子中将newmail
消息发送到其自身。这种方法的一个问题是清理资源;即,当处理成功或子级重新启动程序耗尽时,关闭backoffsupervisors
(这将依次关闭它们各自的senderactor
子级)。在这种情况下,Newmail
ID到(ActorRef,Int)
元组的映射(其中ActorRef
是对backoffsupervisor
参与者的引用,Int
是重启尝试的次数)将很有帮助:
class Overlord extends Actor {
var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
def receive = {
case cmd: NewMail =>
val childProps = Props(new SenderActor(cmd, self))
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd)
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
state += (cmd.id -> (sup, 0))
case ProcessingDone(cmdId) =>
state.get(cmdId) match {
case Some((backoffSup, _)) =>
context.stop(backoffSup)
state -= cmdId
case None =>
println(s"${cmdId} not found")
}
case Error(cmd) =>
val cmdId = cmd.id
state.get(cmdId) match {
case Some((backoffSup, numRetries)) =>
if (numRetries == 3) {
println(s"${cmdId} has already been retried 3 times. Giving up.")
context.stop(backoffSup)
state -= cmdId
} else
state += (cmdId -> (backoffSup, numRetries + 1))
case None =>
println(s"${cmdId} not found")
}
case ...
}
}
请注意,上面示例中的senderactor
将newmail
和actorref
作为构造函数参数。后一个参数允许senderactor
向包含的actor发送自定义processingdone
消息:
class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
override def preStart(): Unit = {
println(s"child starting, sending ${cmd} to self")
self ! cmd
}
def fakeSendMail(): Unit = ...
def receive = {
case cmd: NewMail => ...
}
}
显然,SenderActor
被设置为每次使用FakeSendMail
的当前实现都失败。我将在senderactor
中保留所需的其他更改,以实现愉快的路径,在该路径中,senderactor
向target
发送processingdone
消息。
spring-boot消费者微服务无法在kafka重新启动后向主题发送消息。 > 消费者和生产者(spring boot微服务)位于同一覆盖网络“Net-Broker”上,因此它们使用服务名“kafka:9092”访问kafka。 一开始一切都很顺利。 然后kafka仅被重新启动,在此之后,消费者不能再从kafka主题发送消息。 由于docker-compose.yml中的一个小更改(例如max_
我是storm的新手,当我提交拓扑主管日志时 配置是 zookeeper的版本为 结果喷口不能发出消息和螺栓也有什么我可以分享它来解决这个问题吗?谁能帮忙?!
在官方的akka 2.0.4文档中,它说: actor重新启动只替换实际的actor对象;邮箱的内容不受重新启动的影响,因此在postRestart钩子返回后,将继续处理邮件。不会再收到触发异常的消息。在重新启动时发送给参与者的任何消息都将像往常一样排队到其邮箱。 我唯一能想到的是,如果消息由于某种原因出现畸形,它将永远不会离开系统,并导致演员定期重新启动...
我正在使用kafkapython来消费来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止,并且在一段时间后重新启动,我希望从最新生成的消息重新启动,即删除消费者停止时生成的所有消息。我怎样才能做到这一点? 谢谢
我有一个可流动的工作流,我需要在某个阶段暂停,然后根据JMS消息从JMS监听器重新启动相同的工作流。
我正试图制造一个不和谐的机器人,我有一些问题。我想要一种方法来获取某个频道的所有消息,但在bot重新启动后,他无法“看到”旧消息。以下是一个例子: 因此,我发送了3条随机消息,然后启动bot并启动命令。对于调用命令的消息,结果是1。