当前位置: 首页 > 知识库问答 >
问题:

如何确定Akka演员/主管层级?

范修伟
2023-03-14

我是全新的Akka(Javalib,v2.3.9)。我试图遵循主管层次结构的最佳实践,但由于这是我的第一个Akka应用程序,我在某个地方遇到了心理障碍。

在我的第一个Akka应用程序(实际上是一个用于在多个应用程序之间重用的库)中,来自外部世界的输入表现为传递给参与者的过程消息。使用我的应用程序的开发人员将提供一个基于文本的配置文件,该文件最终配置哪些参与者收到了发送的Process实例,哪些没有。换句话说,这些是我的演员课程:

// Groovy pseudo-code
class Process {
    private final Input input

    Process(Input input) {
        super()
        this.input = deepClone(input)
    }

    Input getInput() {
        deepClone(this.input)
    }
}

class StormTrooper extends UntypedActor {
    @Override
    void onReceive(Object message) {
        if(message instanceof Process) {
            // Process the message like a Storm Trooper would.
        }
    }
}

class DarthVader extends UntypedActor {
    @Override
    void onReceive(Object message) {
        if(message instanceof Process) {
            // Process the message like Darth Vader would.
        }
    }
}

class Emperor extends UntypedActor {
    @Override
    void onReceive(Object message) {
        if(message instanceof Process) {
            // Process the message like the Emperor would.
        }
    }
}

// myapp-config.json -> where the actors are configured, along with other
// app-specific configs
{
    "fizzbuzz": "true",
    "isYosemite": "false",
    "borderColor": "red",
    "processors": [
        "StormTrooper",
        "Emperor"
    ]
}

正如您在配置文件中所看到的,只有冲锋队皇帝被选择接收处理消息。这最终导致零(0)DarthVaderactor被创建。这也是我的意图,这将导致一个

class SomeApp {
    SomeAppConfig config

    static void main(String[] args) {
        String configFileUrl = args[0] // Nevermind this horrible code

        // Pretend here that configFileUrl is a valid path to
        // myapp-config.json.

        SomeApp app = new SomeApp(configFileUrl)
        app.run()
    }

    SomeApp(String url) {
        super()

        config = new SomeAppConfig(url)
    }

    void run() {
        // Since the config file only specifies StormTrooper and
        // Emperor as viable processors, the set only contains instances of
        // these ActorRef types.
        Set<ActorRef> processors = config.loadProcessors()
        ActorSystem actorSystem = config.getActorSystem()

        while(true) {
            Input input = scanForInput()
            Process process = new Process(input)

            // Notify each config-driven processor about the
            // new input we've received that they need to process.
            processors.each {
                it.tell(process, Props.self()) // This isn't correct btw
            }
        }
    }
}

因此,正如您(希望)看到的,我们有所有这些参与者(实际上,有许多非类型转换器impl)来处理过程消息(这些消息反过来从某个源捕获输入)。至于哪些参与者能够在线处理这些过程消息完全是由配置驱动的。最后,每次应用程序接收到输入,它都会被注入流程消息,并且流程消息会被发送给所有配置/活动的参与者。

有了这个给定的背景/设置,我无法确定“演员/主管层次结构”需要是什么。在我的用例中,似乎所有的参与者都是真正平等的,他们之间没有监督结构。StymTrooper仅仅接收一个Process消息,如果该类型的参与者被配置为存在的话。其他演员子类也是如此。

我完全错过了什么吗?如果所有参与者都是平等的,并且层次结构本质上是“平坦的”/“水平的”,我如何定义监督层次结构(出于容错目的)?


共有2个答案

唐腾
2023-03-14

根据您的评论,您仍然希望Masteractor复制并分发流程es。从概念上讲,您不会让用户(或生成您的输入的任何人)为每个参与者提供一次相同的输入。他们只会提供一次消息,然后您(或主参与者参与者)会根据需要复制消息并将其发送给每个相应的子参与者。

正如在dk14的回答中所讨论的,这种方法具有增加容错性的额外好处。

景令秋
2023-03-14

如果您希望为每个参与者实例化不超过一个实例,那么您可能需要使用SenatorPalpatine来监督这三个实例。如果你可能有多个冲锋队-你可能想让JangoFett演员负责创建(或者杀死)它们,一些路由器也是不错的选择(它会自动监督它们)。这还将使您能够在一个士兵失败时重新启动所有士兵(OneForAllStrategy),能够广播,保存一些常见的统计数据等。

