After data annotation is finished in a deep learning project, the samples are wrapped in a dataset and fed to a dataloader, which assembles them into batches for the model. Inside the dataloader, a sampler decides which indices get drawn. This post documents several common samplers.
Sampler is the base class in torch: it declares the interface without implementing it, and every concrete sampler inherits from it. The base class has only an initializer, an __iter__ method, and a __len__ method that reports the number of samples. A subclass must implement __iter__ and __len__; __iter__ yields dataset indices one after another, which the dataloader uses to fetch the corresponding samples.
from typing import Generic, Iterator, Optional, Sized, TypeVar

T_co = TypeVar('T_co', covariant=True)

class Sampler(Generic[T_co]):
    def __init__(self, data_source: Optional[Sized]) -> None:
        pass

    def __iter__(self) -> Iterator[T_co]:
        raise NotImplementedError

    def __len__(self) -> int:
        pass
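As a quick illustration of the contract, here is a minimal sketch (a hypothetical example, not from torch) of a sampler that walks the dataset backwards; only __iter__ and __len__ have to be filled in:

class ReverseSampler(Sampler[int]):
    """Hypothetical sampler: yields indices from the last sample down to the first."""
    def __init__(self, data_source: Sized) -> None:
        self.data_source = data_source

    def __iter__(self) -> Iterator[int]:
        # yield len-1, len-2, ..., 0 so the dataloader reads the dataset in reverse
        return iter(range(len(self.data_source) - 1, -1, -1))

    def __len__(self) -> int:
        return len(self.data_source)

# list(ReverseSampler(range(4))) -> [3, 2, 1, 0]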
mmdetection implements many data preprocessing steps and, in collate, finally pads images of different sizes up to a common shape. Since reducing the padded area saves compute, SenseTime implemented its own sampler, GroupSampler: it splits the unprocessed images into two groups by aspect ratio, with ratio greater than 1 in one group and ratio less than 1 in the other. Every batch returned by a dataloader iteration must come from a single group, so images of similar ratio are batched together and the padded area stays small.
Its source is as follows (a usage sketch follows the class):
import numpy as np
from torch.utils.data import Sampler

class GroupSampler(Sampler):
    def __init__(self, dataset, samples_per_gpu=1):
        assert hasattr(dataset, 'flag')
        self.dataset = dataset
        self.samples_per_gpu = samples_per_gpu
        self.flag = dataset.flag.astype(np.int64)
        # The flag attribute is assigned by the dataset at init time (see CustomDataset).
        # flag takes only two values, splitting the images into two groups
        # according to whether the aspect ratio is greater than 1.
        self.group_sizes = np.bincount(self.flag)  # count the size of each group (see np.bincount)
        self.num_samples = 0  # used as the return value of __len__
        for i, size in enumerate(self.group_sizes):
            self.num_samples += int(np.ceil(size / self.samples_per_gpu)) * self.samples_per_gpu
            # A group size is not guaranteed to be divisible by samples_per_gpu, so round up.
            # E.g. if group 0 has 100 images, group 1 has 200 and samples_per_gpu is 3,
            # then num_samples = 102 + 201 = 303.

    def __iter__(self):  # returns an iterator that yields one integer index at a time
        indices = []
        for i, size in enumerate(self.group_sizes):
            if size == 0:
                continue
            indice = np.where(self.flag == i)[0]  # indices of the images in this group
            assert len(indice) == size
            np.random.shuffle(indice)  # shuffle within the group
            num_extra = int(np.ceil(size / self.samples_per_gpu)) * self.samples_per_gpu - len(indice)
            indice = np.concatenate([indice, np.random.choice(indice, num_extra)])
            indices.append(indice)
            # Continuing the example above (group 0: 100, group 1: 200, samples_per_gpu: 3),
            # num_samples = 102 + 201 = 303. Since 102 > 100 and 201 > 200, extra indices
            # must be appended. The result is 303 indices, the first 102 from group 0 and
            # the last 201 from group 1, so every run of samples_per_gpu indices shares one ratio group.
        indices = np.concatenate(indices)
        indices = [
            indices[i * self.samples_per_gpu:(i + 1) * self.samples_per_gpu]
            for i in np.random.permutation(range(len(indices) // self.samples_per_gpu))
        ]  # cut the indices into chunks of samples_per_gpu and shuffle the chunk order
        indices = np.concatenate(indices)
        indices = indices.astype(np.int64).tolist()
        assert len(indices) == self.num_samples
        return iter(indices)

    def __len__(self):
        return self.num_samples
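A minimal sketch of GroupSampler in action, using a hypothetical toy dataset whose flag attribute marks the two ratio groups (ToyDataset and its flag values are made up for illustration):

class ToyDataset:
    """Hypothetical dataset: five 'tall' images (flag 0) and four 'wide' ones (flag 1)."""
    def __init__(self):
        self.flag = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1])
    def __len__(self):
        return len(self.flag)

dataset = ToyDataset()
sampler = GroupSampler(dataset, samples_per_gpu=2)
indices = list(sampler)  # 10 indices: ceil(5/2)*2 + ceil(4/2)*2
# every consecutive pair of indices comes from a single flag group
for i in range(0, len(indices), 2):
    pair = indices[i:i + 2]
    assert len(set(dataset.flag[pair].tolist())) == 1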
Imbalanced datasets are a frequent problem in deep learning; an uneven class distribution biases what the model learns toward the majority class. E.g. in cat-vs-dog binary classification with 50 samples in total, if cats make up 90% and dogs 10%, the trained model will lean toward predicting the cat class.
The usual remedy for such extreme imbalance is resampling. Resampling has its own drawback: for a class with very few samples it amounts to reusing the same examples over and over, which aggravates overfitting.
The Imbalanced Dataset Sampler repo implements a torch-based sampler for imbalanced data; overall it works much like WeightedRandomSampler.
The source is as follows:
from typing import Callable
import pandas as pd
import torch
import torch.utils.data
import torchvision
class ImbalancedDatasetSampler(torch.utils.data.sampler.Sampler):
"""Samples elements randomly from a given list of indices for imbalanced dataset
Arguments:
indices: a list of indices
num_samples: number of samples to draw
callback_get_label: a callback-like function which takes two arguments - dataset and index
"""
def __init__(
self,
dataset,
labels: list = None,
indices: list = None,
num_samples: int = None,
callback_get_label: Callable = None,
):
# if indices is not provided, all elements in the dataset will be considered
self.indices = list(range(len(dataset))) if indices is None else indices
# define custom callback
self.callback_get_label = callback_get_label
# if num_samples is not provided, draw `len(indices)` samples in each iteration
self.num_samples = len(self.indices) if num_samples is None else num_samples
# distribution of classes in the dataset
df = pd.DataFrame()
df["label"] = self._get_labels(dataset) if labels is None else labels
df.index = self.indices
df = df.sort_index()
label_to_count = df["label"].value_counts()
weights = 1.0 / label_to_count[df["label"]]
self.weights = torch.DoubleTensor(weights.to_list())
def _get_labels(self, dataset):
if self.callback_get_label:
return self.callback_get_label(dataset)
elif isinstance(dataset, torch.utils.data.TensorDataset):
return dataset.tensors[1]
elif isinstance(dataset, torchvision.datasets.MNIST):
return dataset.train_labels.tolist()
elif isinstance(dataset, torchvision.datasets.ImageFolder):
return [x[1] for x in dataset.imgs]
        elif isinstance(dataset, torchvision.datasets.DatasetFolder):
            # samples is a list of (path, class_index) pairs
            return [x[1] for x in dataset.samples]
        elif isinstance(dataset, torch.utils.data.Subset):
            # look up the underlying ImageFolder's labels at the subset's indices
            return [dataset.dataset.imgs[i][1] for i in dataset.indices]
elif isinstance(dataset, torch.utils.data.Dataset):
return dataset.get_labels()
else:
raise NotImplementedError
def __iter__(self):
return (self.indices[i] for i in torch.multinomial(self.weights, self.num_samples, replacement=True))
def __len__(self):
return self.num_samples
The principle: record the target of every sample, count how often each class occurs, weight each sample by the reciprocal of its class count, and draw with torch.multinomial. Note that __iter__ yields dataset indices (self.indices[i]), not the samples themselves; the dataloader then uses these indices to fetch the samples. Also note that _get_labels only implements a few common ways of reading targets; anything else is not covered, so update or override it to fit your needs.
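A minimal usage sketch, assuming an ImageFolder-style dataset (the directory path is hypothetical):

import torchvision
from torch.utils.data import DataLoader

# hypothetical folder with one subdirectory per class
dataset = torchvision.datasets.ImageFolder(
    'data/train',
    transform=torchvision.transforms.ToTensor(),
)
sampler = ImbalancedDatasetSampler(dataset)
# the sampler replaces shuffle=True; rare classes are drawn more often
loader = DataLoader(dataset, batch_size=16, sampler=sampler)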
An open question: in object detection one image can contain several defects, so how should a weight be assigned to each sample in that case?
WeightedRandomSampler likewise addresses class imbalance: for unevenly distributed data, the reciprocal of each class's proportion is used as its weight.
E.g.:
Suppose a classification problem has four classes, cat, dog, pig and sheep, with proportions [0.1, 0.1, 0.3, 0.5].
The weights for cat, dog, pig and sheep are then [1/0.1, 1/0.1, 1/0.3, 1/0.5] = [10, 10, 3.33, 2].
So if the dataset is [cat, cat, cat, dog, sheep, sheep, sheep, pig, dog, dog],
the per-sample weights are [10, 10, 10, 10, 2, 2, 2, 3.33, 10, 10].
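The same computation as a short sketch (class proportions and the sample list are taken from the example above):

from torch.utils.data import WeightedRandomSampler

# per-class proportions from the example
proportion = {'cat': 0.1, 'dog': 0.1, 'pig': 0.3, 'sheep': 0.5}
class_weight = {c: 1.0 / p for c, p in proportion.items()}
# {'cat': 10.0, 'dog': 10.0, 'pig': 3.33..., 'sheep': 2.0}

labels = ['cat', 'cat', 'cat', 'dog', 'sheep', 'sheep', 'sheep', 'pig', 'dog', 'dog']
weights = [class_weight[lbl] for lbl in labels]
# [10, 10, 10, 10, 2, 2, 2, 3.33, 10, 10]

sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
# iterating the sampler yields 10 indices; high-weight samples appear more often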
The source is as follows:
from typing import Sequence
from torch import Tensor

_int_classes = int  # older torch versions import this from torch._six

class WeightedRandomSampler(Sampler[int]):
r"""Samples elements from ``[0,..,len(weights)-1]`` with given probabilities (weights).
Args:
weights (sequence) : a sequence of weights, not necessary summing up to one
num_samples (int): number of samples to draw
replacement (bool): if ``True``, samples are drawn with replacement.
If not, they are drawn without replacement, which means that when a
sample index is drawn for a row, it cannot be drawn again for that row.
generator (Generator): Generator used in sampling.
Example:
>>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
[4, 4, 1, 4, 5]
>>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
[0, 1, 4, 3, 2]
"""
weights: Tensor
num_samples: int
replacement: bool
def __init__(self, weights: Sequence[float], num_samples: int,
replacement: bool = True, generator=None) -> None:
if not isinstance(num_samples, _int_classes) or isinstance(num_samples, bool) or \
num_samples <= 0:
raise ValueError("num_samples should be a positive integer "
"value, but got num_samples={}".format(num_samples))
if not isinstance(replacement, bool):
raise ValueError("replacement should be a boolean value, but got "
"replacement={}".format(replacement))
self.weights = torch.as_tensor(weights, dtype=torch.double)
self.num_samples = num_samples
self.replacement = replacement
self.generator = generator
def __iter__(self):
rand_tensor = torch.multinomial(self.weights, self.num_samples, self.replacement, generator=self.generator)
return iter(rand_tensor.tolist())
def __len__(self):
return self.num_samples
RandomSampler draws from all samples and distinguishes between sampling with and without replacement via the replacement flag; the main functions involved are torch.randint() (with replacement) and torch.randperm() (without).
class RandomSampler(Sampler[int]):
r"""Samples elements randomly. If without replacement, then sample from a shuffled dataset.
If with replacement, then user can specify :attr:`num_samples` to draw.
Args:
data_source (Dataset): dataset to sample from
replacement (bool): samples are drawn on-demand with replacement if ``True``, default=``False``
num_samples (int): number of samples to draw, default=`len(dataset)`. This argument
is supposed to be specified only when `replacement` is ``True``.
generator (Generator): Generator used in sampling.
"""
data_source: Sized
replacement: bool
def __init__(self, data_source: Sized, replacement: bool = False,
num_samples: Optional[int] = None, generator=None) -> None:
self.data_source = data_source
self.replacement = replacement
self._num_samples = num_samples
self.generator = generator
if not isinstance(self.replacement, bool):
raise TypeError("replacement should be a boolean value, but got "
"replacement={}".format(self.replacement))
if self._num_samples is not None and not replacement:
raise ValueError("With replacement=False, num_samples should not be specified, "
"since a random permute will be performed.")
if not isinstance(self.num_samples, int) or self.num_samples <= 0:
raise ValueError("num_samples should be a positive integer "
"value, but got num_samples={}".format(self.num_samples))
@property
def num_samples(self) -> int:
# dataset size might change at runtime
if self._num_samples is None:
return len(self.data_source)
return self._num_samples
def __iter__(self):
n = len(self.data_source)
if self.generator is None:
generator = torch.Generator()
generator.manual_seed(int(torch.empty((), dtype=torch.int64).random_().item()))
else:
generator = self.generator
        if self.replacement:
            # draw with replacement in chunks of 32 to amortize the torch.randint calls
            for _ in range(self.num_samples // 32):
                yield from torch.randint(high=n, size=(32,), dtype=torch.int64, generator=generator).tolist()
            yield from torch.randint(high=n, size=(self.num_samples % 32,), dtype=torch.int64, generator=generator).tolist()
        else:
            # without replacement: one random permutation over all indices
            # (use the locally seeded generator rather than self.generator, which may be None)
            yield from torch.randperm(n, generator=generator).tolist()
def __len__(self):
return self.num_samples
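In practice you rarely construct RandomSampler yourself: passing shuffle=True to DataLoader installs one internally. The two loaders below are equivalent (a small sketch, using an arbitrary TensorDataset for illustration):

import torch
from torch.utils.data import DataLoader, RandomSampler, TensorDataset

dataset = TensorDataset(torch.arange(10).float().unsqueeze(1))
# shuffle=True is shorthand for installing a RandomSampler
loader_a = DataLoader(dataset, batch_size=4, shuffle=True)
loader_b = DataLoader(dataset, batch_size=4, sampler=RandomSampler(dataset))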
SequentialSampler, as its name implies, samples in the order in which the data is stored and returns the corresponding index of each sample.
The source is as follows:
class SequentialSampler(Sampler[int]):
r"""Samples elements sequentially, always in the same order.
Args:
data_source (Dataset): dataset to sample from
"""
data_source: Sized
def __init__(self, data_source):
self.data_source = data_source
def __iter__(self):
        return iter(range(len(self.data_source)))  # return the sample indices in order
def __len__(self) -> int:
return len(self.data_source)
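A quick check of its behavior; SequentialSampler is also what DataLoader installs by default when shuffle=False and no custom sampler is passed:

print(list(SequentialSampler(range(5))))  # [0, 1, 2, 3, 4]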
As the source below shows, SubsetRandomSampler's __iter__() does not produce a random sequence directly; it draws a random permutation (via randperm(), so without repetition) and uses it to index into the user-supplied indices list, yielding those entries in shuffled order. What it yields are still dataset indices, which the dataloader then resolves to samples. Judging by the material available online, SubsetRandomSampler is typically used to split a dataset into training, validation and test parts; a sketch that splits data into train and val portions follows the class source.
class SubsetRandomSampler(Sampler):
    r"""Samples elements randomly from a given list of indices, without replacement.
    Arguments:
        indices (sequence): a sequence of indices
    """
    def __init__(self, indices):
        # a slice of the dataset's indices, e.g. the training or validation part
        self.indices = indices
    def __iter__(self):
        # yield the given indices in shuffled order, without repetition
        return (self.indices[i] for i in torch.randperm(len(self.indices)))
    def __len__(self):
        return len(self.indices)
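The train/val split mentioned above, as a minimal sketch (the 80/20 ratio and the toy TensorDataset are arbitrary choices for illustration):

import torch
from torch.utils.data import DataLoader, SubsetRandomSampler, TensorDataset

dataset = TensorDataset(torch.arange(100).float().unsqueeze(1))
perm = torch.randperm(len(dataset)).tolist()
split = int(0.8 * len(dataset))  # 80% train / 20% val

train_sampler = SubsetRandomSampler(perm[:split])
val_sampler = SubsetRandomSampler(perm[split:])

# each loader only ever sees its own subset of indices
train_loader = DataLoader(dataset, batch_size=16, sampler=train_sampler)
val_loader = DataLoader(dataset, batch_size=16, sampler=val_sampler)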