当前位置: 首页 > 知识库问答 >
问题:

具有Akka类型参与者和群集分片的优先级邮箱

熊俊人
2023-03-14

我有一个带有类型化参与者的集群分片应用程序。演员看起来像这样:

object TestActor {

  sealed trait Command
  final case class Inte(i: Int) extends Command
  final case class Stringaki(s: String) extends Command

  val TypeKey = EntityTypeKey[Command]("Test")

  def defaultThreadBehavior(id: String): Behavior[Command] = Behaviors.setup { ctx =>

    Behaviors.receiveMessage { cmd =>
      cmd match {
        case Inte(i) =>
          ctx.log.info(System.currentTimeMillis()/1000 + " Received int: " + i)
          Thread.sleep(1000)
        case Stringaki(s) =>
          ctx.log.info(System.currentTimeMillis()/1000 + " Received string: " + s)
          Thread.sleep(1000)
      }
      Behaviors.same
    }
  }

}

演员是通过这样的碎片信封创造的:

val system_config = ConfigFactory.parseString(
      """
        |akka {
        |  actor {
        |    provider = "cluster"
        |    prio-dispatcher {
        |      type = "Dispatcher"
        |      mailbox-type = "PriorityMailbox"
        |    }
        |  }
        |  remote {
        |    netty.tcp {
        |      hostname = "127.0.0.1"
        |      port = 2551
        |    }
        |  }
        |  cluster {
        |    seed-nodes = [
        |      "akka.tcp://TestApp@127.0.0.1:2551"
        |    ]
        |    sharding {
        |      number-of-shards = 10
        |      use-dispatcher = "akka.actor.prio-dispatcher"
        |    }
        |  }
        |}
        |""".stripMargin)

    val system = ActorSystem(Behaviors.empty[TestActor.Command], "TestApp",system_config)
    val sharding = ClusterSharding(system)

    val shardRegion = sharding.init(Entity(TestActor.TypeKey, ctx => defaultThreadBehavior(ctx.entityId)))

    (0 to 9).foreach{
      i =>
        shardRegion ! ShardingEnvelope(0.toString, Inte(i))
    }

    (0 to 9).foreach{
      i =>
        shardRegion ! ShardingEnvelope(0.toString, Stringaki(i.toString))
    }

两个for循环向同一参与者发送消息。第一个循环发送整数,第二个循环发送字符串。当参与者处理消息时,它会Hibernate,以便在队列中建立消息并测试优先级。优先级邮箱在系统配置中配置,UnboundDPriorityMailbox实现如下:

class PriorityMailbox (settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

  PriorityGenerator {
    case Stringaki => 0
    case _ => 1
  }

)

为什么Actor按照消息到达的顺序打印消息,而不考虑优先级生成器?

共有1个答案

袁高明
2023-03-14

对于为什么没有看到优先级邮箱的效果,简短的回答是,您的TestActor没有使用优先级邮箱,而是使用默认邮箱。只有Akka群集分片系统正在使用优先级邮箱。集群分片reference.confakka.cluster.sharding.use的描述:

# The id of the dispatcher to use for ClusterSharding actors.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.

诚然,您发送的每一条消息都要通过优先级邮箱,但是由于群集分片内部的参与者不睡觉,所以没有积压的开发(尽管在某些情况下,尤其是在内核较少的情况下,可能会有积压的优先级可能会发光)。

要让实体角色在具有优先级邮箱的调度程序中运行,您需要类似于

val entityDispatcherProps = DispatcherSelector.fromConfig("akka.actor.prio-dispatcher")
val baseEntity = Entity(TestActor.TypeKey)(ctx => defaultThreadBehavior(ctx.entityId))
val shardRegion = sharding.init(baseEntity.withEntityProps(entityDispatcherProps))

 类似资料:
  • 我的用例是,我想建立一个运行Akka Actors的节点集群。每个参与者都是同一参与者的实例,用于处理与特定用户的WebSocket连接。每个参与者都将使用唯一的路径注册自己。在非集群设置中,我可以简单地通过其路径调用actor,如其中,是actor实例的唯一名称。我必须将消息传递给这些参与者,这样他们才能将消息发送回各自的WebSocket客户端。 显然Akka集群提供了各种设置:http://

  • 我正在尝试使用类型化执行器版本2.6.3和akka http版本10.1.11,而在非类型化执行器中都运行良好,现在我得到了编译错误

  • 我遇到了一个场景,我需要检查特定的参与者是否存在,这可以通过ActorSystem完成。actorSelection方法,指定参与者路径 但是,当本地节点上存在此类参与者时,此方法可以正常工作。若actor系统由多个节点组成,并且actor存在于另一个节点上,则该方法告诉我们actor不存在。若我给出指定远程参与者系统的字符串,那个么这个方法可以工作。但在actorSelection方法中指定远程

  • 我们正在使用Akka sharding将我们的运行参与者分布在多个节点上。这些行为体是持久的,我们将它们的内部状态保留在数据库中。 现在我们需要将ActorRef添加到“Metrics Actor”中,在每个节点上运行。shard中的每个actor都应该将遥测数据发送给metrics actor-它必须选择正确的metrics actor,该actor在本地运行于同一节点上。原因是,度量执行元收集

  • 我正致力于将一个单节点akka actor系统应用程序改为akka集群。一个变化是将一种有状态的参与者(在运行时可以有很多)变成集群分片托管实体。现在它在多个节点上运行良好。 我面临的一个问题是如何查询(获取全部)在集群中划分区域所创建的实体。以前在单节点模式下,它使用actor system ActorSelection进行actor路径匹配以获得匹配的actor列表,这在集群分片中不再起作用。