当前位置: 首页 > 知识库问答 >
问题:

akka-cluster-sharding:receive()vs extarctShardId()和extractentityId()

易烨磊
2023-03-14

我目前正在尝试修改使用Akka Actors实现的simple dist信仰框架的源代码。原始源代码在这里:http://alexminnaar.com/implementing-the-distbelieve-deep-neural-network-training-framework-with-akka.html。最初的实现只是基于Akka Actor,但我想将其扩展到分布式模式。我认为Akka-Cluster-Sharding是这个任务的正确选择。但是我想知道在哪里可以正确处理传入的消息,在receive()方法中,或者在actor类中的extractShardId()&extractEntityId()中(例如对于ParameterShard actor,您可以在上面给定的链接中看到完整的源代码)。Akka的官方文档说:*extractEntityId和extractShardId是两个应用程序特定的函数,用于从传入消息中提取实体标识符和碎片标识符。

object ParameterShard {
  case class ParameterRequest(dataShardId: Int, layerId: Int)
  case class LatestParameters(weights: DenseMatrix[Double])
}

class ParamServer(shardId: Int,
                  numberOfShards: Int,
                  learningRate: Double,
                  initialWeight: LayerWeight) extends Actor with ActorLogging {


  val shardName: String = "ParamServer"

  val extractEntityId: ShardRegion.ExtractEntityId = {
      //case ps: ParameterRequest => (ps.dataShardId.toString, ps)

  }

  val extractShardId: ShardRegion.ExtractShardId = {
      //case ps: ParameterRequest => (ps.dataShardId % numberOfShards).toString
  }
  //weights initialize randomly
  var latestParameter: LayerWeight = initialWeight

  def receive = {

    //A layer corresponding to this shardId in some model replica has requested the latest version of the parameters.
    case ParameterRequest(shardId, layerId) => {
      log.info(s"layer ${layerId} weights read by model replica ${shardId}")
      context.sender() ! LatestParameters(latestParameter)
    }

    /*
    A layer corresponding to this shardId in some model replica has computed a gradient, so we must update our
    parameters according to this gradient.
    */
    case Gradient(g, replicaId, layerId) => {
      log.info(s"layer ${layerId} weights updated by model replica ${replicaId}")
      latestParameter = latestParameter + g.t * learningRate
    }

  }

}

共有1个答案

韶浩皛
2023-03-14

我不知道任何关于distbelieve的东西,所以我不能识别哪个演员是由什么来识别的。所以,很抱歉我不能提供一个示例实现。但是让我们来做一个例子:您想要从不同的域获取/example url。所以你的演员看起来

class FetchActor(domain: String) extends Actor {
  def receive = {
    case Fetch => // do the job
  }
}

object FetchActor {
  case object Fetch
}

为了对该actor使用集群分片,请如下所示进行重构

class FetchActor extends Actor {
  def receive = {
    case Fetch(domain) => // do the job
  }
}

object FetchActor {
  case class Fetch(domain: String) // move parameter to message
  val entityIdExtractor = {
    case msg @ Fetch(domain) => (domain, msg)   // Which message FetchActor process
  }
  val shardIdExtractor = {
    case Fetch(domain) => domain   // What is shard id
  }
}

在该示例中,我决定使用整个域作为shardIdExtractor,但您的应用程序可能需要顶级域。

这就够了。只需启动FetchActor和集群分片,akka就会自动处理它的分发。

 类似资料:
  • CLUSTER INFO 打印集群的相关信息: 127.0.0.1:7711> CLUSTER INFO cluster_state:ok -- 集群运作状态 cluster_known_nodes:1 -- 集群包含的节点数量 cluster_reachable_nodes:1 -- 集

  • CLUSTER FORGET node-id 从集群中移除带有指定 ID 的节点: 127.0.0.1:7711> CLUSTER NODES de0a573aae684388a0a6efc90c41c7bd571ff981 127.0.0.1:7713 noflags 0 1430839655355 connected 6c56c20d868d76ab199b8d1ea23a99db7ee2a8

  • CLUSTER MEET ip port 将地址为 ip , 端口号为 port 的 Disque 节点加入到当前节点所在的集群里面: 127.0.0.1:7711> CLUSTER NODES -- 集群里面只有端口号为 7711 的节点(当前节点) 33d40d27b7be502a28c5b97a6d569347a0bc178d :7711 myself 0 0 connected 1

  • CLUSTER NODES 列出集群包含的各个节点, 以及这些节点的基本信息: 127.0.0.1:7711> CLUSTER NODES de0a573aae684388a0a6efc90c41c7bd571ff981 127.0.0.1:7713 noflags 0 1430839655355 connected 6c56c20d868d76ab199b8d1ea23a99db7ee2a8e2

  • gopush-cluster是一套golang开发的实时消息推送集群 轻量级 高性能 纯Golang实现 支持消息过期 支持离线消息存储 支持全量推送和单个私信推送 支持单个Key多个订阅者(可限制订阅者最大人数) 心跳支持(应用心跳和tcp keepalive) 支持安全验证(未授权用户不能订阅) 多协议支持(websocket,tcp) 详细的统计信息 可拓扑的架构(支持增加和删除comet节

  • MySQL Cluster 是 MySQL 适合于分布式计算环境的高实用、高冗余版本。它采用了NDB Cluster 存储引擎,允许在1个 Cluster 中运行多个MySQL服务器。在MyQL 5.0及以上的二进制版本中、以及与最新的Linux版本兼容的RPM中提供了该存储引擎。(注意,要想获得MySQL Cluster 的功能,必须安装 mysql-server 和 mysql-max RPM