当前位置: 首页 > 知识库问答 >
问题:

在torch.distributed中,如何正确地平均不同GPU上的梯度?

鞠嘉誉
2023-03-14

在torch.distributed中,如何正确地平均不同GPU上的梯度?

从https://github.com/seba-1511/dist_tuto.pth/blob/gh-pages/train_dist.py修改,下面的代码可以成功地使用两个GPU(可以用nvidia-smi检查)。

但是有一点很难理解的是,下面的“average_gradients”是否真的是在两个图形处理器上对两个模型的梯度进行平均的正确方法。像下面的代码一样,两个运行两个进程的“模型=网络()”是两个不同GPU上的两个模型,但是行“average_gradients(模型)”只是一个GPU上模型的“平均值”梯度,而不是两个GPU上的两个模型。

问题是,下面的代码是否确实是在两个GPU上平均梯度的正确方法?如果为真,如何阅读,如何理解代码?如果没有,下面两个模型上平均梯度的正确方法是什么?

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from math import ceil
from random import Random
from torch.multiprocessing import Process
from torchvision import datasets, transforms
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

class Partition(object):
    """ Dataset-like object, but only access a subset of it. """

    def __init__(self, data, index):
        self.data = data
        self.index = index
    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]

class DataPartitioner(object):
    """ Partitions a dataset into different chuncks. """
    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)
        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])


class Net(nn.Module):
    """ Network architecture. """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


def partition_dataset():
    """ Partitioning MNIST """
    dataset = datasets.MNIST(
        './data',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307, ), (0.3081, ))
        ]))
    size = dist.get_world_size()
    bsz = int(256 / float(size))
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(
        partition, batch_size=bsz, shuffle=True)
    return train_set, bsz


def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size


def run(rank, size):
    """ Distributed Synchronous SGD Example """
    # print("107 size = ", size)
    # print("dist.get_world_size() = ", dist.get_world_size()) ## 2

    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    device = torch.device("cuda:{}".format(rank))

    model = Net()
    model = model.to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            # data, target = Variable(data), Variable(target)
           # data, target = Variable(data.cuda(rank)), Variable(target.cuda(rank))
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ',
              dist.get_rank(), ', epoch ', epoch, ': ',
              epoch_loss / num_batches)
        # if epoch == 4:
            # from utils import module_utils
            # module_utils.save_model()

def init_processes(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

共有2个答案

孙池暝
2023-03-14

在一台机器上运行DPP。关于验证,我处理得不太好,只是检查验证结果。

邢承弼
2023-03-14

我的解决方案是使用DistributedDataParallel而不是下面的DataParallel。

代码

for param in self.model.parameters():
    torch.distributed.all_reduce(param.grad.data)

你可以成功地工作。

class DDPOptimizer:
    def __init__(self, model, torch_optim=None, learning_rate=None):
        """
        :param parameters:
        :param torch_optim: like torch.optim.Adam(parameters, lr=learning_rate, eps=1e-9)
            or optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
        :param is_ddp:
        """
        if torch_optim is None:
            torch_optim = torch.optim.Adam(model.parameters(), lr=3e-4, eps=1e-9)

        if learning_rate is not None:
            torch_optim.defaults["lr"] = learning_rate

        self.model = model
        self.optimizer = torch_optim

    def optimize(self, loss):
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.model.parameters():
            torch.distributed.all_reduce(param.grad.data)

        self.optimizer.step()
    pass

def run():
    """ Distributed Synchronous SGD Example """

    module_utils.initialize_torch_distributed()
    start = time.time()

    train_set, bsz = partition_dataset()
    model = Net()

    local_rank = torch.distributed.get_rank()
    device = torch.device("cuda", local_rank)
    model = model.to(device)

    sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    optimizer = DDPOptimizer(model, torch_optim=sgd)

    # optimizer = NoamOptimizerDistributed(100, 1, 10, model)

    num_batches = math.ceil(len(train_set.dataset) / float(bsz))

    epoch, end_epoch = 1, 10

    while epoch <= end_epoch:
        epoch_loss = 0.0
        for data, target in train_set:
            data, target = data.to(device), target.to(device)

            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            optimizer.optimize(loss)

        print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches)
        # if epoch % 6 == 0:
        #     if local_rank == 0:
        #         module_utils.save_model(model, "a.pt")
        epoch += 1

    print("Time take to train: ", time.time() - start)
 类似资料:
  • 目前,我有一个EC2实例(2个CPU、8GB RAM、Linux),上面有: NodeJS Express服务器(后端) NodeJS“微服务”Express服务器(与API等通信) PostgreSQL数据库 Redis(作为我的服务器(1)和我的微服务服务器(2)之间的消息代理 与服务器(1)通信的ReactJS应用程序(前端) 现在,这在低流量下工作得很好,但是随着流量的增加(每分钟/小时1

  • 问题内容: 如何在MySQL中的日期之间取平均值?我对时间值,小时和分钟更感兴趣。 在具有以下内容的桌子上: 进行如下查询: 编辑: 的作品,但我不知道它是什么返回数据。 问题答案: 这似乎有点骇人听闻,但适用于1970年〜1970年和2030年之间的日期(在32位元弓上)。您实际上是在将日期时间值转换为整数,对其求平均,然后将平均值转换回日期时间值。 可能有更好的解决方案,但这会帮助您紧要关头。

  • 我创建一个PHP计算器,需要使用以下类,然后打印出用户的名字和他们取得的平均分数。这是我到目前为止的代码,但是它没有正确显示,它说有丢失的参数和未定义的变量,但是我不确定我哪里出错了! 错误消息:警告:在C:\Program Files(x86)\EasyHP-DevServer-14.1VC11\data\localweb\my portable Files\练习4\average中调用的stu

  • 问题内容: 我的网页上有一个宽度固定的容器,其中包含用于登录的表单元素。在这些元素下方有一个提交按钮,忘记密码的链接等。 碰巧最后一个线元素需要的宽度比框提供的宽度少。如何均匀地传播它们?我不要 默认 | ABC | 将线居中 | ABC | 也不是桌子的布局 | A | B | C | 相反,我正在寻找实现的CSS方法: 那是: 在所有元素之间放置相等的空间 将整个内容居中,避免第一侧或最后侧

  • 我将Rails6与WebPack一起使用。我想使用库,但我不知道我必须如何将它包含到文件中。 我的步骤: null 我的 我的