当前位置: 首页 > 知识库问答 >
问题:

告诉发送者参与者在多次重试后失败的最佳方法是什么

咸晨
2023-03-14

我有父母 -

/**
 * An upload message.
 *
 * @param byte The byte array representing the content of a file.
 * @param path The path under which the file should be stored.
 */
case class UploadMsg(byte: Array[Byte], path: String)

/**
 * The upload supervisor.
 */
class UploadSupervisor extends Actor {

  /**
   * Stores the sender to the executing actor.
   */
  val senders: ParHashMap[String, ActorRef] = ParHashMap()

  /**
   * Defines the supervisor strategy
   */
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: DbxException => Restart
    case e: Exception => Stop
  }

  /**
   * Handles the received messages.
   */
  override def receive: Actor.Receive = {
    case msg: UploadMsg =>
      implicit val timeout = Timeout(60.seconds)

      val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
      senders += actor.path.toString -> sender
      context.watch(actor)
      ask(actor, msg).mapTo[Unit] pipeTo sender

    case Terminated(a) =>
      context.unwatch(a)
      senders.get(a.path.toString).map { sender =>
        sender ! akka.actor.Status.Failure(new Exception("Actor terminated"))
        senders - a.path.toString
      }
  }
}

/**
 * An aktor which uploads a file to Dropbox.
 */
class UploadActor @Inject() (client: DropboxClient) extends Actor {

  /**
   * Sends the message again after restart.
   *
   * @param reason The reason why an restart occurred.
   * @param message The message which causes the restart.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    message foreach { self forward }
  }

  /**
   * Handles the received messages.
   */
  override def receive: Receive = {
    case UploadMsg(byte, path) =>
      val encrypted = encryptor.encrypt(byte)
      val is = new ByteArrayInputStream(encrypted)
      try {
        client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
        sender ! (())
      } finally {
        is.close()
      }
  }
}

我现在的问题是:有没有一个更好的解决方案来告诉我的应用程序,在指定的次数或重试次数后,上传参与者失败了。尤其是为演员存储发送者的地图感觉有点别扭。

共有1个答案

杜俭
2023-03-14

你应该用断路器

val breaker =
 new CircuitBreaker(context.system.scheduler,
  maxFailures = 5,
  callTimeout = 10.seconds,
  resetTimeout = 1.minute)

然后通过断路器包装您的消息:

sender() ! breaker.withSyncCircuitBreaker(dangerousCall)

断路器有三种状态:闭合、断开和半断开。当消息failure$maxFailures times状态更改为Open时,正常状态为Closed。断路器为状态更改提供回调。如果你想做什么,就用它。例如:

breaker onOpen { sender ! FailureMessage()}
 类似资料:
  • 问题内容: 伪代码: 更好的是某种指数补偿 问题答案: 像这样:

  • 例如,我有两个演员——一个家长演员和一个孩子演员。当父级收到消息时,它会产生消息中指定的尽可能多的子角色。如何测试此功能?有没有一种方法可以模拟上下文,或者其他一些方法来检查参与者的创建是否正确和数量是否正确? 更新:基于@Tim answer的解决方案 更改类别: 测试:

  • 问题内容: 假设我有两个或两个以上相同长度的列表。遍历它们的好方法是什么? ,是列表。 要么 还是我缺少任何变体? 使用一个相对于另一个有什么特别的优势吗? 问题答案: 通常的方法是使用: 这将停止两个iterables时较短且耗尽。另外值得注意的是:(仅适用于Python 2)和(适用于Python 3)。

  • 我希望避免给终止的参与者发送死信消息,并避免向该参与者发送消息 actorSelection没有像我预期的那样工作,从ping发送的最后一条消息仍然以一纸空文告终: [信息][09/16/2016 00:47:46.237][MyActorSystem-Akka.Actor.Default-Dispatcher-4][Akka://MyActorSystem/User/PingActor/Pong

  • 我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是

  • 我有一个存储库层,有许多方法组合,以匹配搜索标准…重用这个标准的最佳方法是什么?我认为像FindByNameAndidandBirthdayandAccouncAnumber这样的方法名称不是一个好主意!谢了! }