当前位置: 首页 > 知识库问答 >
问题:

Akka演员工作移交与未来

松高歌
2023-03-14

我试图在akka演员之间建立一个信息传递过程,代表主人给工人一份工作,并密切关注它。我的问题是

  1. 我在下面提出的是一个合理的方法,以及
  2. 即使不是,我也想知道如何通过期货的组成来正确完成它,为了我的未来教育。

我想要的过程是这样的

1)Master用ask将工作发送给Worker。它希望在5秒内得到回复,否则它认为工人失去了机会,它将不得不再次进入竞标。

import context.dispatcher
implicit val timeout = Timeout(5 seconds)
val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[Future[WorkCompleted]]

2a)如果工人在5秒内没有响应,我希望主人给自己发送一条消息,说明重新分配工作。

self ! WorkAllocationFailed(work, worker)

2b)如果工人确实回复了,那么它会给我们一个未来[工作完成]。我想等待这个未来完成最多,比如,2分钟。

3a)如果未来[工作完成]未能在超时内完成,则重新分配工作

self ! WorkFailed(work, worker)

3b)如果未来[工作完成]成功,则收集结果

我曾尝试创建此逻辑,但嵌套的onComplete让我陷入了困境,我不知道如何在将来[WorkCompleted]执行超时。我试着阅读Akka 2.10期货文档,但找不到解决方案。

共有2个答案

仉姚石
2023-03-14

总的来说,你有一个将工作交给一群工人的大师,这是一个很好的模式。

另一方面,当你的系统的所有部分都已经是演员时,我不建议使用期货。您可以通过tell发送作品,而不是使用ask提交作品。然后,主服务器可以定期检查超时的作业,并再次重新提交它们。

此外,在Actor的主体中调用on完成是非常危险的,因为它在可能不同的线程上执行。与Actor通信的安全方式是通过消息传递。如果您有一个Future并且您希望在Future完成后在Actor中执行某些事情,最好使用管道模式。

代码段中还有一个小错误。如果您的worker-actor回复为WorkCompleted,那么这就是您真正想要的行:

val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[WorkCompleted]
洪飞龙
2023-03-14

我同意Endre的回答-所有这些都是非常好的观点。

怎么样:

1) 为自己安排一条超时消息(使用system.Schedule.scheduleOnce

2)使用常规告诉将工作消息发送给工人

3a)如果已完成的工作在超时消息之前返回,请取消计划的工作,并使用步骤1和2重新分配工作

3b) 如果已完成的工作在超时消息后返回,则忽略它或取消重新分配的工作。

未来可能有帮助的一个地方是在工人身上,特别是如果工作需要很长时间或被阻止。工作人员可以使用将来来完成工作,并保持可用以处理更多传入消息,例如取消工作。

 类似资料:
  • 我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?

  • 升级到Apache Flink 1.4.2后,我们每几秒钟就会在3个TaskManager中的一个上出现以下错误。 相应的TaskManager会消失并重新出现在集群中,但无法在其上成功运行任何作业。 尝试在此TaskManager上运行作业时 此外,TaskManager似乎没有向JobManager发送心跳响应 从零开始重新创建了具有相同结果的集群。1.3.2没有发生这种情况 这是什么原因造成

  • [04/27/2014 18:09:05.518][ReadScheduler-Akka.actor.Default-Dispatcher-3][Akka://ReadScheduler/User/Collector]从参与者[Akka://ReadScheduler/User/Executor#2127791644]到参与者[Akka://ReadScheduler/User/Collector

  • java.util.concurrent.CompletionException:Akka.Pattern.AskTimeoutException:收件人[Actor[akka:/web_server/user/MyActor#-769383443]]已终止。发送者[null]发送了类型为“com.data.model.request”的消息。 所以我重写了方法,在那里添加了一个log语句。 现在

  • 由于akka是一个构建actor模型的工具包,而actor模型是运行在JVM内部的对象,那么创建后的actor对象的可靠性有多高。考虑到让它崩溃的性质,除非像使用毒丸或JVM关闭一样显式地杀死它,否则JVM中actor对象的可靠性有多高,actor不是自己全部杀死的。

  • 我经常发现自己使用一个“主”角色,为子任务创建许多子角色。当子任务完成时,主角也应该停止自己。所以当时,我观察子角色并停止主角色context.children.is。 我经常使用这种模式,但因为我从未读过这方面的文章。我不确定,这是一个好主意还是失败的演员有问题。。。? 我已经读过Akka 2中的关机模式,但是这种方法在Java中似乎比我的解决方案更复杂? 以下是我针对具有两个子任务的主要参与者