TensorFlow2.0 Guide官方教程 学习笔记18 -‘Distributed training with TensorFlow‘

汪成仁
2023-12-01

本笔记参照TensorFlow官方教程,主要是对‘Distributed training with TensorFlow’教程内容翻译和内容结构编排,原文链接:Distributed training with TensorFlow



一、概览

tf.distribute.strategy是一个TensorFlow API,用于跨多个gpu、多台机器或TPUs分发训练。使用这个API,我们可以用最少的代码更改来分发现有的模型和训练代码。
‘tf.distribute.strategy’有以下几个核心目标:
- 易于使用和支持多个用户组,包括研究人员,ML工程师等
- 提供良好的开箱即用功能(Provide good performance out of the box.)
- 策略间易于切换
除了在Keras里使用‘tf.distribute.Strategy’,我们还可以用于分发自定义训练循环(以及使用TensorFlow的任何计算)。
在TensorFlow2.0中,我们可以即刻执行自己的程序,或者,在图(graph)里使用‘tf.function.tf.distribute.Strategy’去支持这两种执行模式。虽然我们在本指南中大部分时间都在讨论训练,但是这个API也可以用于在不同的平台上分布评估和预测。
通过改动很少的代码,我们就可以使用‘tf.distribute.Strategy’,因为我们已经改变了TensorFlow的底层组件,使之具有战略意识。这包括变量、层、模型、优化器、指标、摘要和检查点。
这篇指南里,谷歌为我们讲述了好几种策略类型,以及在不同情况下如何使用这些策略。

注意:想要进一步了解分布训练的概念,可以参考:this deep-dive presentation(需要梯子)。特别是我们自定义训练循环的时候,谷歌强烈建议我们观看。

# Import TensorFlow
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf

二、策略类型

‘tf.distribute.Strategy’打算沿着不同的轴(axis)来覆盖许多用例。目前支持其中一些组合,将来还会添加其他组合。其中一些轴是:
- 同步和异步训练:这是两种使用数据并行性分布训练的常见方法。在同步训练中,所有工作器同步地训练不同的输入数据片,并在每一步聚合梯度。在异步训练中,所有工作器都独立地训练输入数据并异步地更新变量。通常,‘all-reduce’支持同步训练,参数服务器架构支持异步训练。
- 硬件平台:你可能希望将培训扩展到一台机器上的多个gpu,或网络中的多个机器(每个机器上有0个或多个gpu),或云TPUs上。
为了支持这些用例,TensorFlow中有5中可用的策略。在下一节中我们将介绍哪种策略支持哪种场景。下面是一个快速的概览:

Training APIMirroredStrategyTPUStrategyMultiWorkerMirroredStrategyCentralStorageStrategyParameterServerStrategyOneDeviceStrategy
Keras APISupportedExperimental supportExperimental supportExperimental supportSupported planned post 2.0Supported
Custom training loopExperimental supportExperimental supportSupport planned post 2.0Support planned post 2.0No support yetSupported
Estimator APILimited SupportNot supportedLimited SupportLimited SupportLimited SupportLimited Support

2.1 镜像策略(MirroredStrategy)

‘tf.distribute.MirroredStrategy’支持单台机器多GPU的分布式训练。它为每个GPU设备创建一个副本。模型中的每个变量都跨所有副本进行镜像。这些变量一起形成了一个单独的概念变量,称为“镜像变量”。通过应用相同的更新,这些变量彼此保持同步。
高效的‘all-reduce’算法用于在设备之间传递变量更新。All-reduce通过相加,在所有设备上聚合张量,使它们在每个设备上可用。这是一种非常有效的融合算法,可以显著降低同步的开销。根据设备之间可用的通信类型,有许多all-reduce算法和实现。默认情况下,它使用NVIDA NCCL作为all-reduce实现。我们可以从我们提供的其它选项中进行选择,或者编写我们自己的选项。
下面是创建‘镜像策略’的最简单方法:

mirrored_strategy = tf.distribute.MirroredStrategy()

上面的方法将创建一个‘MirroredStrategy’实例,它将使用所有对TensorFlow可见的GPU,并且用NCCL作为跨设备通信。

如果我们仅仅想用我们机器上的一些GPU,我们可以这样做:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1

如果希望覆盖跨设备通信,可以通过提供tf.distribution.crossdeviceops的实例,使用cross_device_ops参数来实现。目前tf.distribute.HierarchicalCopyAllReduce和tf.distribute.ReductionToOneDevice是两位两种选择,除了默认的tf.distribute.NcclAllReduce。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

2.2 中心存储策略(CentralStorageStrategy)

tf.distribute.experimental.CentralStorageStrategy也可以很好地进行同步训练。变量并没有镜像,相反他们被放置在CPU上,而‘操作’(operations)会被复制到本地所有GPU上。如果只有一个GPU,所有的变量和操作都将放置在GPU上。
通过以下代码创建一个‘CentralStorageStrategy’实例:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy with compute_devices = ('/device:GPU:0',), variable_device = '/device:GPU:0'

上面的代码将创建一个‘CentralStorageStrategy’实例,它将使用所有可见的CPU和GPU。副本上的变量更新在应用到变量之前,将会被聚合。

注意:这条策略是条试验性策略,谷歌还在提升它并将它用于更多的场景中。所以以后要留意着个策略的官方更新。

2.3 多工作器镜像策略(MultiWorkerMirroredStrategy)

‘tf.distribute.experimental.MultiWorkerMirroredStrategy ’和‘MirroredStrategy’非常的像。它实现了跨多工作器进行同步分布训练,每个工作器可能带有多GPU。类似于‘MirroredStrategy’,它在所有工作器的每个设备上创建模型中所有变量的副本。

它使用CollectiveOps作为多工作者的all-reduce通信方法来保持变量的同步。集合操作(collective op)是张量流图中的单个操作(op),它可以根据硬件、网络拓扑结构和张量大小在TensorFlow运行时自动选择一个‘all-reduce’算法。

它还实现了额外的性能优化。例如,它包含一个静态优化,将小张量上的多个all-reductions转换为大张量上的更少的all-reductions。此外,我们正在设计一个插件架构——这样在未来,你将能够更好地为你的硬件调整插件算法。注意,集体操作(collective ops)还实现其他集体操作,如广播(broadcast)和all-gather。

下面是创建MultiWorkerMirroredStrategy的最简单方法:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

与多gpu训练相比,多工作器(worker)训练的关键区别之一是多工作器(worker)设置。TF_CONFIG环境变量是TensorFlow中为集群中的每个工作器(worker)指定集群配置的标准方法。详情参考:6.1 创建‘TF_CONFIG’环境变量。

注意:这条策略是条试验性策略,谷歌还在提升它并将它用于更多的场景中。所以以后要留意着个策略的官方更新。

2.4 TPU策略(TPUStrategy)

‘tf.distribute.experimental.TPUStrategy’让我们在TPU上运行TensorFlow训练。TPUs是谷歌研制的专用集成电路,旨在显著提高机器学习的工作量。它们可以在谷歌Colab、TensorFlow研究云和Cloud TPU上找到。
就分布式训练架构来说,TPUStrategy和MirroredStrategy是一样的-它实现的是同步式分布训练。TPU会在多个TPU核上提供它们自己的高效的‘all-reduce’和集合式操作(collective operation),这些TPU核均使用‘TPUStrategy’。
下面是我们如何实例化‘TPUStrategy’(注意:下面这段代码在Colab上得选择TPU运行环境):

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)

‘TPUClusterResolver’实例帮助定位TPU。在Colab上,我们不再需要指定任何参数给它了。

如果想将其用于云TPUs: -我们必须在TPU参数中指定TPU资源的名称。-必须在程序开始时显式初始化tpu系统。在使用TPUs进行计算之前,这是必需的。初始化tpu系统也会清除tpu内存,所以为了避免丢失状态,首先完成这一步是很重要的。

注意:这条策略是条试验性策略,谷歌还在提升它并将它用于更多的场景中。所以以后要留意着个策略的官方更新。

2.5 参数服务器策略(ParameterServerStrategy)

	‘tf.distribute.experimental.ParameterServerStrategy’支持在多机器上用参数服务器进行训练。在这条策略里,有些机器被设计为‘生产者’(worker),有些这作为参数服务器。模型的每个变量被放置在参数服务器里。计算将被复制到所有生产者的GPU里。

从代码上看,它跟其它策略挺像:

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

对于多生产者(multi worker)训练,‘TF_CONFIG’需要指定参数服务器的配置和集群里的生产者,这些我们可以在下面的6.1节中看到。

2.6 唯一装置策略(OneDeviceStrategy)

‘tf.distribute.OneDeviceStrategy’在单独的设备上运行。此策略将把在其作用域内创建的任何变量放在指定的设备上。通过该策略分发的输入将被预取到指定的设备上。此外,通过‘strategy.experimental_run_v2’调用的函数也会被放置到指定设备上。
在使用那些分布到多设备/机器的策略之前,我们可以用这个策略来测试自己的代码。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

到目前为止,我们已经讨论了可用的不同策略以及如何实例化它们。在接下来的几节中,我们将讨论如何使用它们来分布我们的训练。我们将在本指南中展示简短的代码片段,并链接到我们可以从头到尾运行的完整教程。

三、使用Keras中的‘tf.distribute.Strategy’

‘tf.Keras’里集成了‘tf.distribute.Strategy’,而Keras可以用来创建和训练模型 ,所以我们可以在Keras训练框架里无缝地进行分布式训练。
下面使我们代码里需要更改的地方:
1.创建一个合适的示例‘tf.distribute.Strategy’
2.将Keras模型的创建和编译移到’strategy.scope’中。
我们支持所有类型的Keras模型-序列模型,功能模型和子类化模型。
下面是一段代码来做一个非常简单的带有一个紧密层的Keras模型。

mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  model.compile(loss='mse', optimizer='sgd')

这个例子中我们用的是‘MirroredStrategy’,所以我们可以在多GPU机器上运行它。‘strategy.scope()’现在哪块代码需要分布式运行。在这个范围内创建模型允许我们创建镜像变量而不是常规变量。范围(Scope)下的编译允许我们知道用户打算使用这个策略来训练这个模型。一旦设置好,我们就可以像往常一样拟合我们的模型了。‘MirroredStrategy’负责在可用的GPU上复制模型的训练,聚合梯度,等等。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
10/10 [==============================] - 3s 274ms/step - loss: 3.9620
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 1.8083
10/10 [==============================] - 0s 8ms/step - loss: 1.0876

1.087557077407837

下面我们用‘tf.data.Dataset’来提供训练和eval输入。我们也可以使用numpy数组:

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Train on 100 samples
Epoch 1/2
100/100 [==============================] - 1s 11ms/sample - loss: 0.7740
Epoch 2/2
100/100 [==============================] - 0s 120us/sample - loss: 0.3421

<tensorflow.python.keras.callbacks.History at 0x7f89006d92e8>

不管是Dataset还是numpy,每批给定输入在多个副本里都是均匀划分的。例如,如果‘MirroredStrategy’带有2个GPU,每个大小为10的批将会均匀划分在2个GPU里,每个GPU在每个步骤接收5个输入。通常,我们希望在添加更多加速器时增加批处理大小,以便有效地利用额外的计算能力。我们还需要根据模型重新调整我们的学习速率。我们可以用‘strategy.num_replicate_in_sync’去获得副本数量。

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

3.1 现在可以支持哪些?

在TF2.0版本中,Keras可以支持‘MirroredStrategy’、‘TPUStrategy’、‘CentralStorageStrategy’和‘MultiworkerMirroredStrategy’。除了‘MirroredStrategy’,其它三种策略现在都是处于试验阶段。后续会持续更新改善。其余的策略谷歌也在开发中:

Training APIMirroredStrategyTPUStrategyMultiWorkerMirroredStrategyCentralStorageStrategyParameterServerStrategyOneDeviceStrategy
Keras APIsSupportedExperimental supportExperimental supportExperimental supportSupport planned post 2.0Supported

3.2 示例和教程

下面是一个教程和实例的列表,对上述Keras端到端的集成:
1.用‘MirroredStrategy’训练MNIST的教程。
2.ResNet使用‘MirroredStrategy’训练‘ImageNet’数据。
3.ResNet使用‘TPUStrategy’训练‘ImageNet’数据
4.使用‘MultiWorkerMirroredStrategy’训练MNIST教程
5. 使用‘MirroredStrategy’训练NCF
6. 使用‘MirroredStrategy’训练Transformer

四、在自定义训练循环中使用‘tf.distribute.Strategy’

现在我们已经看到,在‘Estimator’和‘Keras’中使用‘tf.distribute.Strategy’只需改动几行代码。再稍微努力点,我们也可以用在自定义训练循环中使用‘tf.distribute.Strategy’。

如果需要比Estimator或Keras更大的灵活性和对训练循环的控制,我们可以编写自定义训练循环。例如,在使用GAN时,我们可能希望每轮使用不同数量的生成器或鉴别器步骤。同样,高层框架也不太适合强化学习训练。

为了支持自定义训练循环,我们通过‘tf.distribute.Strategy’类提供了一组核心方法。使用这些方法可能最初需要对代码进行少量的重构,但是一旦完成了重构,您就应该能够通过更改策略实例在gpu、TPUs和多台机器之间进行切换。

在这里,我们将展示一个简短的代码片段,该代码片段演示了使用与之前相同的Keras模型进行简单培训示例的用例。

首先,我们在策略范围内创建模型和优化器。这是确保任何用模型和优化器创建的变量是镜像过的变量。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

下一步,我们创建输入数据集,基于策略调用‘tf.distribute.Strategy.experimental_distribute_dataset ’分发数据集。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

然后,我们定义一个训练步骤。我们将使用tf.GradientTape来计算梯度和优化应用这些梯度来更新我们的模型的变量。为了分发这个训练步骤我们放入了一个函数‘step_fn’并将其传递给‘tf.distribute.Strategy.experimental_run_v2’,与我们从之前创建的dist_dataset获得的数据集输入一起:

@tf.function
def train_step(dist_inputs):
  def step_fn(inputs):
    features, labels = inputs

    with tf.GradientTape() as tape:
      logits = model(features)
      cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
          logits=logits, labels=labels)
      loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    return cross_entropy

  per_example_losses = mirrored_strategy.experimental_run_v2(
      step_fn, args=(dist_inputs,))
  mean_loss = mirrored_strategy.reduce(
      tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0)
  return mean_loss

上面的代码中我们需要注意的几点:
1.我们使用了‘tf.nn.softmax_cross_entropy_with_logits’来训练代价。然后我们用全局批大小来衡量总损失。这一点很重要,因为所有的副本都是同步训练的,而训练的每个步骤中的示例数都是全局批处理的。因此,损失需要除以全局批大小,而不是副本(本地)批大小。
2.我们使用‘tf.distribute.Strategy.reduce’ API来聚合由‘tf.distribute.Strategy.experimental_run_v2’返回的结果。‘tf.distribute.Strategy.experimental_run_v2’从策略里每个本地副本中返回结果,而且我们有多种方法来利用这个结果。我们可以归纳他们得到一个聚合值,也可以用‘tf.distribute.Strategy.experimental_local_results’得到包含结果值的列表,每本本地副本得到一个列表。
3.当在分布策略范围内调用apply_gradients时,它的行为将被修改。具体地说,在同步训练过程中,在对每个并行实例应用梯度之前,它要对梯度进行累加。

最后,一旦我们定义训练步骤,我们可以通过‘dist_dataset’迭代,并且在循环里运行训练:

with mirrored_strategy.scope():
  for inputs in dist_dataset:
    print(train_step(inputs))
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)

在上面的例子中,我们通过迭代‘dist_dataset’来给我们的训练提供输入。谷歌同样通过‘tf.distribute.Strategy.make_experimental_numpy_dataset’来支持numpy输入。我们可以在调用‘tf.distribute.Strategy.experimental_distribute_dataset’之前用这个API创建一个数据集。

对数据进行迭代的另一种方法是显式地使用迭代器。当我们希望运行给定数量的步骤而不是遍历整个数据集时,我们可能希望这样做。现在可以修改上面的迭代,首先创建一个迭代器,然后显式地调用next以获得输入数据。

with mirrored_strategy.scope():
  iterator = iter(dist_dataset)
  for _ in range(10):
    print(train_step(next(iterator)))
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.0, shape=(), dtype=float32)

这涵盖了使用tf.distribute.Strategy API的最简单情况,来分发自定义训练循环。谷歌正在改进这些api。由于这个用例需要更多的工作来调整您的代码,谷歌将在未来发布一个单独的详细指南。

4.1 现在可以支持哪些?

在TF2.0版本,自定义训练循环支持使用‘MirroredStrategy’和‘TPUStrategy’。未来将会支持‘MultiWorkerMiorredStrategy’。

Training APIMirroredStrategyTPUStrategyMultiWorkerMirroredStrategyCentralStorageStrategyParameterServerStrategyOneDeviceStrategy
Custom Training LoopExperimental supportExperimental supportSupport planned post 2.0Support planned post 2.0No support yetSupported

4.2 示例和教程

下面是在自定义训练循环里使用分布策略:
1.使用MirroredStrategy训练MNIST数据集教程
2.使用‘MirroredStrategy’的DenseNet示例
3.使用‘MirroredStrategy’训练的BERT示例。这个示例对于理解在分布式训练加载检查点和定期生成检查点特别有帮助。
4.使用‘MirroredStrategy’和‘TPUStrategy’训练的NCF示例,它可以被‘keras_use_ctl’标识使能。
5.使用‘MirroredStrategy’训练的NMT示例。

五、在Estimator中使用‘tf.distribute.Strategy’(支持有限)

‘tf.estimator’是一个分布式训练TensorFlow API ,它最开始是支持异步参数服务器方法。像Keras一样,谷歌已经将‘tf.distribute.Strategy’整合进了‘tf.Estimator’。如果我们正在使用Estimator训练,我们可以通过改很少的代码进行分布式训练。现在,Estimator用户也可以在多GPU、多工作期以及TPU上做同步式分布训练。这些支持都是有限的,具体看下面5.1节。
Estimator里的‘tf.distribute.Strategy’使用范围和Keras场景是有轻微不同对的。与使用‘strategy.scope’不同的是,我们将策略对象传递到Estimator中的‘RunConfig’。
下面这段代码,使用了Estimator的‘LinearRegressor’和‘MirroredStrategy’:

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpyfhq9w2k
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpyfhq9w2k', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8900395668>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8900395668>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f89004d66d8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

这里我们使用了预制的Estimator,但有些代码用自定义Estimator工作的也挺好。‘train_distribute’决定训练如何分布,‘eval_distribute’决定评价如何分布。这点是我们在Keras里进行训练和评价期间使用同一策略时另一个不同点。

现在我们可以用一个输入函数来训练和评价这个Estimator:

def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/feature_column/feature_column_v2.py:518: Layer.add_variable (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:308: to_float (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.cast` instead.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpyfhq9w2k/model.ckpt.
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmpyfhq9w2k/model.ckpt.
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /replica:0/task:0/device:CPU:0 then broadcast to ('/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-10-24T01:22:46Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpyfhq9w2k/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Finished evaluation at 2019-10-24-01:22:46
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmpyfhq9w2k/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

这里要强调一下Estimator和Keras之间的另一个区别是输入处理。在Keras中,我们提到过每批数据集都是自动地在多个副本之间分割的。然而,在Estimator中,我们不进行批处理的自动分割,也不跨不同的工作器自动分割数据。我们可以完全按自己所想控制数据跨工作器和设备分布的方式,并且必须提供input_fn来指定如何分布数据。

我们的input_fn对每个worker调用一次,因此为每个worker提供一个数据集。然后将数据集中的一个批处理提供给该工作上的一个副本,进而在一个工作器上使用N个副本的N个批处理。换句话说,input_fn返回的数据集应该提供批量大小PER_REPLICA_BATCH_SIZE。步骤的全局批处理大小可以通过PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync获得。

当使用多工作期训练时,我们需要在多工作器上分割数据,或者打乱每个随机种子。Multi-worker Training with Estimator可以看到如何这样做的示范。

刚刚我们展示了一个在Estimator使用‘MirroredStrategy’的例子,我们还可以在确切的方式里在Estimator里使用‘TPUStrategy’。

config = tf.estimator.RunConfig(
    train_distribute=tpu_strategy, eval_distribute=tpu_strategy)

同样,我们也可以使用多个工作器和参数服务器策略。代码保持不变,但是需要使用tf.estimator.train_and_evaluate,并为集群中运行的每个二进制文件设置TF_CONFIG环境变量。

5.1 现在支持哪些

在TF2.0版本中,Estimator除了不支持‘TPUStrategy’以外,其它都是有限支持。基本的训练和评价可以正常工作,但一些高级特性比如‘scafford’目前还不能工作。整合当中也可能有些bug。这次,谷歌不计划提升支持,主要集中在对Keras和自定义训练循环的支持,所以我们需要更倾向使用这些API相关的‘tf.distribute’。

Training APIMirroredStrategyTPUStrategyMultiWorkerMirroredStrategyCentralStorageStrategyParameterServerStrategyOneDeviceStrategy
Estimator APILimited SupportNot supportedLimited SupportLimited SupportLimited SupportLimited Support

5.2 示例和教程

下面的例子,展示了在Estimator里使用不同策略的端到端模型示例:
1.在Estimator里,用使用‘MultiWorkerMirroredStrategy’的多工作器来训练MNIST数据
2.在tensorflow/生态系统中使用Kubernetes模板进行多工作器的端到端示例这个示例一开始是Keras模型,后面用‘tf.keras.estimator.model_to_estimator’转换为Estimator。
3.ResNet50官方示例,它可以被使用‘MirroredStrategy’或‘MultiWorkerMirroredStrategy’来训练

六、其它话题

在这个版块,我们将讨论几个关于多重(multiple)用例的话题

6.1 创建‘TF_CONFIG’环境变量

对于多工作器训练,之前已经提到过,我们需要在集群里的每个二进制文件设置‘TF_CONFIG’环境变量。TF_CONFIG环境变量是一个JSON字符串,它指定哪些任务构成集群、它们的地址以及每个任务在集群中的角色。我们在tensorflow/ecosystem 仓库中提供了一个Kubernetes模板,用于为我们的培训任务设置TF_CONFIG。
其中一个示例为:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

TF_CONFIG指定集群中有三个worker和两个ps任务,以及它们的主机和端口。“任务”部分指定集群中当前任务的角色,worker 1(第二个worker)。集群中的有效角色是“主管(chief)”、“工人(worker)”、“ps”和“评价器(evaluator)”。除了在使用tf. distribution .experimental. ParameterServerStrategy时,不应该有“ps”作业。‘ps’指参数服务器(parameter server)

七、下一步?

	‘tf.distribute.Strategy’还处于积极开发中,	谷歌欢迎我们使用它,并使用‘GitHub issues’提供反馈。	
 类似资料: