当前位置: 首页 > 知识库问答 >
问题:

如何仅使用 actor 系统从一组执行组件收集状态信息?

卜弘文
2023-03-14

我正在创建一个 actor 系统,它有一个表示某种会话状态的 actor 列表。这些会话由工厂执行组件创建(如果性能需要,将来可能会被路由器取代 - 但是,这应该对系统的其余部分是透明的)。现在,我想实现一个操作,在该操作中,我从每个当前存在的会话参与者中获取一些状态信息。我没有明确的会话列表,因为我想依靠“拥有”会话的参与者系统。我尝试使用 actor 系统来查找当前会话的 actor。问题是我没有找到“使用此命名模式获取所有actor引用”的方法。我试图在系统上使用“/”运算符,然后是resolveOne - 但迷失在未来类型的迷宫中。

我的基本想法是: - 向所有当前会话的Actor发送消息(由我的ActorSystem提供给我的)。- 等待他们的响应(最好只使用“ask”模式 - 调用此广播公司请求/响应的方法只是一种监视或调试方法,因此阻止在这里没有问题。

在与Scala的类型系统进行了一场死战之后,我不得不暂时放弃。真的没有办法做这样的事吗?

共有2个答案

訾雅畅
2023-03-14

这个项目有很多额外的变化,所以我的回答/评论被推迟了很多:-/

首先,会话统计信息的收集不应该是定期的,而是应请求进行的。我最初的想法是“误用”actor系统作为所有现有会话actor的映射,这样我就不需要知道所有会话的主管actor。

这一目标已经证明是难以实现的——会话参与者依赖于共享状态,因此会话创建者无论如何都必须观看会话。

这使得选项2成为这里显而易见的答案——会话创建者无论如何都必须监视所有的孩子。

选项1最令人烦恼的障碍是“如何确定何时存在所有(当前)答案” - 我希望统计请求拍摄所有当前存在的Actor名称的快照,查询它们,忽略失败(如果会话在查询之前死亡,则可以在此处忽略) - 统计请求只是一个调试工具, 即“尽最大努力”演员选择api把我纠缠在一片未来的丛林中(我是Scala/Akka新手),所以我放弃了这条路。

因此,选择2更适合我的需要。

吕成业
2023-03-14

如果我正确理解了这个问题,那么我可以提供一些方法来实现这个目标(当然还有其他方法)。

选项1

在这种方法中,将有一个参与者负责定期唤醒并向所有会话参与者发送请求以获取其当前统计信息。该参与者将使用带有通配符的 ActorSelection 来实现该目标。如果此方法的代码如下所示,则粗略概述:

case class SessionStats(foo:Int, bar:Int)
case object GetSessionStats

class SessionActor extends Actor{
  def receive = {
    case GetSessionStats =>
      println(s"${self.path} received a request to get stats")
      sender ! SessionStats(1, 2)
  }
}


case object GatherStats
class SessionStatsGatherer extends Actor{
  context.system.scheduler.schedule(5 seconds, 5 seconds, self, GatherStats)(context.dispatcher)

  def receive = {
    case GatherStats =>
      println("Waking up to gether stats")
      val sel = context.system.actorSelection("/user/session*")
      sel ! GetSessionStats

    case SessionStats(f, b) =>
      println(s"got session stats from ${sender.path}, values are $f and $b")
  }
}

然后,您可以使用以下内容测试此代码:

val system = ActorSystem("test")
system.actorOf(Props[SessionActor], "session-1")
system.actorOf(Props[SessionActor], "session-2")

system.actorOf(Props[SessionStatsGatherer])

Thread.sleep(10000)
system.actorOf(Props[SessionActor], "session-3")

因此,使用这种方法,只要我们使用命名约定,我们就可以使用带有通配符的actor选择来始终查找所有会话Actor,即使它们不断出现(开始)和离开(停止)。

选项2

有点类似的方法,但在这个方法中,我们使用集中式参与者来生成会话参与者并充当它们的监督者。这个中央参与者还包含定期轮询统计数据的逻辑,但由于它是父参与者,它不需要ActorSelect,而是可以使用它的子参与者列表。看起来像这样:

case object SpawnSession
class SessionsManager extends Actor{
  context.system.scheduler.schedule(5 seconds, 5 seconds, self, GatherStats)(context.dispatcher)
  var sessionCount = 1

  def receive = {
    case SpawnSession =>
      val session = context.actorOf(Props[SessionActor], s"session-$sessionCount")
      println(s"Spawned session: ${session.path}")
      sessionCount += 1
      sender ! session

    case GatherStats =>
      println("Waking up to get session stats")
      context.children foreach (_ ! GetSessionStats)

    case SessionStats(f, b) =>
      println(s"got session stats from ${sender.path}, values are $f and $b")      
  }
}

并且可以如下测试:

val system = ActorSystem("test")
val manager = system.actorOf(Props[SessionsManager], "manager")
manager ! SpawnSession
manager ! SpawnSession
Thread.sleep(10000)
manager ! SpawnSession

现在,这些示例非常简单,但希望它们为您描绘了一幅图画,说明如何使用ActorSelection或管理/监督动态解决此问题。另外一个好处是,在这两种方法中都不需要询问,也不需要阻塞。

 类似资料:
  • Scala2.12和Akka都是新的。我正在尝试在运行时收集关于我的actor系统的一些度量。具体地说,我感兴趣的是: 获取在我的执行元系统中运行的执行元的# 对于每个执行元: 获取已接收的消息数 获取它已发送给其他参与者的消息数 确定消息在执行元邮箱中的时间 确定执行元实际处理消息所用的时间 获取执行元邮箱的当前大小 我希望Akka有某种鲜为人知的“元API”,在那里我可以去获取这些类型的度量,

  • 我需要使用父组件中子组件内部的状态值。 组成部分: 这是父组件:

  • Actor是封装状态和行为的对象,他们唯一的通讯方式是交换消息——把消息存放在接收方的邮箱里。从某种意义上来说,actor是面向对象最严格的形式,不过最好把它们比作人:在使用actor来对解决方案建模时,把actor想象成一群人,把子任务分配给他们,将他们的功能整理成一个有组织的结构,考虑如何将失败逐级上传(好在我们并不需要真正跟人打交道,这样我们就不需要关心他们的情绪状态和道德问题)。这个结果就

  • 假设我有以下名为Home的组件: 在PostForm组件与新Post一起提交后,我将如何更新主状态,或者如何从api重新获取数据。

  • Cache RPC Cluster Crontab SPl库 DI Invoker Logger

  • 我正在将父组件的状态从父组件传递到子组件。在子组件中,我有一个不同的状态。我正在对子组件的状态执行一些操作,并且结果必须添加到父组件的状态。因此,在我的父组件中,我编写了一个回调函数,该函数将更新父组件的状态。代码为: 因此,这个函数随后作为道具传递给子组件: 然后在我的子组件中,我试图实现回调函数为: 但这并没有达到预期效果。这是正确的方法吗?有人能帮我吗? 我试图通过实现这里提供的解决方案来解