python多进程processing中的start和join函数以及pytorch.distributed中初始化进程组init_process_group函数

曹茂材
2023-12-01

背景

在学习pytorch自带的数据并行训练时,有两个库,torch.nn.DataParalleltorch.nn.parallel.DistributedDataParallel,其中第一个库是多线程,也就是一个线程控制一个GPU,第二个是多进程,一个进程控制一个GPU。
如果是一个进程控制一个GPU的话,我们会用到torch.multiprocessing库,用它生成多线程,并使每个线程与每个GPU建立联系,比如一个线程控制一个GPU。而这个建立联系就需要用到torch.distributed.init_process_group()。

参考

多进程

torch.processing和python的processing库差不多,加了一些对tensor的支持。简单的建立多线程的例子如下:

import torch.multiprocessing as mp

def func(name):
    print('hello, ', name)

if __name__ == '__main__':
    names = ['bob', 'amy', 'sam']
    for name in names:
        p = mp.Process(target=func, args=(name,))
        p.start()

结果:

hello,  bob
hello,  amy
hello,  sam

以上就是创建了三个进程,每个进程会执行函数func,但是传入的参数name各不相同。
注意,只能写在py文件中,不能是ipynb文件。

start

start()函数就是进程开始执行。

join

join函数可以理解为,如果某个子进程执行了join函数,那么在该子进程执行到join之前,父进程都会等待。
例如以上代码可以局部修改为:

if __name__ == '__main__':
    names = ['bob', 'amy', 'sam']
    for name in names:
        p = mp.Process(target=func, args=(name,))
        p.start()
    print('it\'s over')

结果为:

hello,  bob
hello,  amy
it's over
hello,  sam

如果把代码再修改为:

if __name__ == '__main__':
    processes = []
    names = ['bob', 'amy', 'sam']
    for name in names:
        p = mp.Process(target=func, args=(name,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print('it\'s over')

结果为:

hello,  bob
hello,  amy
hello,  sam
it's over

it’s over将会严格在子进程执行完之后打印。

创建子进程的方法

在类Unix平台中,创建子进程的方法有三种:spawnforkforkserver,Window只有一种spawn。可以通过multiprocessing.set_start_method(xxx)改变。具体可参考:multiprocessing — Process-based parallelism

torch.distributed.init_process_group

函数原型为torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)

  • backend:GPU通信的后端,有ncclgloompi三种,nccl比较好。
  • init_method:用于初始化进程组,将其与GPU联系起来,之后再说。
  • world_size:我们一共使用多少个进程,比如4个GPU就可以设置为4。
  • rank:当前进程的标号,0-world_dize-1之间。

有一个稍微复杂的例子如下所示,例子就是:WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

### 分布式应用example:
#### https://pytorch.org/tutorials/intermediate/dist_tuto.html
import random
import torch
import torch.nn as nn
from torchvision import datasets, transforms
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.optim.lr_scheduler import StepLR
import torch.nn.functional as F
import torch.optim as optim
import math
import os


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


""" 数据集分割 """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index
    
    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]
# 这里就给了一个data,和一个index数组,等于把data提取出
# index中的部分。


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = random.Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)  # 对数据下标随机排序

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]
    # 这里最后就根据sizes,把data分成了几份,每一份的index作为一个
    # 数组放到self.partitions中


    # 这个use就是返回当前第几个进程的数据集
    def use(self, partition):
        return Partition(self.data, self.partitions[partition])


""" 将 MNIST 数据集分割 """
# size是GPU数目
def partition_dataset(rank, size):
    dataset = datasets.MNIST('./data', train=True, download=False,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                            ]))
    # size = dist.get_world_size()  # 进程数目,也就是GPU数目
    bsz = math.ceil(128 / float(size))
    # 这就是按照GPU或者进程数目,把1分了一下
    partition_size = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_size)
    partition = partition.use(rank)
    # partition是当前进程的子数据集
    train_set = torch.utils.data.DataLoader(partition, batch_size=bsz, shuffle=True)

    return train_set, bsz
    # 返回当前进程的子数据集的dataloader和batch_size

# 虽然是分布式,但是总的batch_size是128


""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())  # 进程数目,也就是GPU数目
    for param in model.parameters():
        # param.grad.data是每个参数的梯度
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)  # 先求和
        param.grad.data /= size  # 再除,等于求平均


""" Distributed Synchronous SGD Example """
# rank 是 当前进程号
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset(rank, size)

    # 使用GPU
    device = torch.device("cuda:{}".format(rank))
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    num_batches = math.ceil(len(train_set.dataset) / float(bsz))
    # 对于该进程,一共有多少个batch
    # 由于数据集均分了,batch_size也均分了,所以batch的数目与单进程一样。
    for epoch in range(10):
        epoch_loss = 0.0
        # num = 0
        for data, target in train_set:
            data, target = data.to(device), target.to(device)
            # num += 1
            # print('Rank', rank, 'is dealing no.', num)
            optimizer.zero_grad()
            # 先把梯度归零
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()   # 反向传播求梯度
            average_gradients(model)
            # 这是分布式里不一样的!!!!!
            optimizer.step()   # 更新参数
        print('Rank', rank, ', epoch', epoch, ': ', epoch_loss/num_batches)


# rank是本进程的进程号,下标(0--size-1)?size是一共开几个进程分解工作
def init_processes(rank, size, fn, backend='gloo'):
    """初始化分布式环境"""
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 4  # GPU的总数    不能用get_world_size,因为还没调用init_process_group!!!!!
    processes = []
    mp.set_start_method("fork")
    for rank in range(size):
        p = mp.Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

运行结果如下:

Rank 1 , epoch 0 :  0.5456569447859264
Rank 3 , epoch 0 :  0.5368310891425432
Rank 2 , epoch 0 :  0.519815386723735
Rank 0 , epoch 0 :  0.5490584967614237
Rank 3 , epoch 1 :  0.2695276010860957
Rank 2 , epoch 1 :  0.25867786008253024
Rank 0 , epoch 1 :  0.26852701605160606
Rank 1 , epoch 1 :  0.2737363768117959

可以看到,4个GPU成功运行。

我们只关注init_processes函数和main函数。

main函数下首先设置GPU数量size,有几个GPU就可以设置为几。然后就通过mp.Process()创建进程,每个进程会先执行init_processes()函数,该函数首先通过:os.environ['MASTER_ADDR'] = '127.0.0.1' os.environ['MASTER_PORT'] = '29500'设置一些东西,然后调用dist.init_process_group()函数,我们可以认为这就把GPU和进程对应起来了,并且GPU之间能通信了。接着就执行run(rank, size)函数,该函数通过传入的两个参数(分别是GPU的index和GPU的总数量)进行数据并行训练。这里dist.init_process_group()函数之后的我们都不关心。

三种创建进程后的初始化(GPU通信)方法:

这里就是我们创建了进程后,如何将进程与GPU联系在一起的方法。参考:DISTRIBUTED COMMUNICATION PACKAGE - TORCH.DISTRIBUTED

  • Environment variable initialization
    这个就是上面代码的方法。通过类似os.environ['MASTER_ADDR']设置一些环境变量。
    • MASTER_PORT是rank0 进程的ip地址,多个GPU配合总需要一个GPU挑头,默认是rank0这个 GPU,所以我们就将其所在机器的ip地址设置好。多机器训练的话使用socket通信就有用了,不过我没试过。单机器设置成localhost就行。
    • MASTER_PORT是节点rank0所在机器的一个空闲端口。
    • WORLD_SIZE:GPU总数,可以在这里设置,也可以在distributed.init_process_group函数里设置。上面的实例通过传入两个参数在distributed.init_process_group函数里设置。
    • RANK:当前GPU(进程)的下标,可以在这里设置,也可以在distributed.init_process_group函数里设置。上面的实例通过传入两个参数在distributed.init_process_group函数里设置。
  • TCP initialization:跟上一个方法差不多。我们可以将init_processes函数修改为如下,代码同样跑通。
def init_processes(rank, size, fn, backend='nccl'):
    """初始化分布式环境"""
    dist.init_process_group(backend, init_method='tcp://localhost:29500', rank=rank, world_size=size)
    fn(rank, size)
  • Shared file-system initialization:使用共享文件进行初始化。这个文件必须被组内所有的机器共享访问。设置init_process_group函数中的init_method='file://xxxx',其中文件要不存在,但是父文件夹要存在。执行完后并不会把共享文件自动删除,所以我们要手动删除。我试了一下没成功,可能需要设置一些东西,不弄了。
 类似资料: