当前位置: 首页 > 知识库问答 >
问题:

喷雾路线得到儿童演员的响应

佘茂才
2023-03-14
object Boot extends App {

  // we need an ActorSystem to host our application in
  implicit val system = ActorSystem("on-spray-can")

  // create and start our service actor
  val service = system.actorOf(Props[DemoServiceActor], "demo-service")

  implicit val timeout = Timeout(5.seconds)
  // start a new HTTP server on port 8080 with our service actor as the handler
  IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080)
}
class DemoServiceActor extends Actor with Api {

  // the HttpService trait defines only one abstract member, which
  // connects the services environment to the enclosing actor or test
  def actorRefFactory = context

  // this actor only runs our route, but you could add
  // other things here, like request stream processing
  // or timeout handling
  def receive = handleTimeouts orElse runRoute(route)

  //Used to watch for request timeouts
  //http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/
  def handleTimeouts: Receive = {
    case Timedout(x: HttpRequest) =>
      sender ! HttpResponse(StatusCodes.InternalServerError, "Too late")
  }


}

//Master trait for handling large APIs
//http://stackoverflow.com/questions/14653526/can-spray-io-routes-be-split-into-multiple-controllers
trait Api extends DemoService {
  val route = {
    messageApiRouting
  }
}

示范喷洒服务(路线):

trait DemoService extends HttpService with Actor  {
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val redisActor = context.actorOf(Props[RedisActor], "redisactor")

  val messageApiRouting =
        path("summary" / Segment / Segment) { (dataset, timeslice) =>
          onComplete(getSummary(redisActor, dataset, timeslice)) {
            case Success(value) => complete(s"The result was $value")
            case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
          }
        }

  def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future {

    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }

}

Redis执行元(Mock还没有实际的Redis客户端)

class RedisActor extends Actor with ActorLogging {
  //  val pool = REDIS
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor")


  def receive = {

    case msg: DbMessage => {
      msg.query match {
        case "summary" => {
          log.debug("Summary Query Request")
          log.debug(sender.path.toString)
           summaryActor ! msg
        }
      }
    }

    //If not match log an error
    case _ => log.error("Received unknown message: {} ")
  }
}

class SummaryActor extends Actor with ActorLogging{

  def receive = {
    case msg: DbMessage =>{
      log.debug("Summary Actor Received Message")
      //Send back to Spray Route

    }
  }
}

共有1个答案

师建德
2023-03-14

代码的第一个问题是,您需要从主执行元转发到子执行元,以便发送者能够正确传播,并可供子执行元响应。因此更改此内容(在redisactor中):

summaryActor ! msg

致:

summaryActor forward msg

这是首要问题。修复它,您的代码就应该开始工作了。不过,还有一些事情需要注意。您的getsummary方法当前定义为:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = 
  Future {
    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = {
  val dbMessage = DbMessage("summary", dataset + timeslice)
  (redisActor ? dbMessage).mapTo[String]      
}
 类似资料:
  • 我有一个restful API,它接收一组JSON消息,这些消息将被转换为单独的Avro消息,然后发送到Kafka。在路由中,我调用3个不同的actor:1)一个actor出去,从磁盘中检索Avro模式;2)然后循环访问JSON消息数组,并将其与第二个actor中的Avro模式进行比较。如果任何消息没有验证,那么我需要返回一个响应给API的调用者并停止处理。3)循环访问数组并传递到第三个actor

  • 我对阿克卡很陌生,我有一个(希望)简单的问题。我有一个参与者需要重复执行某个小的子任务——也就是说,每次这个参与者收到消息时,它都必须执行N个子任务。这个子任务是我指定给儿童演员的。我的问题是,我是否应该为每个子任务创建一个新的子角色实例?或者我应该简单地产生一个孩子演员,并发送N条消息?在这种情况下,最好的做法是什么? 为了更好地说明我的问题,这里有两个简化的示例(在Java中-但希望对Scal

  • 我经常发现自己使用一个“主”角色,为子任务创建许多子角色。当子任务完成时,主角也应该停止自己。所以当时,我观察子角色并停止主角色context.children.is。 我经常使用这种模式,但因为我从未读过这方面的文章。我不确定,这是一个好主意还是失败的演员有问题。。。? 我已经读过Akka 2中的关机模式,但是这种方法在Java中似乎比我的解决方案更复杂? 以下是我针对具有两个子任务的主要参与者

  • 我是Akka的新手,我想知道我应该如何处理将工作委托给其他(儿童)演员的演员,但在哪里: 其中一些儿童演员必须按特定顺序参与;和 其中一些子参与者可以以任何顺序参与,并且可以真正地与主/父参与者所做的事情异步执行 假设我有以下儿童演员(不管他们做什么): 假设我有以下调用它们的父参与者: 如你所见: 必须首先“参与”(由家长调用),我们必须等待家长的响应,然后才能继续参与/调用、或 换句话说,当消

  • 让我们假设一个使用Akka Typed实现的应用程序有一个持久执行元。这个持久执行元作为其操作的一部分创建了瞬态(或非持久)子执行元,每个子执行元都有一个唯一的ID,这些ID是持久状态的一部分。持久执行元还需要一些与其子级通信的方式,但我们不希望持久化子级的,因为它们实际上不是状态的一部分。在恢复时,持久参与者应该基于恢复的状态重新创建它的子级。这听起来并不像是一个很不寻常的用例,我正在试图弄清楚

  • 我有父母和孩子。每个儿童演员控制一个装置。当我创建所有的子角色时,我给他们一个uuid。 每个子参与者都是通过设备配置创建的,当配置更改时,我希望通过以下方式重新创建参与者: > 停止旧的: 最终的未来停止了=优雅的停止(actorRef,Duration.create(1,时间单位。秒)); Await.result(停止,Duration.create(1, TimeUnit.二)); 重新创