路由器示例(伪Scala):

//application.conf
akka.actor.deployment {
  /palpatine/vader {
    router = broadcast-pool
    nr-of-instances = 1
  }
  /palpatine/troopers {
    router = broadcast-pool
    nr-of-instances = 10
  }
}

class Palpatine extends Actor {
    import context._

    val troopers = actorOf(FromConfig.props(Props[Trooper], 
"troopers").withSupervisorStrategy(strategy) //`strategy` is strategy for troopers

    val vader = actorOf(FromConfig.props(Props[Vader]), "vader")

    override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1) //stategy for Palpatine's children (routers itself)

    val strategy = OneForOneStrategy(maxNrOfRetries = 100, withinTimeRange = 1) //stategy for troopers

    def receive = {
         case p@Process => troopers ! p; vader ! p
         case t@Terminted => println(t)
    }
 }

基于标准akka配置创建广播池。我还展示了您可以分别为他们定制监控策略。

如果您希望某些参与者出于某种原因忽略消息-只需在参与者中实现此逻辑,例如:

class Vader extends Actor {
    def receive {
        case p@Process => ...
        case Ignore => context.become(ignore) //changes message handler to `ignore`
    }


    def ignore = {
        case x => println("Ignored message " + x)
        case UnIgnore => context.become(process)//changes message handler back
    }

}

这将动态配置ignore/unignore(否则它只是一个简单的if)。您可以根据某些配置向参与者发送Ignore消息:

val listOfIgnorantPathes = readFromSomeConfig()
context.actorSelection(listOfIgnoredPathes) ! Ignore

如果您想从配置中控制异构广播,您还可以使用与士兵路由器相同的方式为帕尔帕廷创建广播机(只使用组而不是池):

akka.actor.deployment {
  ... //vader, troopers configuration

  /palpatine/broadcaster {
    router = broadcast-group
    routees.paths = ["/palpatine/vader", "/palpatine/troopers"]
  }
}

class Palpatine extends Actor {
   ... //vader, troopers definitions

   val broadcaster = actorOf(FromConfig.props(), "broadcaster")

   def receive = {
     case p@Process => broadcaster ! p
   }
}

只需将维德排除在routees之外。路径使他不接收处理消息。

P. S.演员永远不会孤单——总是有守护者演员(见顶级监督者),它会在异常情况下关闭整个系统。所以不管怎样,参议员帕尔帕廷可能真的会成为你的救星。

P.S.2context。actorSelection(“帕尔帕廷/*”)实际上允许您向所有孩子发送消息(作为广播池和广播组的替代方案),因此您不需要在其中设置一组孩子。

 类似资料:
  • 我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?

  • 我正在将现有应用程序从Akka Classic移植到Akka Typed。最初,您可以使用上下文获取对参与者的引用。actorSelection()。resolveOne() 我知道在Akka Type中不再支持这一点,我们应该使用来注册演员以供发现。 但是,我只想将消息发送到本地参与者,即存在于集群中每个节点上的本地单例。我有它的本地路径,但没有对它的直接引用。这是因为它是由Akka管理系统创建

  • 问题内容: 我有一个不是actor的java对象,它使用actorSelection(Path)从一个actor系统中选择actor。系统中可能不存在所选参与者。 在Java Api中,ActorSelection不存在ask(),因此我无法向actor选择发送和标识消息并使用响应的发送者。 我试图通过演员选择将消息发送给演员,然后对死信做出反应来解决该问题。但是我没有任何死信。 如何通过Acto

  • 我需要启动Akka(2.0)演员系统,发送一些消息,然后等待它做重举。之后,我需要做一些与那些演员无关的事情。 我试图等待所有参与者停止以下代码: 所有演员通过<代码>自我自杀!PoisonPill。我做错了什么?

  • 我们在使用Akka HTTP构建的web服务器上遇到了奇怪的内存行为。我们的架构是这样的: Web服务器路由调用各种参与者,为将来获取结果并将其流式传输到响应 参与者调用非阻塞操作(使用期货),组合和处理从中提取的数据,并将结果传送给发送者。我们使用标准的Akka Actor,实现它的receive方法(不是Akka键入的) 应用程序中的任何地方都没有阻止代码 当我在本地运行web服务器时,一开始