YOLOv5-4.0-torch_utils.py 源代码导读

卫建义
2023-12-01

YOLOv5介绍

YOLOv5为兼顾速度与性能的目标检测算法。笔者将在近期更新一系列YOLOv5的代码导读博客。YOLOv5为2021.1.5日发布的4.0版本。
YOLOv5开源项目github网址
源代码导读汇总网址
本博客导读的代码为utils文件夹下的torch_utils.py文件,更新日期为2021.1.10.

torch_utils.py

该文件为基于pytorch的一些实用工具的编写。
相关导入模块及对应说明如下所示:

import logging #日志功能生成模块
import math   #以下为torch相关模块和math模块
import os      #与操作系统进行交互的模块
import subprocess #用于创建新的进程来执行子程序的模块
import time    #时间处理模块
from contextlib import contextmanager # 用于进行上下文管理的模块
from copy import deepcopy  #实现深复制的模块
from pathlib import Path   #对于路径拼接、拆分等十分实用的模块

import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torchvision

try:
    import thop  # 计算pytorch模型FLOPS工具
except ImportError:
    thop = None
logger = logging.getLogger(__name__) #初始化日志

torch_distributed_zero_first 函数用于处理模型进行分布式训练时同步的问题。

@contextmanager #装饰器 为上下文管理模块
def torch_distributed_zero_first(local_rank: int):
    """
    让所有进程在分布式训练中等待每一个local_master做事情的修饰器
    """
    if local_rank not in [-1, 0]: #判断local进程是否是主进程
        #如果不是主进程 该函数会产生一个阻挡 限制这个进程的进行 直到所有进程进行同步
        torch.distributed.barrier()
    yield # 该yield语句 会执行with当中的语句,在上述两个if判断中间执行
    if local_rank == 0: 
        """
          如果是主进程 则设置阻挡 上述if中非主进程均已阻挡 
          等待主进程完成后 此时所有进程完成同步 并可以同时完成释放
        """
        torch.distributed.barrier()

init_torch_seeds 函数初始化相关种子并确定训练模式。

def init_torch_seeds(seed=0):
    # 速度与可重复性之间的权衡
    torch.manual_seed(seed) # 为CPU设置设置随机种子
    """
    benchmark模式会自动寻找最优配置 但由于计算的随机性 每次网络进行前向反馈时会有差异
    避免这样差异的方式就是将deterministic设置为True(该设置表明每次卷积的高效算法均相同)
    """
    if seed == 0:  # 慢但具有可重复性
        cudnn.benchmark, cudnn.deterministic = False, True
    else:  # 快但是具有低重复性
        cudnn.benchmark,cudnn.deterministic  = True, False

git_describe 函数用于返回可读的git描述。

def git_describe():
    # 返回人类可读的git描述 
    # i.e. v5.0-5-g3e25f1e https://git-scm.com/docs/git-describe
    if Path('.git').exists(): #判断'.git'路径是否存在
        #check_output为父进程等待子进程执行完 执行linux命令为''中的字符串
        return subprocess.check_output('git describe --tags --long --always', shell=True).decode('utf-8')[:-1]
    else:
        return ''

select_device 函数用于选择模型训练的设备,并输出日志信息。

def select_device(device='', batch_size=None):
    # device = 'cpu' or '0' or '0,1,2,3' 设备可以输入的设备形式
    s = f'YOLOv5 {git_describe()} torch {torch.__version__} '  # 描述当前yolov5版本和torch版本信息的str
    cpu = device.lower() == 'cpu'# 如果device输入为CPU 则cpu_request为True
    if cpu:
        os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # 强制(使用cpu)让torch.cuda.is_available() = False
    elif device:  # 如果设备需求是若干个显卡
        os.environ['CUDA_VISIBLE_DEVICES'] = device  #设置环境变量 加入CUDA可用的设备 
        #检查cuda的可用性 如果不满足则终止程序 这里%device对应前str中%s 进行格式化输出
        assert torch.cuda.is_available(), f'CUDA unavailable, invalid device {device} requested'  

    cuda = not cpu and torch.cuda.is_available() #当使用CPU并且CUDA不可用时 cuda被设置为False
    if cuda:
        n = torch.cuda.device_count()#返回可用gpu数量
        
        if n > 1 and batch_size:  # 检查batchsize是否能被显卡数目整除 如果不能则终止程序
            assert batch_size % n == 0, f'batch-size {batch_size} not multiple of GPU count {n}'
            
        space = ' ' * len(s) #定义等长的空格
        
        for i, d in enumerate(device.split(',') if device else range(n)): #i为device字符串中每个字符的序号 d为对应内容
            p = torch.cuda.get_device_properties(i) #p为每个可用显卡相关属性 示例如下 
            """
            如torch 1.5.0 _CudaDeviceProperties(name='GeForce GTX 1060', major=6, minor=1, total_memory=6078MB, multi_processor_count=10)
            从第二行开始清空字符串s 显示上会更加美观 不用每一次都输出torch版本(why i==1 so'')
            """
            s += f"{'' if i == 0 else space}CUDA:{d} ({p.name}, {p.total_memory / 1024 ** 2}MB)\n"  # 将bytes转换为MB
    else:
        s += 'CPU\n'

    logger.info(s)  #  将信息输出到日志文件之中
    return torch.device('cuda:0' if cuda else 'cpu')#返回第一张显卡对应的名称 如 GeForce RTX 2060

time_synchronized 用于精确测量模型运算过程时间

def time_synchronized():
    # 基于pytorch的精准时间测量
    if torch.cuda.is_available(): #如果cuda可用则执行synchronize函数 
        # 该函数会等待当前设备上的流中的所有核心全部完成 这样测量时间便会准确 因为pytorch中程序为异步执行
        torch.cuda.synchronize()
    return time.time()

profile 用于输入每运算操作相关信息

def profile(x, ops, n=100, device=None):
    # profile a pytorch module or list of modules. 使用示例: ops代表操作过程
    #     x = torch.randn(16, 3, 640, 640)  # input
    #     m1 = lambda x: x * torch.sigmoid(x)
    #     m2 = nn.SiLU()
    #     profile(x, [m1, m2], n=100)  # profile speed over 100 iterations

    device = device or torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    
    x = x.to(device) #将x变量写到指定的设备上
    x.requires_grad = True #表明需要计算tensor x 的梯度
    # 打印当前设备相关信息 及浮点数计算量 前向、反向 输入输出相关的时间
    print(torch.__version__, device.type, torch.cuda.get_device_properties(0) if device.type == 'cuda' else '')
    print(f"\n{'Params':>12s}{'GFLOPS':>12s}{'forward (ms)':>16s}{'backward (ms)':>16s}{'input':>24s}{'output':>24s}")
    
    for m in ops if isinstance(ops, list) else [ops]:
        m = m.to(device) if hasattr(m, 'to') else m  # 确保每一个过程在对应的设备上执行
        # 保证过程m计算精度和x一致 默认精度为32位浮点数 half操作会将浮点数位数减半
        m = m.half() if hasattr(m, 'half') and isinstance(x, torch.Tensor) and x.dtype is torch.float16 else m  
        dtf, dtb, t = 0., 0., [0., 0., 0.]  # 初始化前向、反向传播消耗时间
        try:
            flops = thop.profile(m, inputs=(x,), verbose=False)[0] / 1E9 * 2  #计算GFLOPS
        except:
            flops = 0

        for _ in range(n): # n=100 
            t[0] = time_synchronized() #过程前时间
            y = m(x)                 
            t[1] = time_synchronized() #过程后时间
            try:
                _ = y.sum().backward() #进行求和之后再反向传播进行求导 这里求导结果均为1 意义不大
                t[2] = time_synchronized() #传播后时间
            except:  # 当没有反向传播方法时
                t[2] = float('nan')
            
            dtf += (t[1] - t[0]) * 1000 / n  # 平均每一个前向传播操作用的时间(单位为毫秒)
            dtb += (t[2] - t[1]) * 1000 / n  # 平均每一个反向传播操作用的时间(单位为毫秒)

        s_in = tuple(x.shape) if isinstance(x, torch.Tensor) else 'list' # 输入变量的尺寸
        s_out = tuple(y.shape) if isinstance(y, torch.Tensor) else 'list'# 输出变量的尺寸
        # .numel()函数用于返回数组中的元素个数 p为网络中参数的总数目
        p = sum(list(x.numel() for x in m.parameters())) if isinstance(m, nn.Module) else 0  
        #格式化输出每个过程的配置 12为间隔 .4g中4为有效数字位数 g为科学计数法 s为以str的形式输出:>为向后推一定间隔 24为间隔大小
        print(f'{p:12.4g}{flops:12.4g}{dtf:16.4g}{dtb:16.4g}{str(s_in):>24s}{str(s_out):>24s}')

is_parallel 用于判断模型是否支持并行

def is_parallel(model): #判断模型是否并行
    #返回值为True/False 代表model的类型 是否在后续tuple内
    return type(model) in (nn.parallel.DataParallel, nn.parallel.DistributedDataParallel)

intersect_dicts 用于筛选字典中的键值对

def intersect_dicts(da, db, exclude=()):
    # 返回字典da 中的键值对 要求键k在字典db中且全部都不在exclude中 同时da中值的shape对应db中值的shape 需相同 
    return {k: v for k, v in da.items() if k in db and not any(x in k for x in exclude) and v.shape == db[k].shape}

initialize_weights 模型权重初始化函数

def initialize_weights(model): #模型初始化权重
    for m in model.modules():
        t = type(m)
        if t is nn.Conv2d: #如果是2维卷积层则跳过
            pass  # nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        elif t is nn.BatchNorm2d: #如果是2维BN层 则设置相关参数如下
            m.eps = 1e-3
            m.momentum = 0.03
        elif t in [nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6]: #对这几类激活函数 inplace插值为True
            m.inplace = True

find_modules 用于找到模型中不同种类的层及对应索引

def find_modules(model, mclass=nn.Conv2d):
    # 找到model各个层里匹配mclass种类的层 的索引
    return [i for i, m in enumerate(model.module_list) if isinstance(m, mclass)]

sparsity 用于求模型的稀疏程度

def sparsity(model):
    # 返回模型整体0参数占所有参数的比例
    a, b = 0., 0.
    for p in model.parameters():
        a += p.numel()       # a为模型的总参数量
        b += (p == 0).sum()  # b为模型参数为0的数量
    return b / a

prune 对模型的剪枝操作

def prune(model, amount=0.3):
    # 对模型进行剪枝 以增加稀疏性
    import torch.nn.utils.prune as prune
    print('Pruning model... ', end='')
    for name, m in model.named_modules():
        if isinstance(m, nn.Conv2d):
            #此处为非结构化剪枝操作 将计算不重要的参数规为0
            prune.l1_unstructured(m, name='weight', amount=amount) 
            prune.remove(m, 'weight')  # 彻底移除被剪掉的权重
    print(' %.3g global sparsity' % sparsity(model)) #返回模型的稀疏度

fuse_conv_and_bn 一个很有意思的将bn层转换为conv层的融合操作

def fuse_conv_and_bn(conv, bn): # conv代表torch支持的卷积层 bn代表torch支持的卷积层
    # 融合卷积与BN层https://tehnokv.com/posts/fusing-batchnorm-and-conv/
    # 将BN层写为1*1卷积层的形式 能够节省计算资源并简化网络结构
    fusedconv = nn.Conv2d(conv.in_channels,
                          conv.out_channels,
                          kernel_size=conv.kernel_size,
                          stride=conv.stride,
                          padding=conv.padding,
                          groups=conv.groups,
                          bias=True).requires_grad_(False).to(conv.weight.device)

    # 准备滤波器(权重) 并进行替换 
    w_conv = conv.weight.clone().view(conv.out_channels, -1)
    w_bn = torch.diag(bn.weight.div(torch.sqrt(bn.eps + bn.running_var)))
    fusedconv.weight.copy_(torch.mm(w_bn, w_conv).view(fusedconv.weight.size()))

    # 准备空间偏置 并进行替换
    b_conv = torch.zeros(conv.weight.size(0), device=conv.weight.device) if conv.bias is None else conv.bias
    b_bn = bn.bias - bn.weight.mul(bn.running_mean).div(torch.sqrt(bn.running_var + bn.eps))
    fusedconv.bias.copy_(torch.mm(w_bn, b_conv.reshape(-1, 1)).reshape(-1) + b_bn)
    
    # 综合上述两步 该卷积层在数学表达上等价于BN操作
    
    return fusedconv #返回fuseconv相关配置

model_info 用于在日志输出模型的总结信息

def model_info(model, verbose=False, img_size=640): #verbose意为冗长的
    # 模型信息. img_size 可能为 int or list, i.e. img_size=640 or img_size=[640, 320]
    n_p = sum(x.numel() for x in model.parameters())  # 模型总参数量
    n_g = sum(x.numel() for x in model.parameters() if x.requires_grad)  # 需要求梯度的参数量
    if verbose:
        #格式化输出字符串
        print('%5s %40s %9s %12s %20s %10s %10s' % ('layer', 'name', 'gradient', 'parameters', 'shape', 'mu', 'sigma'))
        for i, (name, p) in enumerate(model.named_parameters()):
            name = name.replace('module_list.', '')
            #输出模型参数相关信息
            print('%5g %40s %9s %12g %20s %10.3g %10.3g' %
                  (i, name, p.requires_grad, p.numel(), list(p.shape), p.mean(), p.std()))

    try:  # FLOPS
        from thop import profile
        stride = int(model.stride.max()) if hasattr(model, 'stride') else 32
        img = torch.zeros((1, model.yaml.get('ch', 3), stride, stride), device=next(model.parameters()).device)  # 模拟输入一张图片
        flops = profile(deepcopy(model), inputs=(img,), verbose=False)[0] / 1E9 * 2  # stride GFLOPS
        img_size = img_size if isinstance(img_size, list) else [img_size, img_size]  # expand if int/float
        fs = ', %.1f GFLOPS' % (flops * img_size[0] / stride * img_size[1] / stride)  # 640x640 GFLOPS
    except (ImportError, Exception):
        fs = ''

    logger.info(f"Model Summary: {len(list(model.modules()))} layers, {n_p} parameters, {n_g} gradients{fs}") #日志添加信息

load_classifier 通过改变预训练的backbone 并重置全连接层来构造分类器

def load_classifier(name='resnet101', n=2):
    # 加载torchvision中pretrained模型 reshape为n类输出
    model = torchvision.models.__dict__[name](pretrained=True)

    # ResNet model properties
    # input_size = [3, 224, 224]
    # input_space = 'RGB'
    # input_range = [0, 1]
    # mean = [0.485, 0.456, 0.406]
    # std = [0.229, 0.224, 0.225]

    # reshape的过程是将fc层的权重和偏置清0 并将输出改为类别个数
    filters = model.fc.weight.shape[1]
    model.fc.bias = nn.Parameter(torch.zeros(n), requires_grad=True)
    model.fc.weight = nn.Parameter(torch.zeros(n, filters), requires_grad=True)
    model.fc.out_features = n
    return model #返回reshape之后的模型

scale_img 实现对图片的缩放

def scale_img(img, ratio=1.0, same_shape=False, gs=32):  # img(16,3,256,416)
    # scales img(bs,3,y,x)
    # 对img进行缩放 gs代表最终图片的元素点数目必须被32整除以满足浮点数计算要求
    if ratio == 1.0:
        return img
    else:
        h, w = img.shape[2:] #获得图片的高和宽
        s = (int(h * ratio), int(w * ratio))  # 缩放后的尺寸
        img = F.interpolate(img, size=s, mode='bilinear', align_corners=False)  # 用torch自带的插值函数进行resize
        if not same_shape:  # pad/crop img 如果不保持相同 则将按比例放缩的h和w当做输出图片尺度
            h, w = [math.ceil(x * ratio / gs) * gs for x in (h, w)]
        # 将放缩的部分和要求的图片尺寸部分 不想交的部分 用imagenet均值填充
        # Q如果ratio大于1 且same_shape=False 时 w-s[1]<0 此时是否会报错?
        return F.pad(img, [0, w - s[1], 0, h - s[0]], value=0.447) 

copy_attr 函数为复制实例对象的属性

def copy_attr(a, b, include=(), exclude=()):
    # 复制属从b到a, options to only include [...] and to exclude [...]
    # .__dict__返回一个类的实例的属性和对应取值的字典
    for k, v in b.__dict__.items():
        if (len(include) and k not in include) or k.startswith('_') or k in exclude:
            continue
        else:
            setattr(a, k, v) #将对象a的属性k赋值v

ModelEMA 类为模型的指数加权平均方法 使模型更加鲁棒

class ModelEMA: # 模型的指数加权平均方法 以提高模型在测试数据上的健壮性
    """ Model Exponential Moving Average from https://github.com/rwightman/pytorch-image-models
    在模型state_dict中保留所有内容的移动平均值(参数和缓冲区)。
    Keep a moving average of everything in the model state_dict (parameters and buffers).
    This is intended to allow functionality like
    https://www.tensorflow.org/api_docs/python/tf/train/ExponentialMovingAverage
    权重的平滑版本对于某些训练方案表现好十分必要
    A smoothed version of the weights is necessary for some training schemes to perform well.
    这个类对按照模型初始化顺序进行初始化十分敏感
    This class is sensitive where it is initialized in the sequence of model init,
    GPU分配与分布式训练的包装器。
    GPU assignment and distributed training wrappers.
    """

    def __init__(self, model, decay=0.9999, updates=0):
        # 创建EMA
        self.ema = deepcopy(model.module if is_parallel(model) else model).eval()  # FP32 EMA
        # if next(model.parameters()).device.type != 'cpu':
        #     self.ema.half()  # FP16 EMA
        self.updates = updates  # EMA更新次数
        #self.decay为一个衰减函数 输入变量为x
        self.decay = lambda x: decay * (1 - math.exp(-x / 2000))  # decay exponential ramp (to help early epochs)
        for p in self.ema.parameters(): #参数取消设置梯度
            p.requires_grad_(False)

    def update(self, model):
        # 更新EMA的参数
        with torch.no_grad():
            self.updates += 1
            d = self.decay(self.updates) #随着更新的次数设置衰减

            msd = model.module.state_dict() if is_parallel(model) else model.state_dict()  # msd为模型配置的字典
            for k, v in self.ema.state_dict().items():
                if v.dtype.is_floating_point:
                    v *= d                          # 对浮点数进行衰减
                    # .detach函数 使对应Variables与网络隔开进而不参与梯度更新
                    v += (1. - d) * msd[k].detach() 

    def update_attr(self, model, include=(), exclude=('process_group', 'reducer')):
        # 从model复制相关属性复制到self.ema中
        copy_attr(self.ema, model, include, exclude)

争取日后2-3天一更一个源码导读博客(不鸽.jpg)
限于笔者能力有限,欢迎读者批评指正!

 类似资料: