用一个例子感受一下。传统的 PyTorch training loop一般长这样:
my_model.to(device)
for batch in my_training_dataloader:
my_optimizer.zero_grad()
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = my_model(inputs)
loss = my_loss_function(outputs, targets)
loss.backward()
my_optimizer.step()
和 Accelerate
配合使用,只需要增加一丢丢代码:
+ from accelerate import Accelerator
+ accelerator = Accelerator()
- # my_model.to(device)
## Pass every important object (model, optimizer, dataloader) to *accelerator.prepare*
+ my_model, my_optimizer, my_training_dataloader = accelerate.prepare(
+ my_model, my_optimizer, my_training_dataloader
+ )
for batch in my_training_dataloader:
my_optimizer.zero_grad()
inputs, targets = batch
- # inputs = inputs.to(device)
- # targets = targets.to(device)
outputs = my_model(inputs)
loss = my_loss_function(outputs, targets)
# Just a small change for the backward instruction
- loss.backward()
+ accelerator.backward(loss)
my_optimizer.step()
Accelerate 通过一个 CLI tool 使得用户不需要再去学习 torch.distributed.lauch,也不需要了解如何专门面向 TPU training 写 specific launcher. 具体操作为:
accelerate config
accelerate launch my_script.py --args_to_my_script
1. import Accelorator 并实例化
from accelerate import Accelerator
accelerator = Accelerator()
【注意】
- 越早越好,比如在进入 main 函数的第一步,此时会初始化(分布式)训练所需要的 everything.
- 不需要指明实验环境,accelerator 会自动检测。
2. 删除 model 和 data 对 .to(device)
和 .cuda()
的调用。accelerator 会帮你自动把模型和数据 place 到正确的 device 上面。
【注意】
- 可以保留对 .to(device) 的调用,但此时需保证 device = accelerator.device.
- 若不想采用 accelerator 的自动配置,则需要在初始化 Accelerator 的时候设置 device_placement=False.
3. 将所有 training 设计的对象 (optimizer, model, training_dataloader, learning rate scheduler) 送进 prepare().
model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, lr_scheduler
)
此时,training dataloader 将在所有的 GPU/TPU 中间进行分享,也就是说,每个 device 拿到了 training dataset 的不同部分。
同时,所有 processes 的 random states 将在 dataloader 开始每个 iteration的时候进行同步,来确保 data 以相同的方式进行 shuffle (如果设置sampler shuffle=True 的话)。
【注意】
- 实际的 batch_size = number_of_devices * batch_size_set_in_script.
- 如果在 initialize Accelerator 的时候设置 split_batches=True,则 batch_size 不会随 device 的数量改变。
- 在所有关于 training 的 object 创建结束之后的第一时刻调用 prepare()。
- 对 len(training_dataloader) 的调用应当出现在 prepare() 之后。
- 如果不需要进行分布式 evaluation, 则无需将 eval_dataloader() 送入 prepare()。
4. 用 accelerator.backward(loss) 替换 loss.backward().
以上即为 accelerator 关于分布式训练的基本操作。Accelerate launcher 可以。
validation_dataloader = accelerator.prepare(validation_dataloader)
for inputs, targets in validation_dataloader:
predictions = model(inputs)
# Gather all predictions and targets
all_predictions = accelerator.gather(predictions)
all_targets = accelerator.gather(targets)
# Example of use with a *Datasets.Metric*
metric.add_batch(all_predictions, all_targets)
【注意】
- gather() 要求由不同 process 得到的 tensor 具有相同的 siz
- 若此条件不满足 (e.g., dynamic padding),则应当通过 pad_across_processes() 将不同 process 的 tensor pad 到相同 size.
model.eval()
samples_seen = 0
for step, batch in enumerate(eval_dataloader):
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
labels = batch["labels"]
if not args.pad_to_max_length: # necessary to pad predictions and labels for being gathered
predictions = accelerator.pad_across_processes(predictions, dim=1, pad_index=-100)
labels = accelerator.pad_across_processes(labels, dim=1, pad_index=-100)
predictions_gathered, labels_gathered = accelerator.gather((predictions, labels))
# If we are in a multiprocess environment, the last batch has duplicates
if accelerator.num_processes > 1:
if step == len(eval_dataloader):
predictions_gathered = predictions_gathered[: len(eval_dataloader.dataset) - samples_seen]
labels_gathered = labels_gathered[: len(eval_dataloader.dataset) - samples_seen]
else:
samples_seen += labels_gathered.shape[0]
preds, refs = get_labels(predictions_gathered, labels_gathered)
metric.add_batch(
predictions=preds,
references=refs,
) # predictions and preferences are expected to be a nested list of labels, not label_ids
if accelerator.is_local_main_process:
# Is executed once per server
from tqdm.auto import tqdm
progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)
注意,上面 code 中的 local
指的是 per machine
。对于一些只需要进行一次的操作,例如将 train 好的 model upload 到 huggingface model hub,则需要
if accelerator.is_main_process:
# Is executed once only
accelerator.print('Test!')
accelerator.wait_for_everyone()
accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)
# save
accelerator.save(unwrapped_model.state_dict(), filename)
如果在模型通过prepare() 后使用load函数时,则需要:
unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.load_state_dict(torch.load(filename))
当我们在训练模型时,我们可能想要保存 model, optimizer, random generators, learning rate schedulers 等的当前状态。此时我们可以通过调用 accelerator.save_state(path_str)
和 accelerator.load_state(path_str)
来分别实现相关对象状态的存储和调用。
其他通过 accelerator.register_for_checkingpointing
register 的涉及状态的 items,也将通过上述函数的调用被存储/调用。
注意,送入 register_for_checkpointing
对象必须具有 load_state_dict
和 save_dict
函数可以调用。
torch.nn.utils.clip_grad_norm_
或 torch.nn.utils.clip_grad_value_
,我们应当使用 accelerator.clip_grad_norm_
或 accelerator.clip_grad_value_
进行替换。accelerator.autocast()
实现:with accelerator.autocast():
loss = complex_loss_function(outputs, target):
【注意】在 Mixed Precision training 中,起始阶段可能会 skip 几步梯度更新(由 dynamic loss scaling stratergy 造成:在训练过程中,有些点会出现梯度溢出现象,此时会减小 loss scaling factor 的取值来避免在后续更新中再次发生这种情况)。在这种情况之下,我们可能在没有梯度更新的时候更新 learning rate scheduler (该现象暂记为 mismatch)。一般情况下,出现 mismatch 现象无伤大雅,但当训练样本很少时可能存在一些问题。如果学习率的初始值对于我们的模型训练十分关键,此时我们可以通过 skip learning rate scheduler 的更新来避免 mismatch 的发生:
if not accelerator.optimizer_step_was_skipped:
lr_scheduler.step()
=> 本质上说,accelerate lib 首先会分析 launch 当前 script 的环境来决定 i) 具体采用的 distributed setup, ii) 不同 processes 的数目, iii) 当前 script 所在的 process. 相关信息均存储在 ~AcceleratorState
中。
当我们实例化 Accelerator
(并根据我们所需的 distributed setup 执行个性化初始化) 时会进行第一次初始化,然后共享至 AcceleratorState
的所有实例。
=> 当我们调用 prepare() 时,accelerate lib 会
AcceleratedOptimizer
.DataLoaderShard
.=> 当把 model 和 optimizer 送入 wraper 后,lib 会重新 create dataloader(s)。这主要是因为 PyTorch dataloader 在实例化之后,用户无法更改 batch_sampler。而 lib 通过改变 batch_sampler 的方式来利用不同的 process 处理不同的数据分片。
DataLoader 的子类 DataLoaderShard
中新的功能主要包括: