当前位置: 首页 > 文档资料 > 机器学习 >

分布式训练

优质
小牛编辑
135浏览
2023-12-01

相关概念

  • 客户端 (Client):客户端是一个用于建立 TensorFlow 计算图并创立与集群进行交互的会话层 tensorflow::Session 的程序。一般客户端是通过 python 或 C++ 实现的。一个独立的客户端进程可以同时与多个 TensorFlow 的服务端相连 (上面的计算流程一节),同时一个独立的服务端也可以与多个客户端相连。
  • 集群 (Cluster) : 一个 TensorFlow 的集群里包含了一个或多个作业 (job), 每一个作业又可以拆分成一个或多个任务 (task)。集群的概念主要用与一个特定的高层次对象中,比如说训练神经网络,并行化操作多台机器等等。集群对象可以通过 tf.train.ClusterSpec 来定义。
  • 作业 (Job) : 一个作业可以拆封成多个具有相同目的的任务(task),比如说,一个称之为 ps(parameter server,参数服务器) 的作业中的任务主要是保存和更新变量,而一个名为 work(工作)的作业一般是管理无状态且主要从事计算的任务。一个作业中的任务可以运行于不同的机器上,作业的角色也是灵活可变的,比如说称之为”work”的作业可以保存一些状态。
  • 主节点的服务逻辑 (Master service) : 一个 RPC 服务程序可以用来远程连接一系列的分布式设备,并扮演一个会话终端的角色,主服务程序实现了一个 tensorflow::Session 的借口并负责通过工作节点的服务进程(worker service) 与工作的任务进行通信。所有的主服务程序都有了主节点的服务逻辑。
  • 任务 (Task) : 任务相当于是一个特定的 TesnsorFlow 服务端,其相当于一个独立的进程,该进程属于特定的作业并在作业中拥有对应的序号。
  • TensorFlow 服务端 (TensorFlow server) : 一个运行了 tf.train.Server 实例的进程,其为集群中的一员,并有主节点和工作节点之分。
  • 工作节点的服务逻辑 (Worker service) : 其为一个可以使用本地设备对部分图进行计算的 RPC 逻辑,一个工作节点的服务逻辑实现了 worker_service.proto 接口, 所有的 TensorFlow 服务端均包含工作节点的服务逻辑。

创建 Cluster 集群

cluster = tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
# Specifying distributed devices in your model
with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...
with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)

注意: **cluster spec 需要在进程启动时指定,无法实现动态的扩容或缩容,这个问题社区通过引入 Kubernetes 集群管理工具来解决 **。

LocalServer

客户端与服务器一起的版本:

# Start a TensorFlow server as a single-process "cluster".
import tensorflow as tf
c = tf.constant("Hello, distributed TensorFlow!")
server = tf.train.Server.create_local_server()
sess = tf.Session(server.target)  # Create a session on the server.
print sess.run(c)
'Hello, distributed TensorFlow!'

当然,可以将其拆分成 c/s 独立的服务:

# server
import tensorflow as tf
server = tf.train.Server.create_local_server()
server.join()
# client
import tensorflow as tf
c = tf.constant("Hello, distributed TensorFlow!")
sess = tf.Session("grpc://localhost:46685")
print sess.run(c)
'Hello, distributed TensorFlow!'

完整实例

import tensorflow as tf
# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "","One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")
  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
      # Build model...
      loss = ...
      global_step = tf.Variable(0)
      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)
      saver = tf.train.Saver()
      summary_op = tf.summary.merge_all()
      init_op = tf.global_variables_initializer()
    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)
    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        _, step = sess.run([train_op, global_step])
    # Ask for all the services to stop.
    sv.stop()
if __name__ == "__main__":
  tf.app.run()
# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1