当前位置: 首页 > 知识库问答 >
问题:

从Supervisor重新启动后向actor发送消息

蓟俊杰
2023-03-14

我正在使用BackoffSupervisor策略来创建一个必须处理某些消息的子参与者。我想实现一个非常简单的重启策略,其中在发生异常时:

>

  • 子级将失败消息传播给主管
  • supervisor重新启动子程序并再次发送失败消息。

    主管重试3次后放弃

    val childProps = Props(new SenderActor())
    val supervisor = BackoffSupervisor.props(
      Backoff.onFailure(
        childProps,
        childName = cmd.hashCode.toString,
        minBackoff = 1.seconds,
        maxBackoff = 2.seconds,
        randomFactor = 0.2 
      )
        .withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException => {
              println("caught specific message!")
              SupervisorStrategy.Restart
            }
            case _: Exception => SupervisorStrategy.Restart
            case _              ⇒ SupervisorStrategy.Escalate
          })
    )
    
    val sup = context.actorOf(supervisor)
    
    
    sup ! cmd
    
    class SenderActor() extends Actor {
    
      def fakeSendMail():Unit =  {
        Thread.sleep(1000)
        throw new Exception("surprising exception")
      } 
    
      override def receive: Receive = {
        case cmd: NewMail =>
    
          println("new mail received routee")
          try {
            fakeSendMail()
          } catch {
            case t => throw MessageException(cmd, t)
          }
    
      }
    }
    
    class SenderActor() extends Actor {
    
      def fakeSendMail():Unit =  {
        Thread.sleep(1000)
        //    println("mail sent!")
        throw new Exception("surprising exception")
      }
    
      override def preStart(): Unit = {
        println("child starting")
      }
    
    
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        reason match {
          case m: MessageException => {
            println("aaaaa")
            message.foreach(self ! _)
          }
          case _ => println("bbbb")
        }
      }
    
      override def postStop(): Unit = {
        println("child stopping")
      }
    
      override def receive: Receive = {
        case cmd: NewMail =>
    
          println("new mail received routee")
          try {
            fakeSendMail()
          } catch {
            case t => throw MessageException(cmd, t)
          }
    
      }
    }
    

    这给出了类似于以下输出的内容:

    new mail received routee
    caught specific message!
    child stopping
    [ERROR] [01/26/2018 10:15:35.690]
    [example-akka.actor.default-dispatcher-2]
    [akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
    process message sample.persistence.MessageException:
    Could not process message <stacktrace>
    child starting
    

    但是没有来自prerestart钩子的日志

  • 共有1个答案

    归鸿朗
    2023-03-14

    没有调用子级的preRestart钩子的原因是backoff.onfailure在封面下面使用backoffonrestartsupervisor,这将默认的重新启动行为替换为与退避策略一致的停止和延迟启动行为。换句话说,当使用backoff.onfailure时,当重新启动子程序时,不会调用该子程序的prerestart方法,因为基础主管实际上停止了该子程序,然后在以后再次启动它。(使用backoff.onstop可以触发子级的prerestart钩子,但这与当前的讨论无关。)

    backoffsupervisorAPI不支持在supervisor的子程序重新启动时自动重发消息:您必须自己实现此行为。重试消息的一个方法是让backoffsupervisor的supervisor处理它。例如:

    val supervisor = BackoffSupervisor.props(
      Backoff.onFailure(
        ...
      ).withReplyWhileStopped(ChildIsStopped)
      ).withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
          case msg: MessageException =>
            println("caught specific message!")
            self ! Error(msg.cmd) // replace cmd with whatever the property name is
            SupervisorStrategy.Restart
          case ...
        })
    )
    
    val sup = context.actorOf(supervisor)
    
    def receive = {
      case cmd: NewMail =>
        sup ! cmd
      case Error(cmd) =>
        timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
        // We assume that NewMail has an id field. Also, adjust the time as needed.
      case Replay(cmd) =>
        sup ! cmd
      case ChildIsStopped =>
        println("child is stopped")
    }
    

    在上面的代码中,嵌入在MessageException中的newmail消息被包装在一个自定义case类中(以便于将其与“普通”/newnewmail消息区分开来),并发送给self。在此上下文中,self是创建backoffsupervisor的参与者。然后,这个封闭的参与者使用一个计时器在某个时刻重播原始消息。将来这个时间点应该足够长,这样backoffsupervisor可能会耗尽senderactor的重新启动尝试,以便子节点在接收到resent消息之前有足够的机会进入“良好”状态。显然,这个示例只涉及一个消息重发,而不管子重启的次数如何。

    另一种方法是为每个newmail邮件创建一个backoffsupervisor-senderactor对,并让senderactorprestart钩子中将newmail消息发送到其自身。这种方法的一个问题是清理资源;即,当处理成功或子级重新启动程序耗尽时,关闭backoffsupervisors(这将依次关闭它们各自的senderactor子级)。在这种情况下,NewmailID到(ActorRef,Int)元组的映射(其中ActorRef是对backoffsupervisor参与者的引用,Int是重启尝试的次数)将很有帮助:

    class Overlord extends Actor {
    
      var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
    
      def receive = {
        case cmd: NewMail =>
          val childProps = Props(new SenderActor(cmd, self))
          val supervisor = BackoffSupervisor.props(
            Backoff.onFailure(
              ...
            ).withSupervisorStrategy(
              OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
                case msg: MessageException =>
                  println("caught specific message!")
                  self ! Error(msg.cmd)
                  SupervisorStrategy.Restart
                case ...
              })
          )
          val sup = context.actorOf(supervisor)
          state += (cmd.id -> (sup, 0))
    
        case ProcessingDone(cmdId) =>
          state.get(cmdId) match {
            case Some((backoffSup, _)) =>
              context.stop(backoffSup)
              state -= cmdId
            case None =>
              println(s"${cmdId} not found")
          }
    
        case Error(cmd) =>
           val cmdId = cmd.id
           state.get(cmdId) match {
             case Some((backoffSup, numRetries)) =>
               if (numRetries == 3) {
                 println(s"${cmdId} has already been retried 3 times. Giving up.")
                 context.stop(backoffSup)
                 state -= cmdId
               } else
                 state += (cmdId -> (backoffSup, numRetries + 1))
             case None =>
               println(s"${cmdId} not found")
           }
    
        case ...
      }
    }
    

    请注意,上面示例中的senderactornewmailactorref作为构造函数参数。后一个参数允许senderactor向包含的actor发送自定义processingdone消息:

    class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
      override def preStart(): Unit = {
        println(s"child starting, sending ${cmd} to self")
        self ! cmd
      }
    
      def fakeSendMail(): Unit = ...
    
      def receive = {
        case cmd: NewMail => ...
      }
    }
    

    显然,SenderActor被设置为每次使用FakeSendMail的当前实现都失败。我将在senderactor中保留所需的其他更改,以实现愉快的路径,在该路径中,senderactortarget发送processingdone消息。

     类似资料:
    • spring-boot消费者微服务无法在kafka重新启动后向主题发送消息。 > 消费者和生产者(spring boot微服务)位于同一覆盖网络“Net-Broker”上,因此它们使用服务名“kafka:9092”访问kafka。 一开始一切都很顺利。 然后kafka仅被重新启动,在此之后,消费者不能再从kafka主题发送消息。 由于docker-compose.yml中的一个小更改(例如max_

    • 我是storm的新手,当我提交拓扑主管日志时 配置是 zookeeper的版本为 结果喷口不能发出消息和螺栓也有什么我可以分享它来解决这个问题吗?谁能帮忙?!

    • 在官方的akka 2.0.4文档中,它说: actor重新启动只替换实际的actor对象;邮箱的内容不受重新启动的影响,因此在postRestart钩子返回后,将继续处理邮件。不会再收到触发异常的消息。在重新启动时发送给参与者的任何消息都将像往常一样排队到其邮箱。 我唯一能想到的是,如果消息由于某种原因出现畸形,它将永远不会离开系统,并导致演员定期重新启动...

    • 我正在使用kafkapython来消费来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止,并且在一段时间后重新启动,我希望从最新生成的消息重新启动,即删除消费者停止时生成的所有消息。我怎样才能做到这一点? 谢谢

    • 我有一个可流动的工作流,我需要在某个阶段暂停,然后根据JMS消息从JMS监听器重新启动相同的工作流。

    • 我在AWS中有一个简单的t2实例,我必须更改RDS DB实例运行的默认时区;由于没有反映任何更改,我决定重新启动RDS实例。 一旦发生这种情况,运行我的的EC2实例开始失败,应用程序返回一个错误,试图连接到数据库。 我还决定重新启动EC2实例(我是AWS的新手)。 在此之后,我无法访问我的webapp,所以我继续通过SSH和连接到EC2实例。 几分钟后,我注意到webapp无法工作,并在一个空白页