Building a seq2seq Network

皇甫高阳
2023-12-01

As a newcomer to natural language processing, in previous projects I always just downloaded other people's models, tweaked them, and used them as-is. A while ago I tried building a seq2seq network myself, stepped into quite a few pits, and finally got it working, more or less. I'm writing it up here to share, hoping to keep improving together with others who are learning NLP. Please bear with me where the write-up falls short.

Without further ado, let's get to it. The implementation below draws on this project: https://github.com/wb14123/seq2seq-couplet

import tensorflow as tf
from tensorflow.python.layers import core as layers_core


class seqModel():
    def __init__(self, vocab_size, batch_size, unit_nums, layer_size, output_keep_prob, FLAGS):
        self.unit_nums = unit_nums
        self.vocab_size = vocab_size
        self.batch_size = batch_size
        self.layer_size = layer_size
        self.output_keep_prob = output_keep_prob
        # Placeholders for the input sequences, the target sequences and their lengths
        self.train_in_seq = tf.placeholder(dtype=tf.int32, shape=[FLAGS.batch_size, None], name="train_in_seq")
        self.train_in_seq_len = tf.placeholder(dtype=tf.int32, shape=[FLAGS.batch_size], name="train_in_seqs_len")
        self.test_seq = tf.placeholder(dtype=tf.int32, shape=[FLAGS.batch_size, None], name="target_seqs")
        self.test_seq_len = tf.placeholder(dtype=tf.int32, shape=[FLAGS.batch_size], name="target_seq_len")
        # Build the graph: encoder + attention decoder, then the loss
        self.output = self.seq2seq(self.train_in_seq, self.train_in_seq_len, self.test_seq, self.test_seq_len)
        self.loss = self.seq_loss(self.output, self.test_seq, self.test_seq_len)
        # Adam with global-norm gradient clipping (clip norm 0.5)
        params = tf.trainable_variables()
        gradients = tf.gradients(self.loss, params)
        clipped_gradients, _ = tf.clip_by_global_norm(
            gradients, 0.5)
        self.train_op = tf.train.AdamOptimizer(
            learning_rate=FLAGS.lr
        ).apply_gradients(zip(clipped_gradients, params))

The parameters of the model's constructor: vocab_size is the size of the vocabulary, unit_nums is the number of LSTM units, and layer_size is the number of RNN layers. The other parameters should be clear from their names, so I won't repeat them here.
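
As a quick illustration, here is a minimal way this might be wired up with tf.app.flags; the FLAGS definitions and the concrete sizes below are my own assumptions, not part of the original project:

flags = tf.app.flags
flags.DEFINE_integer("batch_size", 32, "batch size")          # assumed value
flags.DEFINE_float("lr", 0.001, "learning rate")              # assumed value
FLAGS = flags.FLAGS

# Hypothetical model construction; vocab_size, unit_nums and layer_size are made up
model = seqModel(vocab_size=5000, batch_size=FLAGS.batch_size,
                 unit_nums=128, layer_size=4,
                 output_keep_prob=0.8, FLAGS=FLAGS)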

def encoder(self, encoder_inputs, in_seq_len):
    # Embedding table of shape [vocab_size, unit_nums]
    with tf.device('/gpu:0'):
        embedding = tf.get_variable(
            name='embedding',
            shape=[self.vocab_size, self.unit_nums])
    # Look up the embedding of every input id: [batch_size, max_time, unit_nums]
    embed_input = tf.nn.embedding_lookup(embedding, encoder_inputs, name='embed_input')
    # Split the layers between the forward and the backward direction
    bi_layer_size = int(self.layer_size / 2)
    encode_cell_fw = self.getLayeredCell(bi_layer_size, self.unit_nums, self.output_keep_prob)
    encode_cell_bw = self.getLayeredCell(bi_layer_size, self.unit_nums, self.output_keep_prob)
    # Bidirectional multi-layer LSTM encoder
    bi_encoder_output, bi_encoder_state = tf.nn.bidirectional_dynamic_rnn(
        cell_fw=encode_cell_fw,
        cell_bw=encode_cell_bw,
        inputs=embed_input,
        sequence_length=in_seq_len,
        dtype=embed_input.dtype,
        time_major=False)

    # Concatenate the forward/backward outputs and interleave the per-layer states
    encoder_output = tf.concat(bi_encoder_output, -1)
    encoder_state = []
    for layer_id in range(bi_layer_size):
        encoder_state.append(bi_encoder_state[0][layer_id])
        encoder_state.append(bi_encoder_state[1][layer_id])
    encoder_state = tuple(encoder_state)
    self.encoder_output = encoder_output
    self.encoder_state = encoder_state

The encoder encodes the input sequence.

tf.nn.embedding_lookup(params, ids, partition_strategy='mod', max_norm=None)

This maps the input data: each input id is looked up in an embedding table of shape vocab_size × unit_nums, so every id becomes a unit_nums-dimensional vector.
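
A small, self-contained sketch of what the lookup does to tensor shapes (the concrete sizes here are made up for illustration):

import tensorflow as tf

# Assumed toy sizes: vocab_size = 10, unit_nums = 4, a batch of 2 sequences of length 3
embedding = tf.get_variable("demo_embedding", shape=[10, 4])
ids = tf.constant([[1, 2, 3], [4, 5, 6]], dtype=tf.int32)   # [batch_size, max_time]
embedded = tf.nn.embedding_lookup(embedding, ids)           # [batch_size, max_time, unit_nums] = [2, 3, 4]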

def getLayeredCell(self, layer_size, num_units, input_keep_prob,
                   output_keep_prob=1.0):
    # Stack layer_size BasicLSTMCells, each wrapped with dropout
    return tf.contrib.rnn.MultiRNNCell(
        [tf.contrib.rnn.DropoutWrapper(tf.contrib.rnn.BasicLSTMCell(num_units),
                                       input_keep_prob, output_keep_prob)
         for i in range(layer_size)])

encode_cell_fw and encode_cell_bw are each multi-layer LSTM networks with bi_layer_size (i.e. layer_size / 2) layers of unit_nums units.

bi_encoder_output is (output_fw, output_bw), a tuple of the forward cell's output tensor and the backward cell's output tensor.
bi_encoder_state is (output_state_fw, output_state_bw), a tuple of the final hidden states of the forward and backward directions.
output_state_fw and output_state_bw are of type LSTMStateTuple; an LSTMStateTuple consists of (c, h), representing the memory cell and the hidden state respectively.

encoder_output = tf.concat(bi_encoder_output, -1) concatenates the outputs of the forward and backward LSTM networks along the last dimension.

The for loop then merges the final states of the multi-layer forward and backward LSTMs into a single tuple, interleaving them layer by layer.
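
To make the ordering concrete, here is a toy illustration (strings stand in for the LSTMStateTuple objects, and the layer count is made up) of how the forward and backward states end up interleaved:

# Toy stand-ins for the per-layer states of a 2-layer forward stack
# and a 2-layer backward stack (bi_layer_size = 2)
fw_states = ["fw_layer0", "fw_layer1"]
bw_states = ["bw_layer0", "bw_layer1"]

merged = []
for layer_id in range(2):
    merged.append(fw_states[layer_id])
    merged.append(bw_states[layer_id])

print(tuple(merged))  # ('fw_layer0', 'bw_layer0', 'fw_layer1', 'bw_layer1')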

def attention_decoder_cell(self, encoder_output, in_seq_len, num_units, layer_size,
                           input_keep_prob):
    # Bahdanau (additive) attention over the encoder output
    attention_mechanism = tf.contrib.seq2seq.BahdanauAttention(num_units,
                                                               encoder_output, in_seq_len, normalize=True)
    # The decoder cell itself: a stack of dropout-wrapped LSTM cells
    cell = self.getLayeredCell(layer_size, num_units, input_keep_prob)
    # AttentionWrapper applies the defined attention mechanism to the decoder cell
    cell = tf.contrib.seq2seq.AttentionWrapper(cell, attention_mechanism,
                                               attention_layer_size=num_units)
    return cell

Decoding with an attention mechanism:

attention_mechanism = tf.contrib.seq2seq.BahdanauAttention(num_units, encoder_output, in_seq_len, normalize=True)

num_units is the size of the features produced during encoding. encoder_output is the memory; for an RNN encoder it has shape [batch_size, max_time, num_units], i.e. the encoder produces max_time feature vectors of size num_units. in_seq_len records the valid length of each sequence in the memory, with shape [batch_size]; positions in the memory beyond memory_sequence_length are masked to 0.

AttentionWrapper is a cell wrapper that applies the defined attention mechanism to the decoder's cell.

Reference: https://www.jianshu.com/p/f3647e156198
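
Note that seq2seq below calls self.decoder_attention_all, a glue method I didn't paste here. A minimal sketch of what it presumably does (my assumption: it just builds the attention-wrapped decoder cell from the encoder output and stores it on the model) would be:

def decoder_attention_all(self, in_seq_len):
    # Assumed glue code, not shown in the original post: build the attention decoder
    # cell from the encoder output produced by encoder() and keep it for seq2seq()
    self.decoder_cell = self.attention_decoder_cell(
        self.encoder_output, in_seq_len, self.unit_nums,
        self.layer_size, self.output_keep_prob)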

def seq2seq(self, in_seq, in_seq_len, target_seq, target_seq_len):
    self.encoder(in_seq, in_seq_len)
    self.decoder_attention_all(in_seq_len)
    # Projection layer mapping the decoder output to vocabulary logits
    project_layer = layers_core.Dense(self.vocab_size)
    with tf.device("/gpu:0"):
        embedding = tf.get_variable(
            name="embedding1",
            shape=[self.vocab_size, self.unit_nums]
        )
    # Initialize the attention decoder's state with the encoder's final state
    init_state = self.decoder_cell.zero_state(self.batch_size, tf.float32).clone(
        cell_state=self.encoder_state)
    if target_seq is not None:
        # Training: feed the ground-truth target sequence (teacher forcing)
        self.out_embedding = tf.nn.embedding_lookup(embedding, target_seq)
        helper = tf.contrib.seq2seq.TrainingHelper(self.out_embedding, target_seq_len, time_major=False)
    else:
        # Inference: greedily feed back the previous prediction, starting from token id 0
        helper = tf.contrib.seq2seq.GreedyEmbeddingHelper(embedding, tf.fill([self.batch_size], 0), 1)
    decoder = tf.contrib.seq2seq.BasicDecoder(self.decoder_cell, helper, init_state, output_layer=project_layer)
    outputs, _, _ = tf.contrib.seq2seq.dynamic_decode(decoder, maximum_iterations=100)
    if target_seq is not None:
        return outputs.rnn_output  # logits over the vocabulary at each decoding step
    else:
        return outputs.sample_id   # predicted token ids

tf.contrib.seq2seq.TrainingHelper is used during training.

inputs: the embedded target sequence; at every step the decoder is fed the ground-truth token of the previous time step, and for the first decoding step the input is the embedding of the <GO> token's vocabulary index.

sequence_length: the lengths of the decoder target sequences.

tf.contrib.seq2seq.GreedyEmbeddingHelper is used at inference time.

embedding: the embedding matrix (or a callable); the previous time step's predicted id is looked up in this embedding to produce the next input vector.

start_tokens: the start-token vector, i.e. a constant vector filled with the vocabulary index of <GO>.

end_token: the index of the end-of-sequence token.

tf.fill(dims, value) creates a tensor of the given shape filled with the given value.

tf.contrib.seq2seq.dynamic_decode is the dynamic decoder: it can decode inputs of different lengths, but within a single batch the decoding length must be the same.

It returns (final_outputs, final_state, final_sequence_lengths).
final_outputs is a namedtuple containing (rnn_output, sample_id):
rnn_output: [batch_size, decoder_targets_length, vocab_size], the score (logit) of every vocabulary word at every decoding step; this is what the loss is computed from.
sample_id: [batch_size, decoder_targets_length], tf.int32, the decoded token ids, i.e. the final answer.
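
For example, at inference time the sample_id output can be turned back into text with an id-to-word table; the vocabulary and the decoded ids below are purely made-up illustrations:

import numpy as np

# Hypothetical id-to-word table; index 0 is assumed to be <GO> and index 1 <EOS>
vocab = ["<GO>", "<EOS>", "春", "风", "化", "雨"]
sample_id = np.array([[2, 3, 4, 5, 1, 1]])     # pretend dynamic_decode output, [batch_size, length]

for row in sample_id:
    words = [vocab[i] for i in row if i != 1]  # drop <EOS>
    print("".join(words))                      # 春风化雨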

def seq_loss(self, output, target, seq_len):
    # Drop the leading <GO> token so the labels line up with the decoder outputs
    target = target[:, 1:]
    cost = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=output, labels=target)
    batch_size = tf.shape(target)[0]
    # Mask out the loss contributed by padded positions
    loss_mask = tf.sequence_mask(seq_len, tf.shape(output)[1])
    cost = cost * tf.to_float(loss_mask)
    return tf.reduce_sum(cost) / tf.to_float(batch_size)

def run_step(self, sess, batch_data, is_train):
    train_seq = batch_data["in_seq"]
    train_seq_len = batch_data["in_seq_len"]
    target_seq = batch_data["target_seq"]
    target_seq_len = batch_data["target_seq_len"]
    self.feed_dict = {self.train_in_seq: train_seq,
                      self.train_in_seq_len: train_seq_len,
                      self.test_seq: target_seq,
                      self.test_seq_len: target_seq_len}
    if is_train:
        # Also run the training op, otherwise no parameters get updated
        _, output, loss = sess.run([self.train_op, self.output, self.loss], feed_dict=self.feed_dict)
        return output, loss
    else:
        loss = sess.run(self.loss, feed_dict=self.feed_dict)
        return loss
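
To close, here is a rough sketch of a training loop driving run_step; the get_batches iterator, train_data and the epoch count are assumptions of mine, not part of the model above:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(10):                                              # assumed number of epochs
        for batch_data in get_batches(train_data, FLAGS.batch_size):    # hypothetical batch iterator
            output, loss = model.run_step(sess, batch_data, is_train=True)
        print("epoch %d, loss %.4f" % (epoch, loss))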
