当前位置: 首页 > 知识库问答 >
问题:

什么是正确的方式来链Akka演员问电话?

谭宏盛
2023-03-14
  val system = ActorSystem("ActionSystem")
  val actorARouter = system.actorOf(Props[ActionParser].withRouter(
    SmallestMailboxRouter(Runtime.getRuntime.availableProcessors())), name = "actorARouter")
  val actorBRouter = system.actorOf(Props[ActionDispatcher].withRouter(
    SmallestMailboxRouter(Runtime.getRuntime.availableProcessors())), name = "actorBRouter")
(actorARouter ? request.body.asJson.get).map {
      case m: controllers.HttpMessages.OK => Ok(m.body)
      case m: controllers.HttpMessages.HttpResponse => Status(m.status)(m.body)
    }

然后,Actor A将json解析为对象序列,然后通过ask将它们发送给Actor B。Actor B应该最终通过将它们发送给其他Actor来处理这些对象,但目前只是返回一般的响应。

ActorA将来会接收到一般的响应,然后解析到JSON,然后通过OK响应返回给控制器。或者至少这是应该发生的。

发生了什么:控制器发送到ActorA,ActorA发送到Actorb。ActorB向Actora发送一般响应。ActorA将一般响应解析为JSON并尝试执行sender!OK(json)但是我在控制台中得到一条消息,说它没有被传递,因为它是一封“一纸空文”。当我调试到它时,当我查看sender时,sender是对actorAkka://actionsystem/deadletters的引用

  1. 显然我做错了什么。也许我不该把这些演员的反应像这样联系在一起。我再次提到,我只打算通过让ActorB向其他参与者发送请求来进一步实现这一点。
  2. 当我在actor中执行请求时,它不会占用线程并阻止它在等待响应时处理其他消息,是吗?

编辑:我发现我可以保存一个对发件人的引用,以备以后使用,然后发送到那个,这似乎解决了死信的问题。但我仍然很不确定这是否是正确的做事方式。感觉每次我添加另一层演员时,响应时间就会增加10毫秒。也许这是由于其他因素。

共有1个答案

松和泰
2023-03-14

如果不查看您的代码,我无法真正评论是什么导致了死信,从您的编辑中我猜您关闭了sender(),而不是将它赋给一个变量并关闭该变量。

回答您的问题:

  1. 如果只使用fire-and-forget消息,那么用Actor构造消息流要容易得多。ask模式在某些情况下很有用,但大多数时候应该尽量避免。您可以通过使用forward而不是tell将原始发件人传递给您的参与者。这样,可以由消息流中的最后一个参与者生成响应。第一个参与者只需要处理响应的代码,而不需要关心生成响应。很好地分离了关注点。如果您需要聚合多个响应以便之后发送单个响应,您还可以使用一个临时参与者,所有其他参与者都将向其发送响应,并且该参与者知道最初的发送者。临时演员在完成工作后需要停止。
  2. 据我所知,ask模式是异步的,并且在内部使用临时参与者。但是,如果在actor中等待将来的结果,则会阻塞该actor,并且它将无法处理进一步的消息。使用ask模式的一个很好的方法是与pipeTo模式结合使用,您可以使用pipeTo模式将ask的结果发送给actor(通常是self)
 类似资料:
  • 假设我有一个参与者,负责根据某个键将消息路由到子参与者集合,因此其内部状态如下所示: 除了路由消息外,父参与者还必须支持添加和删除操作: 希望上面的代码足以让我大致了解我要做的事情。请注意,我使用map键作为子参与者的名称。问题在于,上述模式无法处理同一个键的添加、删除和添加消息的快速连续情况-它通常在第二次添加时失败,原因是: 显然,在Remove messages上停止子参与者是异步的,这就是

  • 假设我有一个IO Actor,能够通过TCP发送和接收消息。在我的演员中,我要求连接的另一方做出回应: 使用此代码,ask future超时,而包含参与者接收ask模式之外的原始消息。 你能在Akka IO演员身上使用ask模式吗?若否,原因为何?

  • 我所做的: null 更新:我发现了问题;它是由于通过池消耗Redis连接而导致的,并且从未释放它们。但我还有第二个问题,我在这里做的事情明智吗?

  • 我从这里得到上面的错误消息: 特别是从第二行。。进口是 akka版本是2.2.1,scala是2.10.2,我正在使用sbt 0.13来构建它。 编辑:我用 结果如下:

  • 我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?

  • 我试图在akka演员之间建立一个信息传递过程,代表主人给工人一份工作,并密切关注它。我的问题是 我在下面提出的是一个合理的方法,以及 即使不是,我也想知道如何通过期货的组成来正确完成它,为了我的未来教育。 我想要的过程是这样的 1)Master用将工作发送给Worker。它希望在5秒内得到回复,否则它认为工人失去了机会,它将不得不再次进入竞标。 2a)如果工人在5秒内没有响应,我希望主人给自己发送