考虑经典的“字数统计”程序。它计算某个目录中所有文件的字数。Master接收一些目录,并在Worker actors之间分割作业(每个Worker处理一个文件)。这是伪代码:
class WordCountWorker extends Actor {
def receive = {
case FileToCount(fileName:String) =>
val count = countWords(fileName)
sender ! WordCount(fileName, count)
}
}
class WordCountMaster extends Actor {
def receive = {
case StartCounting(docRoot) => // sending each file to worker
val workers = createWorkers()
fileNames = scanFiles(docRoot)
sendToWorkers(fileNames, workers)
case WordCount(fileName, count) => // aggregating results
...
}
}
但是我想按计划运行这个字数统计程序(例如每1分钟),提供不同的目录进行扫描。
Akka为调度消息传递提供了很好的方式:
system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))
但上述调度程序的问题始于调度程序按记号发送新消息,但之前的消息尚未处理(例如,我发送消息扫描某个大目录,1秒后发送另一条消息扫描另一个目录,因此第一个目录的处理操作尚未完成)。因此,我的WordCountMaster
将从处理不同目录的工作人员那里接收WordCount
消息。
作为一种解决方法,而不是安排消息发送,我可以安排一些代码块的执行,这将每次创建新的WordCountMaster
。即一个目录=一个WordCountMaster
。但我认为它效率低下,而且我还需要注意为WordCountMaster
提供唯一名称,以避免InvalidActorNameException
。
所以我的问题是:我应该像我在上面的段落中提到的那样为每个刻度创建新的WordCountMaster
吗?或者有一些更好的想法/模式如何重新设计这个程序来支持调度?
一些更新:在每个目录创建一个主演员的情况下,我有一些问题:
invalidatornameexception:actor名称[WordCountMaster]不唯一!
和
InvalidActorNameException:参与者名称[WordCountWorker ]不唯一!
我可以通过不提供演员姓名来克服这个问题。但在这种情况下,我的演员会收到自动生成的名字,如<code>a<code>和<code>b<code>等。这对我不好。
我想将路由器的配置排除到 application.conf
。即,我想为每个WordCountWorker
路由器提供相同的配置。但是由于我不控制演员姓名,因此我无法使用下面的配置,因为我不知道演员姓名:
/wordCountWorker{
router = smallest-mailbox-pool
nr-of-instances = 5
dispatcher = word-counter-dispatcher
}
您应该在worker中雇用“成为/不成为”功能。如果您的工作人员开始扫描大文件夹,请使用变为
来更改忽略另一条消息(或不处理该消息的响应)的参与者行为,在目录扫描后,使用字数将消息发送回,并将改为标准行为。
就个人而言,我根本不会使用Actor来解决这个聚合问题,但无论如何,这里开始了。
我认为没有一种合理的方法可以像你建议的那样同时处理多个目录的字数计算。相反,你应该有一个“大师级”演员来监督柜台。因此,您有三个actor类:
文件计数器应该是最容易编写的。
class FileCounter() extends Actor with ActorLogging {
import context.dispatcher
override def preStart = {
log.info("FileCounter Actor initialized")
}
def receive = {
case CountFile(file) =>
log.info("Counting file: " + file.getAbsolutePath)
FileIO.readFile(file).foreach { data =>
val words = data
.split("\n")
.map { _.split(" ").length }
.sum
context.parent ! FileCount(words)
}
}
}
现在是监督文件计数器的演员。
class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {
var total = 0
var files: Array[File] = _
var pendingActors = 0
override def preStart = {
for(i <- 1 to actorPool)
context.actorOf(FileCounter.props(), name = s"counter$i")
}
def receive = {
case CountDirectory(base) =>
log.info("Now counting starting from directory : " + base.getAbsolutePath)
total = 0
files = FileIO.getAllFiles(base)
pendingActors = 0
for(i <- 1 to actorPool if(i < files.length)) {
pendingActors += 1
context.child(s"counter$i").get ! CountFile(files.head)
files = files.tail
}
case FileCount(count) =>
total += count
pendingActors -= 1
if(files.length > 0) {
sender() ! CountFile(files.head)
files = files.tail
pendingActors += 1
} else if(pendingActors == 0) {
context.parent ! WordCountTotal(total)
}
}
}
然后是监督监督者的演员。
class WordCountForker(counterActors: Int) extends Actor with ActorLogging {
var busyActors: List[(ActorRef, ActorRef)] = Nil
var idleActors: List[ActorRef] = _
override def preStart = {
val first = context.actorOf(CounterSupervisor.props(counterActors))
idleActors = List(first)
log.info(s"Initialized first supervisor with $counterActors file counters.")
}
def receive = {
case msg @ CountDirectory(dir) =>
log.info("Count directory received")
val counter = idleActors match {
case Nil =>
context.actorOf(CounterSupervisor.props(counterActors))
case head :: rest =>
idleActors = rest
head
}
counter ! msg
busyActors = (counter, sender()) :: busyActors
case msg @ WordCountTotal(n) =>
val path = sender().path.toString()
val index = busyActors.indexWhere { _._1.path.toString == path }
val (counter, replyTo) = busyActors(index)
replyTo ! msg
idleActors = counter :: idleActors
busyActors = busyActors.patch(index, Nil, 1)
}
}
我在答案中遗漏了一些部分,以尽可能保持简洁,如果您想查看我发布的其余代码,请参阅要点。
另外,关于您对效率的担忧,这里的解决方案将防止每个目录有一个子系统,但是如果需要,您仍然会生成多个子系统。
我不是Akka专家,但我认为每个聚合都有一个参与者的方法并不低效。您需要以某种方式将并发聚合分开。您可以给每个聚合一个id,以便在唯一的主参与者中用id分隔它们,或者您可以使用Akka参与者命名和活动周期逻辑,并将每个计数轮的每个聚合委托给一个仅为该聚合逻辑而存在的参与者。
对我来说,每个聚合使用一个参与者似乎更优雅。
另外请注意,Akka有一个聚合模式的实现,如下所述
我有一个关于阿卡的询问模式的问题。 我正在使用应用程序中的ask模式调用演员。我的应用程序是一个普通的类(不是演员本身): 然而,上面所说的演员叫第二个演员,比如: 第二个参与者会返回实际的响应,比如: 所以基本上涉及到一个间接寻址,当Actor1调用Actor2时,它使用
问题内容: 当我在Android上运行使用Akka的应用程序时,收到以下异常: 在actor系统创建期间抛出此异常: 我以为proguard删除了一个构造函数,所以我在proguard.cfg中添加了以下行: 但这没有帮助。 我究竟做错了什么? 问题答案: 您的具有以下签名的构造函数之一不公开或不存在:
我有一个左向的Tabview。 我想减少选项卡标题的数量,它们太大了。我该怎么做?
我正在尝试了解akka流媒体中的toMat。例如: viaMat vs via有什么用 谢谢你,阿伦
在使用Quartz Scheduler 1.8.6版的应用程序中,当作业未完成时,我们遇到了一个触发器卡住的问题。 例如,我们有ssh调用或数据库查询的作业。如果这些作业挂起(因为ssh调用没有终止,或者select语句有一个表锁),那么我将无法再触发这些作业。触发器被卡住,直到我强制重新启动调度程序。 我已经试过了。中断(触发器)和调度程序。重新调度触发器()。我试着移除触发器并重新创建它。我已
我已经和Cassandra合作了一段时间,并遵循了以下链接中的基准测试提示: http://www.datastax.com/dev/blog/how-not-to-benchmark-cassandra 我有4个节点运行Cassandra,2个不同的节点使用本机基准测试工具“cassandra-stress”为集群提供数据。我知道,由于Cassandra写操作的LSM特性,它们很难绑定到IO,但