执行synthesis.py文件
# All hyperparameters of the project (imported elsewhere as `hp`)
# Audio
num_mels = 80
# num_freq = 1024
n_fft = 2048
sr = 22050
# frame_length_ms = 50.
# frame_shift_ms = 12.5
preemphasis = 0.97
frame_shift = 0.0125 # seconds
frame_length = 0.05 # seconds
hop_length = int(sr * frame_shift) # samples (275 with the values above)
win_length = int(sr * frame_length) # samples (1102 with the values above)
n_mels = 80 # Number of Mel banks to generate
power = 1.2 # Exponent for amplifying the predicted magnitude
min_level_db = -100
ref_level_db = 20
# Model sizes passed to the encoder/decoder constructors
hidden_size = 256
embedding_size = 512
# max_db / ref_db are the constants used when normalizing spectrograms to (0, 1]
max_db = 100
ref_db = 20
# NOTE(review): presumably the number of Griffin-Lim iterations — confirm against utils
n_iter = 60
# power = 1.5
outputs_per_step = 1
# Training
epochs = 10000
lr = 0.001 # base learning rate fed into the warm-up schedule
save_step = 2000 # a checkpoint is written every `save_step` global steps
image_step = 500 # attention images are logged every `image_step` global steps
batch_size = 32
cleaners = 'english_cleaners' # cleaner name handed to text_to_sequence
# Paths: `data_path` is expected to contain metadata.csv and a wavs/ directory
data_path = './data/LJSpeech-1.1/LJSpeech-1.1'
checkpoint_path = './checkpoint'
sample_path = './samples'
定义一个数据集类(PrepareDataset),从LJSpeech数据集中提取训练所需的文本、语音的mel谱图和mag特征(线性幅度谱图)。该文件主要是从.wav文件中提取出所需的特征数据并进行保存,方便后续使用时调用
import os

import librosa
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader

import hyperparams as hp
from utils import get_spectrograms
# 主要用于从wave文件中提取特征数据
class PrepareDataset(Dataset):
    """LJSpeech dataset used to extract spectrogram features from wav files.

    Accessing an item computes the mel and magnitude spectrograms of one
    recording and also saves them next to the wav file, so that later
    training runs can load the arrays instead of recomputing them.
    """

    def __init__(self, csv_file, root_dir):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the wavs.
        """
        # The metadata file is '|'-separated and has no header row;
        # column 0 holds the wav file name without its extension.
        self.landmarks_frame = pd.read_csv(csv_file, sep='|', header=None)
        self.root_dir = root_dir

    def load_wav(self, filename):
        """Load `filename` with librosa at the configured sampling rate."""
        # The hyperparameter is named `sr`; `hp.sample_rate` is not defined.
        return librosa.load(filename, sr=hp.sr)

    def __len__(self):
        """Return the number of rows in the metadata file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Compute, save and return the spectrograms of the idx-th recording.

        Returns:
            dict with keys 'mel' and 'mag' as produced by get_spectrograms.
        """
        wav_name = os.path.join(self.root_dir, self.landmarks_frame.iloc[idx, 0]) + '.wav'
        mel, mag = get_spectrograms(wav_name)
        # np.save appends '.npy', so the files on disk end in '.pt.npy' and
        # '.mag.npy'. The number of frames differs from one recording to the next.
        np.save(wav_name[:-4] + '.pt', mel)
        np.save(wav_name[:-4] + '.mag', mag)
        return {'mel': mel, 'mag': mag}
上述代码中调用了函数get_spectrograms()从'.wav'语音文件中抽取对应的mel谱图和mag(幅度谱图)。代码如下,此代码可认为是抽取语音文件特征的标准代码,在不同场景使用时整个步骤流程基本一致,改变的可能只是部分参数
def get_spectrograms(fpath):
    '''Parse the wave file in `fpath` and extract its spectrograms.

    Returns the normalized mel spectrogram and linear magnitude spectrogram.

    Args:
      fpath: A string. The full path of a sound file.
    Returns:
      mel: A 2d array of shape (T, n_mels) and dtype of float32.
      mag: A 2d array of shape (T, 1+n_fft/2) and dtype of float32.
    '''
    # Loading sound file (resampled to hp.sr by librosa)
    y, _ = librosa.load(fpath, sr=hp.sr)

    # Trimming leading/trailing silence
    y, _ = librosa.effects.trim(y)

    # Preemphasis: y[n] - preemphasis * y[n-1], first sample kept as is
    y = np.append(y[0], y[1:] - hp.preemphasis * y[:-1])

    # Short-time Fourier transform
    linear = librosa.stft(y=y,
                          n_fft=hp.n_fft,
                          hop_length=hp.hop_length,
                          win_length=hp.win_length)

    # Magnitude spectrogram
    mag = np.abs(linear)  # (1+n_fft//2, T)

    # Mel spectrogram. Keyword arguments are used because recent librosa
    # releases no longer accept these parameters positionally.
    mel_basis = librosa.filters.mel(sr=hp.sr, n_fft=hp.n_fft, n_mels=hp.n_mels)  # (n_mels, 1+n_fft//2)
    mel = np.dot(mel_basis, mag)  # (n_mels, T)

    # To decibel; the floor of 1e-5 avoids log10(0)
    mel = 20 * np.log10(np.maximum(1e-5, mel))
    mag = 20 * np.log10(np.maximum(1e-5, mag))

    # Normalize into [1e-8, 1]
    mel = np.clip((mel - hp.ref_db + hp.max_db) / hp.max_db, 1e-8, 1)
    mag = np.clip((mag - hp.ref_db + hp.max_db) / hp.max_db, 1e-8, 1)

    # Transpose so that time is the first axis
    mel = mel.T.astype(np.float32)  # (T, n_mels)
    mag = mag.T.astype(np.float32)  # (T, 1+n_fft//2)
    return mel, mag
preprocess.py文件:其中主要为模型训练数据的加载与相关预处理代码
import hyperparams as hp
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import os
import librosa
import numpy as np
from text import text_to_sequence
import collections
from scipy import signal
import torch as t
import math
# 创建模型训练所使用的数据集
class LJDatasets(Dataset):
    """LJSpeech dataset used to train the text-to-mel model.

    It reads the transcript from the metadata file and loads the mel
    spectrogram that was saved beforehand as '<wav stem>.pt.npy'.
    """

    def __init__(self, csv_file, root_dir):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the wavs.
        """
        self.landmarks_frame = pd.read_csv(csv_file, sep='|', header=None)
        self.root_dir = root_dir

    def load_wav(self, filename):
        """Load `filename` with librosa at the configured sampling rate."""
        # The hyperparameter is named `sr`; `hp.sample_rate` is not defined.
        return librosa.load(filename, sr=hp.sr)

    def __len__(self):
        """Return the number of rows in the metadata file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Return the training sample at row `idx`.

        Returns:
            dict with keys 'text', 'mel', 'text_length', 'mel_input',
            'pos_mel' and 'pos_text'.
        """
        # Path of the audio file that belongs to row idx
        wav_name = os.path.join(self.root_dir, self.landmarks_frame.iloc[idx, 1 - 1]) + '.wav'
        # Transcript of row idx, converted to a sequence of integer ids
        text = self.landmarks_frame.iloc[idx, 1]
        text = np.asarray(text_to_sequence(text, [hp.cleaners]), dtype=np.int32)
        # Mel spectrogram saved earlier by the feature-extraction step
        mel = np.load(wav_name[:-4] + '.pt.npy')
        # Decoder input: the target shifted by one frame, i.e. an all-zero
        # frame is prepended and the last frame is dropped.
        mel_input = np.concatenate([np.zeros([1, hp.num_mels], np.float32), mel[:-1, :]], axis=0)
        text_length = len(text)
        # Positions start at 1; index 0 is left free for padding
        pos_text = np.arange(1, text_length + 1)
        pos_mel = np.arange(1, mel.shape[0] + 1)

        sample = {'text': text, 'mel': mel, 'text_length': text_length, 'mel_input': mel_input, 'pos_mel': pos_mel,
                  'pos_text': pos_text}
        return sample
# 用于后续加载mel图谱和mag谱图数据
class PostDatasets(Dataset):
    """LJSpeech dataset yielding saved (mel, magnitude) spectrogram pairs."""

    def __init__(self, csv_file, root_dir):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the wavs.
        """
        self.root_dir = root_dir
        self.landmarks_frame = pd.read_csv(csv_file, sep='|', header=None)

    def __len__(self):
        """Return the number of rows in the metadata file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Load the saved mel and magnitude spectrograms of row `idx`."""
        # The saved arrays live next to the wav file and share its stem
        stem = (os.path.join(self.root_dir, self.landmarks_frame.iloc[idx, 0]) + '.wav')[:-4]
        return {
            'mel': np.load(stem + '.pt.npy'),
            'mag': np.load(stem + '.mag.npy'),
        }
# 用于对LJDatasets类构建的数据进行batch中的转换处理
def collate_fn_transformer(batch):
    """Collate LJDatasets samples into padded tensors.

    Samples are ordered by decreasing text length and every field is
    zero-padded to the longest sequence in the batch.

    Args:
        batch: list of sample dicts with the keys 'text', 'mel',
            'mel_input', 'text_length', 'pos_mel' and 'pos_text'.
    Returns:
        Tuple (text, mel, mel_input, pos_text, pos_mel, text_length) of
        Long/Float tensors with the batch size as outer dimension.
    Raises:
        TypeError: if the batch elements are not mappings.
    """
    # collections.Mapping was removed in Python 3.10; the ABC lives in collections.abc
    from collections.abc import Mapping

    if not isinstance(batch[0], Mapping):
        raise TypeError(("batch must contain tensors, numbers, dicts or lists; found {}"
                         .format(type(batch[0]))))

    # One stable sort by text length (descending) gives the order for every
    # field; ties keep their original batch order.
    order = sorted(range(len(batch)), key=lambda i: batch[i]['text_length'], reverse=True)
    ordered = [batch[i] for i in order]

    text = [d['text'] for d in ordered]
    mel = [d['mel'] for d in ordered]
    mel_input = [d['mel_input'] for d in ordered]
    pos_text = [d['pos_text'] for d in ordered]
    pos_mel = [d['pos_mel'] for d in ordered]
    text_length = [d['text_length'] for d in ordered]

    # PAD sequences with largest length of the batch
    text = _prepare_data(text).astype(np.int32)
    mel = _pad_mel(mel)
    mel_input = _pad_mel(mel_input)
    pos_mel = _prepare_data(pos_mel).astype(np.int32)
    pos_text = _prepare_data(pos_text).astype(np.int32)

    return t.LongTensor(text), t.FloatTensor(mel), t.FloatTensor(mel_input), t.LongTensor(pos_text), t.LongTensor(
        pos_mel), t.LongTensor(text_length)
# 用于对PostDatasets类构建的数据进行batch中的转换处理
def collate_fn_postnet(batch):
    """Collate PostDatasets samples into padded float tensors.

    Args:
        batch: list of sample dicts with the keys 'mel' and 'mag'.
    Returns:
        Tuple (mel, mag) of FloatTensors, zero-padded along the first axis
        of each sample to the longest sample in the batch.
    Raises:
        TypeError: if the batch elements are not mappings.
    """
    # collections.Mapping was removed in Python 3.10; the ABC lives in collections.abc
    from collections.abc import Mapping

    if not isinstance(batch[0], Mapping):
        raise TypeError(("batch must contain tensors, numbers, dicts or lists; found {}"
                         .format(type(batch[0]))))

    # PAD sequences with largest length of the batch
    mel = _pad_mel([d['mel'] for d in batch])
    mag = _pad_mel([d['mag'] for d in batch])
    return t.FloatTensor(mel), t.FloatTensor(mag)
def _pad_data(x, length): # 使用0对输出的x进行pad到指定长度length
_pad = 0
return np.pad(x, (0, length - x.shape[0]), mode='constant', constant_values=_pad)
def _prepare_data(inputs):
    """Zero-pad every sequence in `inputs` to the longest one and stack them."""
    longest = max(len(seq) for seq in inputs)
    padded = []
    for seq in inputs:
        padded.append(_pad_data(seq, longest))
    return np.stack(padded)
def _pad_per_step(inputs):
    """Zero-pad the last axis of a 3-D array to a multiple of hp.outputs_per_step.

    NOTE(review): when the length is already a multiple, one full extra
    step of zeros is still appended — confirm this is intended.
    """
    step = hp.outputs_per_step
    remainder = inputs.shape[-1] % step
    pad_width = [[0, 0], [0, 0], [0, step - remainder]]
    return np.pad(inputs, pad_width, mode='constant', constant_values=0.0)
def _pad_mel(inputs): # 将一个batch中所有的mel用0pad到其中最大长度的大小
_pad = 0
def _pad_one(x, max_len):
mel_len = x.shape[0]
return np.pad(x, [[0, max_len - mel_len], [0, 0]], mode='constant', constant_values=_pad)
max_len = max((x.shape[0] for x in inputs))
return np.stack([_pad_one(x, max_len) for x in inputs])
# 计算模型的参数大小
def get_param_size(model):
    """Return the total number of scalar entries over all parameters of `model`."""

    def _count(shape):
        # Product of all dimensions; an empty shape counts as one entry
        entries = 1
        for dim in shape:
            entries *= dim
        return entries

    return sum(_count(param.size()) for param in model.parameters())
def get_dataset():
    """Build the LJDatasets instance from metadata.csv and wavs/ under hp.data_path."""
    metadata_file = os.path.join(hp.data_path, 'metadata.csv')
    wav_dir = os.path.join(hp.data_path, 'wavs')
    return LJDatasets(metadata_file, wav_dir)
def get_post_dataset():
    """Build the PostDatasets instance from metadata.csv and wavs/ under hp.data_path."""
    metadata_file = os.path.join(hp.data_path, 'metadata.csv')
    wav_dir = os.path.join(hp.data_path, 'wavs')
    return PostDatasets(metadata_file, wav_dir)
该文件中包含模型搭建的所有的模块
# 包含所有的模型方法
import torch.nn as nn
import torch as t
import torch.nn.functional as F
import math
import hyperparams as hp
from text.symbols import symbols
import numpy as np
import copy
from collections import OrderedDict
def clones(module, N):
    """Return an nn.ModuleList holding N independent deep copies of `module`."""
    copies = nn.ModuleList()
    for _ in range(N):
        copies.append(copy.deepcopy(module))
    return copies
class Linear(nn.Module):
    """
    Fully connected layer whose weight is initialised with xavier_uniform_.
    """

    def __init__(self, in_dim, out_dim, bias=True, w_init='linear'):
        """
        :param in_dim: dimension of input
        :param out_dim: dimension of output
        :param bias: boolean. if True, bias is included.
        :param w_init: str. name of the nonlinearity used to pick the xavier gain.
        """
        super(Linear, self).__init__()
        self.linear_layer = nn.Linear(in_dim, out_dim, bias=bias)
        gain = nn.init.calculate_gain(w_init)
        nn.init.xavier_uniform_(self.linear_layer.weight, gain=gain)

    def forward(self, x):
        """Apply the wrapped nn.Linear to `x`."""
        return self.linear_layer(x)
class Conv(nn.Module):
    """
    One-dimensional convolution whose weight is initialised with xavier_uniform_.
    """

    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1,
                 padding=0, dilation=1, bias=True, w_init='linear'):
        """
        :param in_channels: dimension of input
        :param out_channels: dimension of output
        :param kernel_size: size of kernel
        :param stride: size of stride
        :param padding: size of padding
        :param dilation: dilation rate
        :param bias: boolean. if True, bias is included.
        :param w_init: str. name of the nonlinearity used to pick the xavier gain.
        """
        super(Conv, self).__init__()
        conv_kwargs = dict(kernel_size=kernel_size, stride=stride,
                           padding=padding, dilation=dilation, bias=bias)
        self.conv = nn.Conv1d(in_channels, out_channels, **conv_kwargs)
        gain = nn.init.calculate_gain(w_init)
        nn.init.xavier_uniform_(self.conv.weight, gain=gain)

    def forward(self, x):
        """Apply the wrapped nn.Conv1d to `x`."""
        return self.conv(x)
class EncoderPrenet(nn.Module):
    """
    Pre-network for Encoder: an embedding followed by three convolution
    blocks and a final linear projection.
    """

    def __init__(self, embedding_size, num_hidden):
        """
        :param embedding_size: dimension of the symbol embedding
        :param num_hidden: dimension of hidden
        """
        super(EncoderPrenet, self).__init__()
        self.embedding_size = embedding_size
        self.embed = nn.Embedding(len(symbols), embedding_size, padding_idx=0)

        # kernel_size 5 with padding 2 keeps the sequence length unchanged
        same_padding = int(np.floor(5 / 2))
        self.conv1 = Conv(in_channels=embedding_size, out_channels=num_hidden,
                          kernel_size=5, padding=same_padding, w_init='relu')
        self.conv2 = Conv(in_channels=num_hidden, out_channels=num_hidden,
                          kernel_size=5, padding=same_padding, w_init='relu')
        self.conv3 = Conv(in_channels=num_hidden, out_channels=num_hidden,
                          kernel_size=5, padding=same_padding, w_init='relu')

        # One-dimensional batch normalisation, one per convolution
        self.batch_norm1 = nn.BatchNorm1d(num_hidden)
        self.batch_norm2 = nn.BatchNorm1d(num_hidden)
        self.batch_norm3 = nn.BatchNorm1d(num_hidden)

        self.dropout1 = nn.Dropout(p=0.2)
        self.dropout2 = nn.Dropout(p=0.2)
        self.dropout3 = nn.Dropout(p=0.2)
        self.projection = Linear(num_hidden, num_hidden)

    def forward(self, input_):
        """Embed the symbol ids and run them through the convolution stack."""
        hidden = self.embed(input_)
        # Conv1d works on the channel axis, so move features to dim 1
        hidden = hidden.transpose(1, 2)
        stages = (
            (self.conv1, self.batch_norm1, self.dropout1),
            (self.conv2, self.batch_norm2, self.dropout2),
            (self.conv3, self.batch_norm3, self.dropout3),
        )
        for conv, batch_norm, dropout in stages:
            hidden = dropout(t.relu(batch_norm(conv(hidden))))
        hidden = hidden.transpose(1, 2)
        return self.projection(hidden)
class FFN(nn.Module):
    """
    Positionwise Feed-Forward Network of the transformer, including the
    residual connection and the layer normalisation.
    """

    def __init__(self, num_hidden):
        """
        :param num_hidden: dimension of hidden
        """
        super(FFN, self).__init__()
        self.w_1 = Conv(num_hidden, num_hidden * 4, kernel_size=1, w_init='relu')
        self.w_2 = Conv(num_hidden * 4, num_hidden, kernel_size=1)
        # NOTE: the dropout module is created but not applied in forward()
        self.dropout = nn.Dropout(p=0.1)
        self.layer_norm = nn.LayerNorm(num_hidden)

    def forward(self, input_):
        """Apply the two kernel-size-1 convolutions, add the input and normalise."""
        # The convolutions expect the feature axis at dim 1
        hidden = self.w_1(input_.transpose(1, 2))
        hidden = self.w_2(t.relu(hidden))
        hidden = hidden.transpose(1, 2)

        # Residual connection followed by layer normalisation
        return self.layer_norm(hidden + input_)
class PostConvNet(nn.Module):
    """
    Post Convolutional Network (mel --> mel), the post-processing network
    on the decoder side.
    """

    def __init__(self, num_hidden):
        """
        :param num_hidden: dimension of hidden
        """
        super(PostConvNet, self).__init__()
        mel_channels = hp.num_mels * hp.outputs_per_step
        self.conv1 = Conv(in_channels=mel_channels, out_channels=num_hidden,
                          kernel_size=5, padding=4, w_init='tanh')
        hidden_conv = Conv(in_channels=num_hidden, out_channels=num_hidden,
                           kernel_size=5, padding=4, w_init='tanh')
        self.conv_list = clones(hidden_conv, 3)
        self.conv2 = Conv(in_channels=num_hidden, out_channels=mel_channels,
                          kernel_size=5, padding=4)

        self.batch_norm_list = clones(nn.BatchNorm1d(num_hidden), 3)
        self.pre_batchnorm = nn.BatchNorm1d(num_hidden)

        self.dropout1 = nn.Dropout(p=0.1)
        self.dropout_list = nn.ModuleList([nn.Dropout(p=0.1) for _ in range(3)])

    def forward(self, input_, mask=None):
        """Run the five convolutions; `mask` is accepted but not used."""
        # Causal convolution (for auto-regressive use): kernel_size 5 with
        # padding 4 on both sides yields T + 4 outputs. Dropping the last
        # four keeps the length at T and makes every output depend only on
        # the current and the four preceding frames.
        hidden = self.conv1(input_)[:, :, :-4]
        hidden = self.dropout1(t.tanh(self.pre_batchnorm(hidden)))
        for batch_norm, conv, dropout in zip(self.batch_norm_list, self.conv_list, self.dropout_list):
            hidden = conv(hidden)[:, :, :-4]
            hidden = dropout(t.tanh(batch_norm(hidden)))
        return self.conv2(hidden)[:, :, :-4]
class MultiheadAttention(nn.Module):
    """
    Scaled dot-product attention applied to already split heads.
    """

    def __init__(self, num_hidden_k):
        """
        :param num_hidden_k: dimension of hidden (used to scale the scores)
        """
        super(MultiheadAttention, self).__init__()
        self.num_hidden_k = num_hidden_k
        # NOTE: the dropout module is created but not applied in forward()
        self.attn_dropout = nn.Dropout(p=0.1)

    def forward(self, key, value, query, mask=None, query_mask=None):
        """Return the context vectors and the attention weights.

        :param mask: positions that are True are filled with a large
            negative score before the softmax (key side)
        :param query_mask: multiplied elementwise with the attention
            weights after the softmax (query side)
        """
        # Attention scores, scaled by sqrt of the per-head dimension
        scores = t.bmm(query, key.transpose(1, 2)) / math.sqrt(self.num_hidden_k)

        # Masking to ignore padding (key side)
        if mask is not None:
            scores = scores.masked_fill(mask, -2 ** 32 + 1)
        attn = t.softmax(scores, dim=-1)

        # Masking to ignore padding (query side)
        if query_mask is not None:
            attn = attn * query_mask

        # Context vector
        return t.bmm(attn, value), attn
class Attention(nn.Module):
    """
    Attention layer: multi-head attention followed by a linear layer over
    the concatenation of input and context, a residual connection and
    layer normalisation.
    """

    def __init__(self, num_hidden, h=4):
        """
        :param num_hidden: dimension of hidden
        :param h: num of heads
        """
        super(Attention, self).__init__()

        self.num_hidden = num_hidden
        self.num_hidden_per_attn = num_hidden // h  # dimension of one head
        self.h = h  # number of heads

        self.key = Linear(num_hidden, num_hidden, bias=False)
        self.value = Linear(num_hidden, num_hidden, bias=False)
        self.query = Linear(num_hidden, num_hidden, bias=False)

        self.multihead = MultiheadAttention(self.num_hidden_per_attn)

        # NOTE: the dropout module is created but not applied in forward()
        self.residual_dropout = nn.Dropout(p=0.1)

        self.final_linear = Linear(num_hidden * 2, num_hidden)

        self.layer_norm_1 = nn.LayerNorm(num_hidden)

    def _split_heads(self, projected, batch_size, seq_len):
        """[bsz, seq, num_hidden] -> [h * bsz, seq, num_hidden_per_attn]."""
        per_head = projected.view(batch_size, seq_len, self.h, self.num_hidden_per_attn)
        # Move the head axis to the front, then fold it into the batch axis
        return per_head.permute(2, 0, 1, 3).contiguous().view(-1, seq_len, self.num_hidden_per_attn)

    def forward(self, memory, decoder_input, mask=None, query_mask=None):
        '''
        :param memory: source of key and value, [bsz, memory_len, num_hidden]
        :param decoder_input: source of query, [bsz, decoder_input_len, num_hidden]
        :param mask: [bsz, decoder_input_len, memory_len]
        :param query_mask: mask over the query sequence, [bsz, decoder_input_len]
        :return: (result [bsz, decoder_input_len, num_hidden],
                  attns [h * bsz, decoder_input_len, memory_len])
        '''
        batch_size = memory.size(0)
        seq_k = memory.size(1)
        seq_q = decoder_input.size(1)

        # Repeat masks h times so that they line up with the folded head axis
        if query_mask is not None:
            # [bsz, seq_q] -> [bsz, seq_q, seq_k] -> [h * bsz, seq_q, seq_k]
            query_mask = query_mask.unsqueeze(-1).repeat(1, 1, seq_k).repeat(self.h, 1, 1)
        if mask is not None:
            # [bsz, seq_q, seq_k] -> [h * bsz, seq_q, seq_k]
            mask = mask.repeat(self.h, 1, 1)

        # Project and split into heads
        key = self._split_heads(self.key(memory), batch_size, seq_k)
        value = self._split_heads(self.value(memory), batch_size, seq_k)
        query = self._split_heads(self.query(decoder_input), batch_size, seq_q)

        # Context vectors [h * bsz, seq_q, num_hidden_per_attn] and
        # attention weights [h * bsz, seq_q, seq_k]
        result, attns = self.multihead(key, value, query, mask=mask, query_mask=query_mask)

        # Concatenate the heads back: [h, bsz, seq_q, d] -> [bsz, seq_q, num_hidden]
        result = result.view(self.h, batch_size, seq_q, self.num_hidden_per_attn)
        result = result.permute(1, 2, 0, 3).contiguous().view(batch_size, seq_q, -1)

        # Concatenate context vector with input (most important)
        result = t.cat([decoder_input, result], dim=-1)  # [bsz, seq_q, 2 * num_hidden]

        # Final linear
        result = self.final_linear(result)  # [bsz, seq_q, num_hidden]

        # Residual connection and layer normalisation
        result = self.layer_norm_1(result + decoder_input)

        return result, attns
class Prenet(nn.Module):
    """
    Prenet before passing through the network (decoder-side pre-processing):
    two Linear + ReLU + Dropout stages.
    """

    def __init__(self, input_size, hidden_size, output_size, p=0.5):
        """
        :param input_size: dimension of input
        :param hidden_size: dimension of hidden unit
        :param output_size: dimension of output
        :param p: dropout probability used after both stages
        """
        super(Prenet, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_size = hidden_size
        stages = [
            ('fc1', Linear(self.input_size, self.hidden_size)),
            ('relu1', nn.ReLU()),
            ('dropout1', nn.Dropout(p)),
            ('fc2', Linear(self.hidden_size, self.output_size)),
            ('relu2', nn.ReLU()),
            ('dropout2', nn.Dropout(p)),
        ]
        self.layer = nn.Sequential(OrderedDict(stages))

    def forward(self, input_):
        """Run `input_` through the sequential stack."""
        return self.layer(input_)
class CBHG(nn.Module):
    """
    CBHG Module: convolution bank, max pooling, convolutional projections
    with a residual connection, highway network and a bidirectional GRU.
    """

    def __init__(self, hidden_size, K=16, projection_size=256, num_gru_layers=2, max_pool_kernel_size=2, is_post=False):
        """
        :param hidden_size: dimension of hidden unit
        :param K: # of convolution banks
        :param projection_size: dimension of projection unit
        :param num_gru_layers: # of layers of GRUcell
        :param max_pool_kernel_size: max pooling kernel size
        :param is_post: whether post processing or not (not used by this class)
        """
        super(CBHG, self).__init__()
        self.hidden_size = hidden_size
        self.projection_size = projection_size

        # K one-dimensional convolutions with kernel sizes 1..K. Only the
        # first one reads projection_size channels; the rest read hidden_size.
        self.convbank_list = nn.ModuleList()
        for width in range(1, K + 1):
            in_channels = projection_size if width == 1 else hidden_size
            self.convbank_list.append(nn.Conv1d(in_channels=in_channels,
                                                out_channels=hidden_size,
                                                kernel_size=width,
                                                padding=int(np.floor(width / 2))))

        # One batch normalisation per bank convolution
        self.batchnorm_list = nn.ModuleList()
        for _ in range(K):
            self.batchnorm_list.append(nn.BatchNorm1d(hidden_size))

        convbank_outdim = hidden_size * K

        self.conv_projection_1 = nn.Conv1d(in_channels=convbank_outdim,
                                           out_channels=hidden_size,
                                           kernel_size=3,
                                           padding=int(np.floor(3 / 2)))
        self.conv_projection_2 = nn.Conv1d(in_channels=hidden_size,
                                           out_channels=projection_size,
                                           kernel_size=3,
                                           padding=int(np.floor(3 / 2)))
        self.batchnorm_proj_1 = nn.BatchNorm1d(hidden_size)
        self.batchnorm_proj_2 = nn.BatchNorm1d(projection_size)

        self.max_pool = nn.MaxPool1d(max_pool_kernel_size, stride=1, padding=1)
        self.highway = Highwaynet(self.projection_size)
        # Bidirectional GRU; both directions together give hidden_size features
        self.gru = nn.GRU(self.projection_size, self.hidden_size // 2, num_layers=num_gru_layers,
                          batch_first=True, bidirectional=True)

    def _conv_fit_dim(self, x, kernel_size=3):
        """Drop the extra last frame that an even kernel size produces."""
        return x[:, :, :-1] if kernel_size % 2 == 0 else x

    def forward(self, input_):
        """Map `input_` (channels at dim 1) to GRU outputs (time at dim 1)."""
        residual = input_.contiguous()

        # Convolution bank filters; each convolution consumes the output of
        # the previous one and every intermediate output is kept.
        bank_outputs = []
        features = residual
        for width, (conv, batchnorm) in enumerate(zip(self.convbank_list, self.batchnorm_list), start=1):
            features = t.relu(batchnorm(self._conv_fit_dim(conv(features), width).contiguous()))
            bank_outputs.append(features)

        # Concatenate all features along the channel axis
        conv_cat = t.cat(bank_outputs, dim=1)

        # Max pooling; the slice drops the extra frame added by the padding
        conv_cat = self.max_pool(conv_cat)[:, :, :-1]

        # Projections followed by the residual connection to the input
        projected = t.relu(self.batchnorm_proj_1(self._conv_fit_dim(self.conv_projection_1(conv_cat))))
        projected = self.batchnorm_proj_2(self._conv_fit_dim(self.conv_projection_2(projected))) + residual

        # Highway networks
        highway = self.highway.forward(projected.transpose(1, 2))

        # Bidirectional GRU
        self.gru.flatten_parameters()
        out, _ = self.gru(highway)

        return out
class Highwaynet(nn.Module):
    """
    Highway network: a stack of gated layers mixing a transformed input
    with the untouched input.
    """

    def __init__(self, num_units, num_layers=4):
        """
        :param num_units: dimension of hidden unit
        :param num_layers: # of highway layers
        """
        super(Highwaynet, self).__init__()
        self.num_units = num_units
        self.num_layers = num_layers
        self.gates = nn.ModuleList()
        self.linears = nn.ModuleList()
        for _ in range(self.num_layers):
            self.linears.append(Linear(num_units, num_units))
            self.gates.append(Linear(num_units, num_units))

    def forward(self, input_):
        """Apply every highway layer in turn and return the result."""
        out = input_

        # highway gated function: out = H(out) * T(out) + out * (1 - T(out))
        for transform, gate in zip(self.linears, self.gates):
            candidate = t.relu(transform.forward(out))
            gate_value = t.sigmoid(gate.forward(out))
            out = candidate * gate_value + out * (1. - gate_value)

        return out
该文件中实现完成模型的构建
from module import *
from utils import get_positional_table, get_sinusoid_encoding_table
import hyperparams as hp
import copy
class Encoder(nn.Module):
    """
    Encoder Network
    """

    def __init__(self, embedding_size, num_hidden):
        """
        :param embedding_size: dimension of embedding
        :param num_hidden: dimension of hidden
        """
        super(Encoder, self).__init__()
        self.alpha = nn.Parameter(t.ones(1))
        # Sinusoidal positional encoding, kept frozen during training
        position_table = get_sinusoid_encoding_table(1024, num_hidden, padding_idx=0)
        self.pos_emb = nn.Embedding.from_pretrained(position_table, freeze=True)
        self.pos_dropout = nn.Dropout(p=0.1)
        self.encoder_prenet = EncoderPrenet(embedding_size, num_hidden)
        # Three attention blocks, each followed by its own feed-forward network
        self.layers = clones(Attention(num_hidden), 3)
        self.ffns = clones(FFN(num_hidden), 3)

    def forward(self, x, pos):
        '''
        :param x: symbol ids of the text, [bsz, max_text_len]
        :param pos: positions of the symbols (0 marks padding), [bsz, max_text_len]
        :return: (encoder output, character mask or None, list of attention weights)
        '''
        # Masks are only built in training mode
        c_mask, mask = None, None
        if self.training:
            # [bsz, max_text_len]: 0 on padding, 1 elsewhere
            c_mask = pos.ne(0).type(t.float)
            # [bsz, max_text_len, max_text_len]: True on padded keys, which
            # is where masked_fill overwrites the scores. Query and key come
            # from the same sequence, so the last two dims are equal.
            mask = pos.eq(0).unsqueeze(1).repeat(1, x.size(1), 1)

        # Encoder pre-network
        x = self.encoder_prenet(x)

        # Get positional embedding, apply alpha and add
        pos = self.pos_emb(pos)
        x = pos * self.alpha + x

        # Positional dropout
        x = self.pos_dropout(x)

        # Attention encoder-encoder; the attention weights of every layer are collected
        attns = []
        for layer, ffn in zip(self.layers, self.ffns):
            x, attn = layer(x, x, mask=mask, query_mask=c_mask)
            x = ffn(x)
            attns.append(attn)

        return x, c_mask, attns
class MelDecoder(nn.Module):
    """
    Decoder Network
    """

    def __init__(self, num_hidden):
        """
        :param num_hidden: dimension of hidden
        """
        super(MelDecoder, self).__init__()
        # Sinusoidal positional encoding, kept frozen during training
        self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(1024, num_hidden,
                                                                                padding_idx=0), freeze=True)
        self.pos_dropout = nn.Dropout(p=0.1)
        self.alpha = nn.Parameter(t.ones(1))
        self.decoder_prenet = Prenet(hp.num_mels, num_hidden * 2, num_hidden, p=0.2)
        self.norm = Linear(num_hidden, num_hidden)

        self.selfattn_layers = clones(Attention(num_hidden), 3)
        self.dotattn_layers = clones(Attention(num_hidden), 3)
        self.ffns = clones(FFN(num_hidden), 3)
        self.mel_linear = Linear(num_hidden, hp.num_mels * hp.outputs_per_step)
        self.stop_linear = Linear(num_hidden, 1, w_init='sigmoid')

        self.postconvnet = PostConvNet(num_hidden)

    def forward(self, memory, decoder_input, c_mask, pos):
        '''
        :param memory: encoder output, [bsz, max_text_len, num_hidden]
        :param decoder_input: decoder input frames, [bsz, max_T, num_mels]
        :param c_mask: mask belonging to memory, [bsz, max_text_len]; only read in training mode
        :param pos: positions of the decoder frames (0 marks padding), [bsz, max_T]
        :return: (mel_out, postnet output, encoder-decoder attention list,
                  stop tokens, decoder self-attention list)
        '''
        batch_size = memory.size(0)
        decoder_len = decoder_input.size(1)

        # Upper-triangular (causal) mask, created directly on the device of
        # the input so it matches whichever GPU (or the CPU) the data is on.
        causal = t.triu(t.ones(decoder_len, decoder_len, device=decoder_input.device),
                        diagonal=1).repeat(batch_size, 1, 1).byte()

        if self.training:
            # [bsz, max_T]: 0 on padding, 1 elsewhere
            m_mask = pos.ne(0).type(t.float)
            # [bsz, max_T, max_T]: True on padded keys or on future frames
            mask = m_mask.eq(0).unsqueeze(1).repeat(1, decoder_len, 1)
            mask = (mask + causal).gt(0)
            # [bsz, max_T, max_text_len]: True on padded encoder positions
            zero_mask = c_mask.eq(0).unsqueeze(-1).repeat(1, 1, decoder_len)
            zero_mask = zero_mask.transpose(1, 2)
        else:
            # Outside training only the causal mask is applied
            mask = causal.gt(0)
            m_mask, zero_mask = None, None

        # Decoder pre-network
        decoder_input = self.decoder_prenet(decoder_input)

        # Centered position
        decoder_input = self.norm(decoder_input)

        # Get positional embedding, apply alpha and add
        pos = self.pos_emb(pos)
        decoder_input = pos * self.alpha + decoder_input

        # Positional dropout
        decoder_input = self.pos_dropout(decoder_input)

        # Attention decoder-decoder, encoder-decoder
        attn_dot_list = list()  # attention weights of the encoder-decoder attention
        attn_dec_list = list()  # attention weights of the decoder self-attention

        for selfattn, dotattn, ffn in zip(self.selfattn_layers, self.dotattn_layers, self.ffns):
            decoder_input, attn_dec = selfattn(decoder_input, decoder_input, mask=mask, query_mask=m_mask)
            decoder_input, attn_dot = dotattn(memory, decoder_input, mask=zero_mask, query_mask=m_mask)
            decoder_input = ffn(decoder_input)
            attn_dot_list.append(attn_dot)
            attn_dec_list.append(attn_dec)

        # Mel linear projection
        mel_out = self.mel_linear(decoder_input)

        # Post Mel Network: its output is added to the projection as a residual
        postnet_input = mel_out.transpose(1, 2)
        out = self.postconvnet(postnet_input)
        out = postnet_input + out
        out = out.transpose(1, 2)

        # Stop tokens
        stop_tokens = self.stop_linear(decoder_input)

        return mel_out, out, attn_dot_list, stop_tokens, attn_dec_list
class Model(nn.Module):
    """
    Transformer Network: text encoder followed by the mel decoder.
    """

    def __init__(self):
        super(Model, self).__init__()
        self.encoder = Encoder(hp.embedding_size, hp.hidden_size)
        self.decoder = MelDecoder(hp.hidden_size)

    def forward(self, characters, mel_input, pos_text, pos_mel):
        """Encode the text, decode mel frames and return all outputs.

        :return: (mel_output, postnet_output, attn_probs, stop_preds,
                  attns_enc, attns_dec)
        """
        memory, c_mask, attns_enc = self.encoder.forward(characters, pos=pos_text)
        decoded = self.decoder.forward(memory, mel_input, c_mask, pos=pos_mel)
        mel_output, postnet_output, attn_probs, stop_preds, attns_dec = decoded

        return mel_output, postnet_output, attn_probs, stop_preds, attns_enc, attns_dec
class ModelPostNet(nn.Module):
    """
    CBHG Network (mel --> linear): converts a mel spectrogram into a
    linear magnitude spectrogram.
    """

    def __init__(self):
        super(ModelPostNet, self).__init__()
        self.pre_projection = Conv(hp.n_mels, hp.hidden_size)
        self.cbhg = CBHG(hp.hidden_size)
        self.post_projection = Conv(hp.hidden_size, (hp.n_fft // 2) + 1)

    def forward(self, mel):
        """Predict the magnitude spectrogram for `mel`."""
        # The convolutions expect the feature axis at dim 1
        hidden = self.pre_projection(mel.transpose(1, 2))
        hidden = self.cbhg(hidden).transpose(1, 2)
        return self.post_projection(hidden).transpose(1, 2)
if __name__ == '__main__':
    # Manual check: print a 10 x 5 encoding table with rows 0 and 5 zeroed
    print(get_sinusoid_encoding_table(10, 5, padding_idx=[0, 5]))
其中调用了utils中的get_sinusoid_encoding_table()函数进行位置编码,代码如下
def get_sinusoid_encoding_table(n_position, d_hid, padding_idx=None):
    '''
    Build the sinusoid position encoding table.

    :param n_position: number of positions (rows of the table)
    :param d_hid: size of the encoding of one position (columns of the table)
    :param padding_idx: row index (or indices) to fill with zeros, or None
    :return: FloatTensor of shape (n_position, d_hid)
    '''

    def angle(position, hid_idx):
        # Dimensions 2i and 2i+1 share the same frequency
        return position / np.power(10000, 2 * (hid_idx // 2) / d_hid)

    rows = []
    for position in range(n_position):
        rows.append([angle(position, hid_idx) for hid_idx in range(d_hid)])
    table = np.array(rows)

    table[:, 0::2] = np.sin(table[:, 0::2])  # dim 2i
    table[:, 1::2] = np.cos(table[:, 1::2])  # dim 2i+1

    if padding_idx is not None:
        # zero vector for padding dimension
        table[padding_idx] = 0.

    return t.FloatTensor(table)
完成上述模块搭建后,即可开始模型训练。使用train_transformer.py进行text->mel转变
# 用于训练自回归注意网络,(text --> mel)
from preprocess import get_dataset, DataLoader, collate_fn_transformer
from network import *
from tensorboardX import SummaryWriter
import torchvision.utils as vutils
import os
from tqdm import tqdm
# 动态调整学习率
def adjust_learning_rate(optimizer, step_num, warmup_step=4000):
    """Set the learning rate of every parameter group for step `step_num`.

    The rate grows linearly during the first `warmup_step` steps and decays
    with the inverse square root of the step number afterwards.
    """
    schedule = min(step_num * warmup_step ** -1.5, step_num ** -0.5)
    lr = hp.lr * warmup_step ** 0.5 * schedule

    for group in optimizer.param_groups:
        group['lr'] = lr
def main():
    """Train the autoregressive text-to-mel network.

    Runs on CUDA through nn.DataParallel, logs losses, alphas and attention
    images with tensorboardX and writes a checkpoint every hp.save_step steps.
    """
    dataset = get_dataset()
    global_step = 0

    # Wrapped in DataParallel so that several GPUs are used when available
    m = nn.DataParallel(Model().cuda())

    m.train()
    optimizer = t.optim.Adam(m.parameters(), lr=hp.lr)

    # Make sure the checkpoint directory exists before the first save
    os.makedirs(hp.checkpoint_path, exist_ok=True)

    writer = SummaryWriter()

    # (tag pattern, index into the model outputs) for the attention images
    attention_tags = ('Attention_%d_0', 'Attention_enc_%d_0', 'Attention_dec_%d_0')

    for epoch in range(hp.epochs):

        dataloader = DataLoader(dataset, batch_size=hp.batch_size, shuffle=True, collate_fn=collate_fn_transformer,
                                drop_last=True, num_workers=16)
        pbar = tqdm(dataloader)
        for data in pbar:
            pbar.set_description("Processing at epoch %d" % epoch)
            global_step += 1  # one step per batch
            if global_step < 400000:
                # The learning rate follows the warm-up schedule until step 400000
                adjust_learning_rate(optimizer, global_step)

            character, mel, mel_input, pos_text, pos_mel, _ = data

            # Move the batch to the GPU
            character = character.cuda()
            mel = mel.cuda()
            mel_input = mel_input.cuda()
            pos_text = pos_text.cuda()
            pos_mel = pos_mel.cuda()

            # Forward pass
            mel_pred, postnet_pred, attn_probs, stop_preds, attns_enc, attns_dec = m.forward(character, mel_input,
                                                                                             pos_text, pos_mel)

            # L1 losses before and after the post convolution network.
            # NOTE: stop_preds is returned by the model but no stop-token
            # loss is part of the objective here.
            mel_loss = nn.L1Loss()(mel_pred, mel)
            post_mel_loss = nn.L1Loss()(postnet_pred, mel)

            loss = mel_loss + post_mel_loss

            writer.add_scalars('training_loss', {
                'mel_loss': mel_loss,
                'post_mel_loss': post_mel_loss, }, global_step)

            # Scaling factors of the positional encodings
            writer.add_scalars('alphas', {
                'encoder_alpha': m.module.encoder.alpha.data,
                'decoder_alpha': m.module.decoder.alpha.data,
            }, global_step)

            if global_step % hp.image_step == 1:
                # Log encoder-decoder, encoder and decoder attention maps as images
                for tag, attention_maps in zip(attention_tags, (attn_probs, attns_enc, attns_dec)):
                    for layer_idx, prob in enumerate(attention_maps):
                        for j in range(4):
                            x = vutils.make_grid(prob[j * 16] * 255)
                            writer.add_image(tag % global_step, x, layer_idx * 4 + j)

            optimizer.zero_grad()
            # Calculate gradients
            loss.backward()

            # Clip the gradient norm to 1
            nn.utils.clip_grad_norm_(m.parameters(), 1.)

            # Update weights
            optimizer.step()

            if global_step % hp.save_step == 0:
                t.save({'model': m.state_dict(),
                        'optimizer': optimizer.state_dict()},
                       os.path.join(hp.checkpoint_path, 'checkpoint_transformer_%d.pth.tar' % global_step))


if __name__ == '__main__':
    main()
要想获得语音,还要将mel谱图转换为mag幅度图,单独分开训练一个postnet网络实现该过程
# 训练后部分网络,(mel --> linear)
from preprocess import get_post_dataset, DataLoader, collate_fn_postnet
from network import *
from tensorboardX import SummaryWriter
import torchvision.utils as vutils
import os
from tqdm import tqdm
# 动态调整学习率
def adjust_learning_rate(optimizer, step_num, warmup_step=4000, base_lr=None):
    """Overwrite the learning rate of every parameter group with a warm-up schedule.

    The rate is ``base_lr * warmup_step ** 0.5 * min(step * warmup_step ** -1.5,
    step ** -0.5)``: it rises linearly until ``warmup_step`` (where it equals
    ``base_lr``) and then decays with the inverse square root of the step.

    Args:
        optimizer: Object exposing ``param_groups``, an iterable of dicts whose
            ``'lr'`` entry is replaced.
        step_num: Current global training step. Values below 1 are treated as 1.
        warmup_step: Number of warm-up steps.
        base_lr: Learning rate reached at ``warmup_step``. When omitted,
            ``hp.lr`` is used.
    """
    if base_lr is None:
        base_lr = hp.lr
    # ``0 ** -0.5`` raises ZeroDivisionError, so never evaluate the schedule
    # below the first step.
    step_num = max(step_num, 1)
    lr = base_lr * warmup_step ** 0.5 * min(step_num * warmup_step ** -1.5, step_num ** -0.5)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
def main():
    """Train the post network that predicts magnitude spectrograms from mels.

    Iterates ``hp.epochs`` times over the dataset returned by
    ``get_post_dataset``, minimizes the L1 loss between the predicted and the
    target magnitude, logs the loss through ``SummaryWriter`` and saves the
    model and optimizer state every ``hp.save_step`` steps under
    ``hp.checkpoint_path``.
    """
    dataset = get_post_dataset()
    global_step = 0

    # The model is moved to the GPU and wrapped in DataParallel before training.
    m = nn.DataParallel(ModelPostNet().cuda())
    m.train()
    optimizer = t.optim.Adam(m.parameters(), lr=hp.lr)
    criterion = nn.L1Loss()

    # Create the checkpoint directory up front so the first save cannot fail
    # on a missing directory after thousands of training steps.
    os.makedirs(hp.checkpoint_path, exist_ok=True)

    writer = SummaryWriter()
    try:
        for epoch in range(hp.epochs):
            dataloader = DataLoader(dataset, batch_size=hp.batch_size, shuffle=True,
                                    collate_fn=collate_fn_postnet,
                                    drop_last=True, num_workers=8)
            pbar = tqdm(dataloader)
            for mel, mag in pbar:
                pbar.set_description("Processing at epoch %d" % epoch)
                global_step += 1
                # The schedule is only applied during the first 400000 steps;
                # afterwards the last assigned rate stays in effect.
                if global_step < 400000:
                    adjust_learning_rate(optimizer, global_step)

                mel = mel.cuda()
                mag = mag.cuda()

                mag_pred = m(mel)
                loss = criterion(mag_pred, mag)  # L1 loss on the magnitude

                writer.add_scalars('training_loss', {
                    'loss': loss, }, global_step)

                optimizer.zero_grad()
                loss.backward()
                # Clip the gradient norm to 1 before the update.
                nn.utils.clip_grad_norm_(m.parameters(), 1.)
                optimizer.step()

                if global_step % hp.save_step == 0:
                    t.save({'model': m.state_dict(),
                            'optimizer': optimizer.state_dict()},
                           os.path.join(hp.checkpoint_path,
                                        'checkpoint_postnet_%d.pth.tar' % global_step))
    finally:
        # Flush and release the event file even when training is interrupted.
        writer.close()
# Script entry point: start postnet training only when executed directly.
if __name__ == "__main__":
    main()
完成两个模型的训练后,即可运行synthesis.py文件,调用训练好的模型将文本合成为语音
# 使用文本生成wav音频文件
import torch as t
from utils import spectrogram2wav
from scipy.io.wavfile import write
import hyperparams as hp
from text import text_to_sequence
import numpy as np
from network import ModelPostNet, Model
from collections import OrderedDict
from tqdm import tqdm
import argparse
# 加载保存的模型参数
def load_checkpoint(step, model_name="transformer", checkpoint_dir="./checkpoint"):
    """Load the model weights of a saved checkpoint.

    Reads ``<checkpoint_dir>/checkpoint_<model_name>_<step>.pth.tar`` and
    returns the content of its ``'model'`` entry with a leading ``module.``
    removed from every key that has one, so the weights can be loaded into a
    model that is not wrapped in ``DataParallel``.

    Args:
        step: Global step that is part of the checkpoint file name.
        model_name: Model tag that is part of the checkpoint file name.
        checkpoint_dir: Directory that contains the checkpoint files.

    Returns:
        An ``OrderedDict`` mapping parameter names to tensors, in file order.
    """
    import os  # local import: the surrounding script does not import os

    path = os.path.join(checkpoint_dir, 'checkpoint_%s_%d.pth.tar' % (model_name, step))
    # Load onto the CPU so restoring does not depend on the GPU the checkpoint
    # was saved from; the caller moves the model to its target device.
    state_dict = t.load(path, map_location='cpu')

    prefix = 'module.'
    new_state_dict = OrderedDict()
    for k, value in state_dict['model'].items():
        # Strip the prefix only when it is really there; slicing every key
        # blindly would corrupt checkpoints saved without DataParallel.
        key = k[len(prefix):] if k.startswith(prefix) else k
        new_state_dict[key] = value
    return new_state_dict
def synthesis(text, args):
    """Synthesize speech for ``text`` and write it to ``hp.sample_path + "/test1.wav"``.

    Restores the transformer and the post network from their checkpoints,
    decodes the mel spectrogram autoregressively for ``args.max_len`` steps,
    converts the result to a magnitude spectrogram and reconstructs the
    waveform with ``spectrogram2wav``. Requires a CUDA device.

    Args:
        text: Sentence to synthesize.
        args: Namespace with ``restore_step1`` (transformer checkpoint step),
            ``restore_step2`` (postnet checkpoint step) and ``max_len``
            (number of decoding steps).

    Raises:
        ValueError: If ``args.max_len`` is smaller than 1, because no frame
            would be decoded.
    """
    import os  # local import: the surrounding script does not import os

    if args.max_len < 1:
        raise ValueError("max_len must be at least 1, got %d" % args.max_len)

    # Make sure the output directory exists before doing the expensive work.
    os.makedirs(hp.sample_path, exist_ok=True)

    m = Model()
    m_post = ModelPostNet()

    # Restore the trained weights of both networks.
    m.load_state_dict(load_checkpoint(args.restore_step1, "transformer"))
    m_post.load_state_dict(load_checkpoint(args.restore_step2, "postnet"))

    # Convert the text to an id sequence and add a batch dimension.
    text = np.asarray(text_to_sequence(text, [hp.cleaners]))
    text = t.LongTensor(text).unsqueeze(0)
    text = text.cuda()
    # Decoding starts from a single all-zero mel frame.
    mel_input = t.zeros([1, 1, hp.num_mels]).cuda()
    # 1-based positions of the text tokens.
    pos_text = t.arange(1, text.size(1) + 1).unsqueeze(0)
    pos_text = pos_text.cuda()

    m = m.cuda()
    m_post = m_post.cuda()
    m.eval()
    m_post.eval()

    # The loop always runs max_len steps; the predicted stop token is not used.
    pbar = tqdm(range(args.max_len))
    with t.no_grad():
        for _ in pbar:
            pos_mel = t.arange(1, mel_input.size(1) + 1).unsqueeze(0).cuda()
            mel_pred, postnet_pred, _, _, _, _ = m(text, mel_input, pos_text, pos_mel)
            # Append the newest predicted frame to form the next decoder input.
            mel_input = t.cat([mel_input, mel_pred[:, -1:, :]], dim=1)

        # Predict the magnitude spectrogram from the final postnet output.
        mag_pred = m_post(postnet_pred)

    wav = spectrogram2wav(mag_pred.squeeze(0).cpu().numpy())
    write(hp.sample_path + "/test1.wav", hp.sr, wav)
# Command-line entry point: parse the checkpoint steps and the decoding length,
# then synthesize a fixed demo sentence.
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--restore_step1', type=int, default=160000,
                        help='Global step of the transformer checkpoint to restore')
    parser.add_argument('--restore_step2', type=int, default=100000,
                        help='Global step of the postnet checkpoint to restore')
    parser.add_argument('--max_len', type=int, default=1000,
                        help='Number of decoding steps (mel frames) to generate')
    args = parser.parse_args()
    synthesis("Transformer model is so fast!", args)
上述代码中由mag谱图生成音频文件的过程主要基于Griffin-Lim算法,相应的代码如下:
def spectrogram2wav(mag):
    """Rebuild a waveform from a normalized linear magnitude spectrogram.

    The phase is recovered with ``griffin_lim``.

    Args:
        mag: A numpy array of shape (T, 1+n_fft//2).

    Returns:
        A 1-D float32 numpy array with the reconstructed waveform.
    """
    # Swap the axes so that time becomes the last dimension.
    spec = mag.T

    # Undo the normalization: clip to [0, 1] and map back to decibels.
    spec_db = (np.clip(spec, 0, 1) * hp.max_db) - hp.max_db + hp.ref_db

    # Decibels -> linear amplitude.
    amplitude = np.power(10.0, spec_db * 0.05)

    # Raise the amplitude to hp.power, then estimate the waveform.
    audio = griffin_lim(amplitude ** hp.power)

    # Invert the pre-emphasis filter.
    audio = signal.lfilter([1], [1, -hp.preemphasis], audio)

    # Trim the result with librosa's default settings.
    trimmed, _ = librosa.effects.trim(audio)
    return trimmed.astype(np.float32)
def griffin_lim(spectrogram):
    """Estimate a waveform from a magnitude spectrogram with Griffin-Lim.

    Runs ``hp.n_iter`` rounds of: invert the current complex spectrogram,
    recompute its STFT, keep only the phase of that STFT and combine it with
    the given magnitude.

    Args:
        spectrogram: Magnitude spectrogram of shape [1+n_fft//2, t].

    Returns:
        The real part of the final inverse STFT.
    """
    X_best = copy.deepcopy(spectrogram)
    for _ in range(hp.n_iter):
        X_t = invert_spectrogram(X_best)
        # n_fft and hop_length are passed by keyword: recent librosa releases
        # no longer accept them positionally.
        est = librosa.stft(X_t, n_fft=hp.n_fft, hop_length=hp.hop_length,
                           win_length=hp.win_length)
        # Unit-magnitude phase; the floor avoids division by zero.
        phase = est / np.maximum(1e-8, np.abs(est))
        X_best = spectrogram * phase
    X_t = invert_spectrogram(X_best)
    return np.real(X_t)
def invert_spectrogram(spectrogram):
    """Apply the inverse STFT to a spectrogram.

    Args:
        spectrogram: Array of shape [1+n_fft//2, t].

    Returns:
        The signal returned by ``librosa.istft`` for ``hp.hop_length`` and
        ``hp.win_length`` with a Hann window.
    """
    # hop_length is passed by keyword: recent librosa releases no longer
    # accept it positionally.
    return librosa.istft(spectrogram, hop_length=hp.hop_length,
                         win_length=hp.win_length, window="hann")
至此,Transformer-TTS主要代码解析完毕,本人使用了github中提供的模型参数进行了音频文件的生成,对于训练集中的文本合成的音频效果很好,但是模型没见过的文本合成的音频效果不是很好。有兴趣的读者可以自己尝试一下,官方GitHub代码clone下来就能运行。上述代码中的注释难免存在错误,如若发现请留言告知修改;后续也会对其他TTS论文以及代码库进行分析。