当前位置: 首页 > 知识库问答 >
问题:

如何处理Akka执行元内部的WS调用超时

洪光霁
2023-03-14

我有以下actor向WebService发送请求:

class VigiaActor extends akka.actor.Actor {
  val log = Logging(context.system, this)

  context.setReceiveTimeout(5 seconds)

  import VigiaActor._
  def receive = {
    case ObraExists(numero: String, unidadeGestora: String) =>
      WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""").withHeaders("Authorization" -> newToken).get.pipeTo(sender)
    case ReceiveTimeout =>
      val e = TimeOutException("VIGIA: Receive timed out")
      throw e
  }

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: TimeOutException         => Resume       
      case _: Exception                => Restart
    }
}

对此执行元的调用是验证方法的一部分,如果尝试与ws:

implicit val timeout = Timeout(5 seconds)
lazy val vigiaActor : ActorRef = Akka.system.actorOf(Props[VigiaActor])

(vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
  case r : WSResponse =>
    val exists = r.body.toBoolean

    if (!exists && empenho.tipoMeta.get.equals(4)) {
      erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras" , TipoErroImportacaoEnum.WARNING)
    }

  case _ => erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras" , TipoErroImportacaoEnum.WARNING)
}
protected def validateRow(row: Int, line: String, empenho: Empenho, calendarDataEnvioArquivo: Calendar)(implicit s: Session, controle: ControleArquivo, erros:ImportacaoException): Unit = {
    implicit val timeout = Timeout(5 seconds)
    lazy val vigiaActor : ActorRef = Akka.system.actorOf(Props[VigiaActor])

    (vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
      case e: TimeOutException => println("TIMOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOUT!!!!")
      case r: WSResponse => {...}
    }
}
case ReceiveTimeout =>
  val e = TimeOutException("VIGIA: Receive timed out")
  sender ! e

与以前一样,我将收到以下日志消息:

[信息][07/20/2017 10:28:05.738][Application-Akka.Actor.Default-Dispatcher-5][Akka://Application/Deadletters]未传递从执行元[Akka://Application/User/$C#1834419855]到执行元[Akka:/Application/Deadletters]的消息[Model.Exception.TimeoutException]。[1]遇到的死信。可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-dider-shutdown'关闭或调整此日志记录。

共有1个答案

钱德海
2023-03-14

Context.SetReceiveTimeout(5秒)触发将ReceiveTimeout消息发送到VigiaActor(如果该执行元在5秒内没有收到消息)。Akka在内部将ReceiveTimeout发送给您的执行元,这就是为什么在更新的代码中,尝试将异常发送给Sender并没有达到您预期的效果。换句话说,case ReceiveTimeout=>子句中的sender不是ObraExists消息的原始发送方。

VigiaActor中设置接收超时与WS请求超时无关,因为如果请求超时,则不会向VigiaActor发送任何消息。即使在ws请求未在5秒内完成时向执行元发送了一条消息,也可能同时在执行元的邮箱中列队了另一条ObraExists消息,因此无法触发ReceiveTimeout

简而言之,设置执行元的接收超时不是处理ws请求超时的正确机制。(使用当前将Get请求的结果通过管道传送到发件人的方法,您可以调整发件人以处理超时。事实上,我会完全放弃VigiaActor并直接在ValidateRow方法中调用WS。但删除执行元可能不是您问题的重点。)

如果必须在执行元中处理ws请求超时,可以使用如下方法:

import scala.util.{Failure, Success}

class VigiaActor extends akka.actor.Actor {
  import VigiaActor._
  val log = Logging(context.system, this)

  def receive = {
    case ObraExists(numero: String, unidadeGestora: String) =>
      val s = sender // capture the original sender
      WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""")
        .withHeaders("Authorization" -> newToken)
        .withRequestTimeout(5 seconds) // set the timeout
        .get
        .onComplete {
          case Success(resp) =>
            s ! resp
          case Failure(e: scala.concurrent.TimeoutException) =>
            s ! TimeOutException("VIGIA: Receive timed out")
          case Failure(_) =>
            // do something in the case of non-timeout failures
        }
  }
}
 类似资料:
  • 我正在使用selenium java script exceuter执行下面的javascript,我想从fetch调用返回响应,并想将其存储在代码中的java变量中。但下面的代码显示了脚本超时,有什么建议吗?我怎样才能达到以上要求???

  • 我有一个基于strut的应用程序,我在其中调用我的Restful Web Service。我的实际服务调用如下所示: 呼叫通过只是罚款,但我想处理的情况下,如果我的服务是关闭,我想超时1分钟,而不是等待这么长时间。

  • 问题内容: 众所周知,代理对象时,例如为Spring / EJB创建具有事务属性的Bean时,甚至当您使用某些框架创建部分模拟时,代理对象都不知道,内部调用也不会重定向,然后也没有被拦截… 这就是为什么如果您在Spring中做类似的事情: 当您致电doSomething时,您希望除了主要交易外还有3个新交易,但是实际上,由于这个问题,您只会得到一个新交易… 所以我想知道您如何处理此类问题… 实际上

  • 我在Java web应用程序中有一个分层体系结构。UI层只是Java,服务是Akka类型的actor,外部服务调用(WS、DB等)包装在Hystrix命令中。 问题是,我想释放服务参与者正在使用的线程,而只是绑定Hystrix使用的线程。但是java的未来阻止了这一点,因为我必须阻止它的完成。我能想到的唯一选项(我不确定我喜欢)是不断地轮询Java future(一个或多个),并在Java fut

  • 我有一个基于喷雾的HTTP服务。我有一个在这个HTTP应用程序内部运行的流。现在由于这个流要做大量的I/O,所以我决定使用一个单独的线程池。我查阅了Akka文档,看看我可以做些什么来使我的线程池是可配置的。我在Akka遇到了调度器的概念。所以我尝试在application.conf中使用它如下所示: 在我的执行元中,我尝试将此配置查找为: 当我运行我的服务时,我得到以下错误:

  • 问题内容: 如何从处理程序内部正确引用路由名称? 应该全局分配而不是放在函数内部? 问题答案: 您具有返回给定请求的路由的方法。根据该请求,您可以创建一个子路由器并调用 示例:(播放:http : //play.golang.org/p/Lz10YUyP6e)