我目前正在尝试修改使用Akka Actors实现的simple dist信仰框架的源代码。原始源代码在这里:http://alexminnaar.com/implementing-the-distbelieve-deep-neural-network-training-framework-with-akka.html。最初的实现只是基于Akka Actor,但我想将其扩展到分布式模式。我认为Akka-Cluster-Sharding是这个任务的正确选择。但是我想知道在哪里可以正确处理传入的消息,在receive()方法中,或者在actor类中的extractShardId()&extractEntityId()中(例如对于ParameterShard actor,您可以在上面给定的链接中看到完整的源代码)。Akka的官方文档说:*extractEntityId和extractShardId是两个应用程序特定的函数,用于从传入消息中提取实体标识符和碎片标识符。
object ParameterShard {
case class ParameterRequest(dataShardId: Int, layerId: Int)
case class LatestParameters(weights: DenseMatrix[Double])
}
class ParamServer(shardId: Int,
numberOfShards: Int,
learningRate: Double,
initialWeight: LayerWeight) extends Actor with ActorLogging {
val shardName: String = "ParamServer"
val extractEntityId: ShardRegion.ExtractEntityId = {
//case ps: ParameterRequest => (ps.dataShardId.toString, ps)
}
val extractShardId: ShardRegion.ExtractShardId = {
//case ps: ParameterRequest => (ps.dataShardId % numberOfShards).toString
}
//weights initialize randomly
var latestParameter: LayerWeight = initialWeight
def receive = {
//A layer corresponding to this shardId in some model replica has requested the latest version of the parameters.
case ParameterRequest(shardId, layerId) => {
log.info(s"layer ${layerId} weights read by model replica ${shardId}")
context.sender() ! LatestParameters(latestParameter)
}
/*
A layer corresponding to this shardId in some model replica has computed a gradient, so we must update our
parameters according to this gradient.
*/
case Gradient(g, replicaId, layerId) => {
log.info(s"layer ${layerId} weights updated by model replica ${replicaId}")
latestParameter = latestParameter + g.t * learningRate
}
}
}
我不知道任何关于distbelieve的东西,所以我不能识别哪个演员是由什么来识别的。所以,很抱歉我不能提供一个示例实现。但是让我们来做一个例子:您想要从不同的域获取/example url。所以你的演员看起来
class FetchActor(domain: String) extends Actor {
def receive = {
case Fetch => // do the job
}
}
object FetchActor {
case object Fetch
}
为了对该actor使用集群分片,请如下所示进行重构
class FetchActor extends Actor {
def receive = {
case Fetch(domain) => // do the job
}
}
object FetchActor {
case class Fetch(domain: String) // move parameter to message
val entityIdExtractor = {
case msg @ Fetch(domain) => (domain, msg) // Which message FetchActor process
}
val shardIdExtractor = {
case Fetch(domain) => domain // What is shard id
}
}
在该示例中,我决定使用整个域作为shardIdExtractor,但您的应用程序可能需要顶级域。
这就够了。只需启动FetchActor和集群分片,akka就会自动处理它的分发。
CLUSTER INFO 打印集群的相关信息: 127.0.0.1:7711> CLUSTER INFO cluster_state:ok -- 集群运作状态 cluster_known_nodes:1 -- 集群包含的节点数量 cluster_reachable_nodes:1 -- 集
CLUSTER FORGET node-id 从集群中移除带有指定 ID 的节点: 127.0.0.1:7711> CLUSTER NODES de0a573aae684388a0a6efc90c41c7bd571ff981 127.0.0.1:7713 noflags 0 1430839655355 connected 6c56c20d868d76ab199b8d1ea23a99db7ee2a8
CLUSTER MEET ip port 将地址为 ip , 端口号为 port 的 Disque 节点加入到当前节点所在的集群里面: 127.0.0.1:7711> CLUSTER NODES -- 集群里面只有端口号为 7711 的节点(当前节点) 33d40d27b7be502a28c5b97a6d569347a0bc178d :7711 myself 0 0 connected 1
CLUSTER NODES 列出集群包含的各个节点, 以及这些节点的基本信息: 127.0.0.1:7711> CLUSTER NODES de0a573aae684388a0a6efc90c41c7bd571ff981 127.0.0.1:7713 noflags 0 1430839655355 connected 6c56c20d868d76ab199b8d1ea23a99db7ee2a8e2
gopush-cluster是一套golang开发的实时消息推送集群 轻量级 高性能 纯Golang实现 支持消息过期 支持离线消息存储 支持全量推送和单个私信推送 支持单个Key多个订阅者(可限制订阅者最大人数) 心跳支持(应用心跳和tcp keepalive) 支持安全验证(未授权用户不能订阅) 多协议支持(websocket,tcp) 详细的统计信息 可拓扑的架构(支持增加和删除comet节
MySQL Cluster 是 MySQL 适合于分布式计算环境的高实用、高冗余版本。它采用了NDB Cluster 存储引擎,允许在1个 Cluster 中运行多个MySQL服务器。在MyQL 5.0及以上的二进制版本中、以及与最新的Linux版本兼容的RPM中提供了该存储引擎。(注意,要想获得MySQL Cluster 的功能,必须安装 mysql-server 和 mysql-max RPM