当前位置: 首页 > 知识库问答 >
问题:

Scala Akka演员,询问模式,发送回复时遇到的死信

夏高朗
2023-03-14

我正在尝试使用ask模式向远程参与者发送请求。本地actor接收一些值,并对其执行一些任务并更新它。然后,当本地参与者试图将更新后的值发送回远程参与者时,在发送时发生错误。我应该如何处理这个错误?

错误:[INFO][03/31/2017 17:28:18.383][ClientSystem-Akka.actor.Default-Dispatcher-3][Akka://ClientSystem/Deadletters]从参与者[Akka://ClientSystem/user/locala1#1050660737]到参与者[Akka://ClientSystem/Deadletters]的消息[check.package$RCVDCXT]未送达。[1]遇到的一纸空文。

Remote Actor:

class RemoteActor() extends Actor {

    def receive = { 


    case TaskFromLocal() =>{
        implicit val timeout: Timeout = 15000
        val currentSender = sender
        val f1 = currentSender ? RemoteActor.rtree.cxtA
            f1.onComplete{
            case Success(Rcvdcxt(cxtA))=>
                println("Success"+cxtA)
            case Success(s) =>
                    println("Success :"+s)
            case Failure(ex) =>
                    println("failure:"+ex)
            }
        }


  case _ => println("unknown msg")
  }
}
object RemoteActor{

    def createRndCxtC(count: Int):List[CxtC] = (for (i <- 1 to count) yield CxtC(Random.nextString(5), Random.nextInt())).toList
    def createRndCxtB(count: Int): List[CxtB] = (for (i <- 1 to count) yield CxtB(createRndCxtC(count), Random.nextInt())).toList
    def createRndCxtA(count: Int): List[CxtA] = (for (i <- 1 to count) yield CxtA(createRndCxtC(count), 5)).toList
    var rtree = RCxt(createRndCxtA(1),createRndCxtB(2),1,"")

    def main(args: Array[String]) {     
        val configFile = getClass.getClassLoader.getResource("remote_application.conf").getFile
        val config = ConfigFactory.parseFile(new File(configFile))
        val system = ActorSystem("RemoteSystem" , config)
        val remoteActor = system.actorOf(Props[RemoteActor], name="remote")
        println("remote is ready")
   }   
 }



Local Actor :


class LocalActorA extends Actor{    

  @throws[Exception](classOf[Exception])
  val remoteActor = context.actorSelection("akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote")

  def receive = {   



    case TaskLA1(taskA) =>  {

    implicit val timeout: Timeout = 15000
      val rCxt = remoteActor ? TaskFromLocal()   
      val currentSender = sender
      rCxt.onComplete{
            case Success(Rcvdcxt(cxtA))=>
              println("Success"+cxtA)
              println("Sender: "+ sender)
              currentSender ! Rcvdcxt(cxtA)

            case Success(s)=>
              println("Got nothing from Remote"+s)
              currentSender ! "Failuree"

            case Failure(ex) =>
              println("Failure in getting remote")
              currentSender ! "Failure"
            }
    }
  }
}

object LocalActorA {    


    def createRndCxtC(count: Int):List[CxtC] = (for (i <- 1 to count) yield CxtC(Random.nextString(5), Random.nextInt())).toList
    def createRndCxtB(count: Int): List[CxtB] = (for (i <- 1 to count) yield CxtB(createRndCxtC(count), Random.nextInt())).toList
    def createRndCxtA(count: Int): List[CxtA] = (for (i <- 1 to count) yield CxtA(createRndCxtC(count), 3)).toList
    var tree = RCxt(createRndCxtA(2),createRndCxtB(2),1,"")

   def main(args: Array[String]) {
    val configFile = getClass.getClassLoader.getResource("local_application.conf").getFile
    val config = ConfigFactory.parseFile(new File(configFile))
    val system = ActorSystem("ClientSystem",config)
    val localActorA1 = system.actorOf(Props[LocalActorA], name="localA1")
    println("LocalActor A tree : "+tree)
    localActorA1 ! TaskLA1(new DummySum())
   }
}

共有1个答案

阎自怡
2023-03-14

由于您没有发布所有代码,所以我不能确切地说出错误,但我最好的猜测与您在LocalActor的oncomplete中调用sender有关。这是不安全的,应不惜一切代价加以避免。相反,对远程参与者执行类似的操作:

class LocalActor {
  def receive = {
    case TaskLA1(taskA) =>
      val currentSender = sender
      rCxt.onComplete {
        case Success(Rcvdcxt(cxtA))=>
          currentSender ! Rcvdcxt(cxtA)
          ...
      }
  }
}
 类似资料:
  • 假设我有一个演员树,它做一堆处理。处理由客户机/连接执行元启动(即,树是服务器)。最终客户端执行元需要响应。即。我有一个演员系统看起来是这样的。 客户端系统需要的响应是叶参与者的输出(即和/或)。树中的这些参与者可能正在与外部系统交互。此树可能是一组预定义的可能路由的参与者(即,因此只有一个actorref指向参与者树的根)。 问题是管理将响应(&或)从final/leaf参与者发送回客户端参与者

  • [04/27/2014 18:09:05.518][ReadScheduler-Akka.actor.Default-Dispatcher-3][Akka://ReadScheduler/User/Collector]从参与者[Akka://ReadScheduler/User/Executor#2127791644]到参与者[Akka://ReadScheduler/User/Collector

  • 假设我有以下字符串: 我需要从这个字符串中提取以下数据: 标题1 F (S#H88(P#M6)(P#M31))和(S#K3(P#M58)(P#M58)) 而且 标题2 P (S#A54(P#R8))和(S#V59(P#A25)(P#Y82)) 哪里 是某种标题。 是某种状态。 是列表的某种列表,如 由于regex知识有限,我可以得到1和2,但只能得到3的第一部分。 可以存在多次,内部(P#xx)也

  • 主要的类别是: 路由的执行元类为:

  • 我不太熟悉OpenLDAP,但我试图让一个本地实例工作,以便在开发中测试一个客户机,该客户机需要memberOf属性。 根据http://www.OpenLDAP.org/doc/admin24/guide.html#的说明,我已经在Centos7虚拟机上下载并安装了OpenLDAP,这是一个快速入门指南。我没有使用包管理来安装它。因为我使用的是2.4.45,所以它使用的是cn=config OL

  • 假设我有一个IO Actor,能够通过TCP发送和接收消息。在我的演员中,我要求连接的另一方做出回应: 使用此代码,ask future超时,而包含参与者接收ask模式之外的原始消息。 你能在Akka IO演员身上使用ask模式吗?若否,原因为何?