当前位置: 首页 > 知识库问答 >
问题:

在失败的情况下解决ask的Akka期货问题

经俊茂
2023-03-14

我在一个Spray应用程序中使用ask模式调用一个Actor,并将结果作为HTTP响应返回。我将故障从参与者映射到自定义错误代码。

val authActor = context.actorOf(Props[AuthenticationActor])

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
  complete(StatusCodes.OK, user)
}

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
 onComplete(f) {
  case Success(value: T) => cb(value)
  case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
  case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
  //In reality this returns a custom error object.
 }
}

当authActor发送一个失败时,这可以正常工作,但是如果authActor抛出一个异常,则在请求超时完成之前什么也不会发生。例如:

override def receive: Receive = {
  case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}
override def receive: Receive = {
case default =>
  try {
    default match {
      case _ => throw new ServiceException("")//Actual code would go here
    }
  }
  catch {
    case se: ServiceException =>
      logger.error("Service error raised:", se)
      sender ! Failure(se)
    case ex: Exception =>
      sender ! Failure(ex)
      throw ex
  }
}

这样,如果它是预期的错误(即ServiceException),则通过创建一个失败来处理它。如果它是意外的,它会立即返回一个失败,以便解决未来的问题,但随后抛出异常,以便仍然可以由SupervisorStrategy处理。

共有1个答案

韦业
2023-03-14

如果您想要一种方法,在意外异常的情况下自动向发件人发送回响应,那么类似这样的方法可以为您工作:

trait FailurePropatingActor extends Actor{
  override def preRestart(reason:Throwable, message:Option[Any]){
    super.preRestart(reason, message)
    sender() ! Status.Failure(reason)
  }
}

我们重写prerestart,并将失败作为状态传播回发送方。failure,这将导致上游future失败。另外,在这里调用super.preRestart也很重要,因为这是发生子停止的地方。在演员中使用这个类似于以下内容:

case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {  
  def receive = {
    case GetElement(list, i) =>
      val result = list(i)
      sender() ! result
  }  
}

如果我像这样调用这个actor的实例

import akka.pattern.ask
import concurrent.duration._

val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(2 seconds)
val ref = system.actorOf(Props[MySimpleActor])
val fut = ref ? GetElement(List(1,2,3), 6)

fut onComplete{
  case util.Success(result) => 
    println(s"success: $result")

  case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}")
    ex.printStackTrace()    
}     
class MyBadFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher

  def receive = {
    case GetElement(list, i) => 
      val orig = sender()
      val fut = Future{
        val result = list(i)
        orig ! result
      }      
  } 
}
class MyGoodFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut pipeTo sender()
  } 
}
class MyGoodFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut.to(self, sender())

    case d:Double =>
      sender() ! d * 2

    case Status.Failure(ex) =>
      throw ex
  } 
}

如果该行为变得常见,您可以将其提供给任何需要它的参与者,如下所示:

trait StatusFailureHandling{ me:Actor =>
  def failureHandling:Receive = {
    case Status.Failure(ex) =>
      throw ex      
  }
}

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = myReceive orElse failureHandling

  def myReceive:Receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut.to(self, sender())

    case d:Double =>
      sender() ! d * 2        
  } 
}  
 类似资料:
  • 我有一个有两条消息的参与者,第一个负责在mongoDB中插入数据,第二个参与者负责在elasticsearch、InserInMongo和Inserins中插入数据。也就是说,当mongoDB插入操作失败或ES插入操作因某些异常而失败时,会出现这种情况,我正在做类似的事情 在这里,我想如果mongoFuture失败,那么我抓住它的异常,它应该继续与esFuture 或者如果两个未来都失败了,我得到

  • 本文向大家介绍MySQL插入emoji表情失败问题的解决方法,包括了MySQL插入emoji表情失败问题的解决方法的使用技巧和注意事项,需要的朋友参考一下 前言 之前一直认为UTF-8是万能的字符集问题解决方案,直到最近遇到这个问题。最近在做新浪微博的爬虫, 在存库的时候发现只要保持emoji表情,就回抛出以下异常: 众所周知UTF-8是3个字节, 其中已经包括我们日常能见过的绝大多数字体. 但3

  • 我正在与我无法控制的遗留库集成。 它定义了以下接口: 这个“subscribe”方法被不同的线程频繁调用。我关心“Future.get()”的结果的唯一情况是当它失败时,所以我可以获取并处理异常。这不一定发生在调用线程中。另外,在“Future.get()”上阻塞调用线程对我来说是非常昂贵的,因为即使成功也可能需要几秒钟才能完成。 所以我的任务是以某种方式“后处理”所有这些期货,过滤失败的期货。基

  • null 这是一个好的解决方案还是有更好的方法?

  • 本文向大家介绍MySQL下常见的启动失败与备份失败问题的解决教程,包括了MySQL下常见的启动失败与备份失败问题的解决教程的使用技巧和注意事项,需要的朋友参考一下 启动失败 重启服务器后-->重启应用服务(Confluence)-->报错,数据库连接失败(mysql设置了开机自启动)-->查看mysql数据库状态: 启动mysql服务器 查看错误日志: 未发现明显性错误提示,所以手动创建一个pid

  • '''System.SetProperty(“webdriver.chrome.driver”,“c://users//naqdaq//downloads//chromedriver_win32//chromedriver.exe”);WebDriver驱动程序=新ChromeDriver(); '''