文本生成(二)---Seq2Seq + attention paddle实现

宗波涛
2023-12-01

一 前言

本文是文本生成系列的文章的第二篇,主要是seq2seq+attention的paddle实现.
本文完整实现seq2seq+att用于生成对联,包括数据预处理,训练,预测。
同时,本文的代码也很容易迁移到其他任务上,如对话。

第一篇文章为讲了关于seq2seq的相关理论内容。文本生成(一)—Seq2Seq理论笔记

二 项目结构

seq2seq/
	couplet_models/ --模型参数
   para.py      --模型超参数设置文件入口
   train.py     --训练代码
   predict.py    --生成代码
   dataset.py    --couplet数据集预测代码
   model.py     --模型网络结构定义代码

三 使用

训练
1.安装paddlenlp

pip install paddlenlp==2.0.0rc1
cd seq2seq
python train.py

预测

python predict.py

源码

1.para.py

#%% 加载需要用到的模块
import io
import os
 
from functools import partial
 
import numpy as np

import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddlenlp.data import Vocab, Pad
from paddlenlp.metrics import Perplexity
from paddlenlp.datasets import CoupletDataset

# import paddlenlp
# paddlenlp.__version__
# '2.0.0rc1'

#%% 超参数设置
batch_size = 128
max_epoch = 20
learning_rate = 0.001

num_layers = 2
hidden_size =256
dropout = 0.2
max_grad_norm = 5.0

model_path = './couplet_models'
log_freq = 200

beam_size = 10

use_gpu = False
device = paddle.set_device("gpu" if use_gpu else "cpu")

vocab, _ = CoupletDataset.get_vocab()
pad_id = vocab[CoupletDataset.EOS_TOKEN]
bos_id = vocab[CoupletDataset.BOS_TOKEN]
eos_id = vocab[CoupletDataset.EOS_TOKEN]
trg_idx2word = vocab.idx_to_token
vocab_size = len(vocab)

2.dataset.py

from para import *

#数据预处理
def create_data_loader(dataset):
    data_loader = paddle.io.DataLoader(
        dataset,
        batch_sampler=None,
        batch_size = batch_size,
        collate_fn=partial(prepare_input, pad_id=pad_id))
    return data_loader
 
def prepare_input(insts, pad_id):
    src, src_length = Pad(pad_val=pad_id, ret_length=True)([inst[0] for inst in insts])
    tgt, tgt_length = Pad(pad_val=pad_id, ret_length=True)([inst[1] for inst in insts])
    tgt_mask = (tgt[:, :-1] != pad_id).astype(paddle.get_default_dtype())
    
    return src.astype("int64"), src_length, tgt[:, :-1].astype("int64"), tgt[:, 1:, np.newaxis].astype("int64"), tgt_mask


def coupletLoader():
    #加载数据集
    train_ds, dev_ds, test_ds = CoupletDataset.get_datasets(['train', 'dev', 'test'])
    #定义数据加载器
    train_loader = create_data_loader(train_ds)
    test_loader = create_data_loader(test_ds)
    return train_loader,test_loader

if __name__ == "__main__":
    train_loader,test_loader = coupletLoader()
    print("data shape:")
    for i in train_loader:
        print (len(i))
        for ind, each in enumerate(i):
            print (ind, each.shape)
        break


1.model.py

from para import *

#embedding -> lstm ->encoder_output(h),encoder_state(c) 
class Seq2SeqEncoder(nn.Layer):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super(Seq2SeqEncoder, self).__init__()
        self.embedder = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=0.2 if num_layers > 1 else 0.)
 
    def forward(self, sequence, sequence_length):
        inputs = self.embedder(sequence)
        encoder_output, encoder_state = self.lstm(
            inputs, sequence_length=sequence_length)
        
        # encoder_output [128, 18, 256]  [batch_size,time_steps,hidden_size]
        # encoder_state (tuple) - 最终状态,一个包含h和c的元组。 [2, 128, 256] [2, 128, 256] [num_lauers * num_directions, batch_size, hidden_size]
        return encoder_output, encoder_state
    

class AttentionLayer(nn.Layer):
    def __init__(self, hidden_size):
        super(AttentionLayer, self).__init__()
        self.input_proj = nn.Linear(hidden_size, hidden_size)
        self.output_proj = nn.Linear(hidden_size + hidden_size, hidden_size)
 
    def forward(self, hidden, encoder_output, encoder_padding_mask):
        encoder_output = self.input_proj(encoder_output)
        attn_scores = paddle.matmul(
            paddle.unsqueeze(hidden, [1]), encoder_output, transpose_y=True)
        # print('attention score', attn_scores.shape) #[128, 1, 18]
 
        if encoder_padding_mask is not None:
            attn_scores = paddle.add(attn_scores, encoder_padding_mask)
 
        attn_scores = F.softmax(attn_scores)
        attn_out = paddle.squeeze(
            paddle.matmul(attn_scores, encoder_output), [1])
        # print('1 attn_out', attn_out.shape) #[128, 256]
 
        attn_out = paddle.concat([attn_out, hidden], 1)
        # print('2 attn_out', attn_out.shape) #[128, 512]
 
        attn_out = self.output_proj(attn_out)
        # print('3 attn_out', attn_out.shape) #[128, 256]
        return attn_out
    
#lstm_cells->attention
class Seq2SeqDecoderCell(nn.RNNCellBase):
    def __init__(self, num_layers, input_size, hidden_size):
        super(Seq2SeqDecoderCell, self).__init__()
        self.dropout = nn.Dropout(0.2)
        self.lstm_cells = nn.LayerList([
            nn.LSTMCell(
                input_size=input_size + hidden_size if i == 0 else hidden_size,
                hidden_size=hidden_size) for i in range(num_layers)
        ])
        self.attention_layer = AttentionLayer(hidden_size)
    
    def forward(self,
                step_input,
                states,
                encoder_output,
                encoder_padding_mask=None):
        lstm_states, input_feed = states
        new_lstm_states = []
        step_input = paddle.concat([step_input, input_feed], 1)
        for i, lstm_cell in enumerate(self.lstm_cells):
            out, new_lstm_state = lstm_cell(step_input, lstm_states[i])
            step_input = self.dropout(out)
            new_lstm_states.append(new_lstm_state)
        out = self.attention_layer(step_input, encoder_output,
                                   encoder_padding_mask)
        return out, [new_lstm_states, out]
    
#embedding->lstm_attention->linear
class Seq2SeqDecoder(nn.Layer):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super(Seq2SeqDecoder, self).__init__()
        self.embedder = nn.Embedding(vocab_size, embed_dim)
        self.lstm_attention = nn.RNN(
            Seq2SeqDecoderCell(num_layers, embed_dim, hidden_size))
        self.output_layer = nn.Linear(hidden_size, vocab_size)
    def forward(self, trg, decoder_initial_states, encoder_output,
                encoder_padding_mask):
        inputs = self.embedder(trg)
        decoder_output, _ = self.lstm_attention(
            inputs,
            initial_states=decoder_initial_states,
            encoder_output=encoder_output,
            encoder_padding_mask=encoder_padding_mask)
        predict = self.output_layer(decoder_output)
        return predict
    
    
class Seq2SeqAttnModel(nn.Layer):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers,
                 eos_id=1):
        super(Seq2SeqAttnModel, self).__init__()
        self.hidden_size = hidden_size
        self.eos_id = eos_id
        self.num_layers = num_layers
        self.INF = 1e9
        self.encoder = Seq2SeqEncoder(vocab_size, embed_dim, hidden_size,
                                      num_layers)
        self.decoder = Seq2SeqDecoder(vocab_size, embed_dim, hidden_size,
                                      num_layers)
 
    def forward(self, src, src_length, trg):
        # encoder_output 各时刻的输出h
        # encoder_final_state 最后时刻的输出h,和记忆信号c
        encoder_output, encoder_final_state = self.encoder(src, src_length)
        # print('encoder_output shape', encoder_output.shape)  #  [128, 18, 256]  [batch_size,time_steps,hidden_size]
        # print('encoder_final_states shape', encoder_final_state[0].shape, encoder_final_state[1].shape) #[2, 128, 256] [2, 128, 256] [num_lauers * num_directions, batch_size, hidden_size]
 
        # Transfer shape of encoder_final_states to [num_layers, 2, batch_size, hidden_size]???
        encoder_final_states = [
            (encoder_final_state[0][i], encoder_final_state[1][i])
            for i in range(self.num_layers)
        ]
        # print('encoder_final_states shape', encoder_final_states[0][0].shape, encoder_final_states[0][1].shape) #[128, 256] [128, 256]
 
 
        # Construct decoder initial states: use input_feed and the shape is
        # [[h,c] * num_layers, input_feed], consistent with Seq2SeqDecoderCell.states
        decoder_initial_states = [
            encoder_final_states,
            self.decoder.lstm_attention.cell.get_initial_states(
                batch_ref=encoder_output, shape=[self.hidden_size])
        ]
 
        # Build attention mask to avoid paying attention on padddings
        src_mask = (src != self.eos_id).astype(paddle.get_default_dtype())
        # print ('src_mask shape', src_mask.shape)  #[128, 18]
        # print(src_mask[0, :])
 
        encoder_padding_mask = (src_mask - 1.0) * self.INF
        # print ('encoder_padding_mask', encoder_padding_mask.shape)  #[128, 18]
        # print(encoder_padding_mask[0, :])
 
        encoder_padding_mask = paddle.unsqueeze(encoder_padding_mask, [1])
        # print('encoder_padding_mask', encoder_padding_mask.shape)  #[128, 1, 18]
 
        predict = self.decoder(trg, decoder_initial_states, encoder_output,
                               encoder_padding_mask)
        # print('predict', predict.shape)   #[128, 17, 7931]
 
        return predict
    
class CrossEntropyCriterion(nn.Layer):
    def __init__(self):
        super(CrossEntropyCriterion, self).__init__()
    def forward(self, predict, label, trg_mask):
        
        cost = F.softmax_with_cross_entropy(
            logits=predict, label=label, soft_label=False)
        cost = paddle.squeeze(cost, axis=[2])
        masked_cost = cost * trg_mask
        batch_mean_cost = paddle.mean(masked_cost, axis=[0])
        seq_cost = paddle.sum(batch_mean_cost)
        print("cost:",seq_cost)
        return seq_cost
    

1.train.py

from para import *
from model import Seq2SeqAttnModel,CrossEntropyCriterion
from dataset import coupletLoader

train_loader,test_loader = coupletLoader()

model = paddle.Model(
    Seq2SeqAttnModel(vocab_size, hidden_size, hidden_size,
                        num_layers, pad_id))
 
optimizer = paddle.optimizer.Adam(
    learning_rate=learning_rate, parameters=model.parameters())
ppl_metric = Perplexity()
model.prepare(optimizer, CrossEntropyCriterion(), ppl_metric)
 

model.fit(train_data=train_loader,
            epochs=max_epoch,
            eval_freq=1,
            save_freq=1,
            save_dir=model_path,
            log_freq=log_freq)

1.predict.py

from para import *
from model import Seq2SeqAttnModel
from dataset import coupletLoader

class Seq2SeqAttnInferModel(Seq2SeqAttnModel):
    def __init__(self,
                 vocab_size,
                 embed_dim,
                 hidden_size,
                 num_layers,
                 bos_id=0,
                 eos_id=1,
                 beam_size=4,
                 max_out_len=256):
        self.bos_id = bos_id
        self.beam_size = beam_size
        self.max_out_len = max_out_len
        self.num_layers = num_layers
        super(Seq2SeqAttnInferModel, self).__init__(
            vocab_size, embed_dim, hidden_size, num_layers, eos_id)
 
        # Dynamic decoder for inference
        self.beam_search_decoder = nn.BeamSearchDecoder(
            self.decoder.lstm_attention.cell,
            start_token=bos_id,
            end_token=eos_id,
            beam_size=beam_size,
            embedding_fn=self.decoder.embedder,
            output_fn=self.decoder.output_layer)
 
    def forward(self, src, src_length):
        encoder_output, encoder_final_state = self.encoder(src, src_length)
 
        encoder_final_state = [
            (encoder_final_state[0][i], encoder_final_state[1][i])
            for i in range(self.num_layers)
        ]
 
        # Initial decoder initial states
        decoder_initial_states = [
            encoder_final_state,
            self.decoder.lstm_attention.cell.get_initial_states(
                batch_ref=encoder_output, shape=[self.hidden_size])
        ]
        # Build attention mask to avoid paying attention on paddings
        src_mask = (src != self.eos_id).astype(paddle.get_default_dtype())
 
        encoder_padding_mask = (src_mask - 1.0) * self.INF
        encoder_padding_mask = paddle.unsqueeze(encoder_padding_mask, [1])
 
        # Tile the batch dimension with beam_size
        encoder_output = nn.BeamSearchDecoder.tile_beam_merge_with_batch(
            encoder_output, self.beam_size)
        encoder_padding_mask = nn.BeamSearchDecoder.tile_beam_merge_with_batch(
            encoder_padding_mask, self.beam_size)
 
        # Dynamic decoding with beam search
        seq_output, _ = nn.dynamic_decode(
            decoder=self.beam_search_decoder,
            inits=decoder_initial_states,
            max_step_num=self.max_out_len,
            encoder_output=encoder_output,
            encoder_padding_mask=encoder_padding_mask)
        return seq_output
    
def post_process_seq(seq, bos_idx, eos_idx, output_bos=False, output_eos=False):
    """
    Post-process the decoded sequence.
    """
    eos_pos = len(seq) - 1
    for i, idx in enumerate(seq):
        if idx == eos_idx:
            eos_pos = i
            break
    seq = [
        idx for idx in seq[:eos_pos + 1]
        if (output_bos or idx != bos_idx) and (output_eos or idx != eos_idx)
    ]
    return seq

beam_size = 10
init_from_ckpt = './couplet_models/0' # for test
infer_output_file = './infer_output.txt'
 
train_dataset,test_loader = coupletLoader()

model = paddle.Model(
    Seq2SeqAttnInferModel(
        vocab_size,
        hidden_size,
        hidden_size,
        num_layers,
        bos_id=bos_id,
        eos_id=eos_id,
        beam_size=beam_size,
        max_out_len=256))
 
model.prepare()

model.load('C:/Users/Administrator/Desktop/NLP/seq2seq/couplet_models/0')

test_ds = CoupletDataset.get_datasets(['test'])
idx = 0
for data in test_loader():
    inputs = data[:2]
    finished_seq = model.predict_batch(inputs=list(inputs))[0]
    finished_seq = finished_seq[:, :, np.newaxis] if len(
        finished_seq.shape) == 2 else finished_seq
    finished_seq = np.transpose(finished_seq, [0, 2, 1])
    for ins in finished_seq:
        for beam in ins:
            id_list = post_process_seq(beam, bos_id, eos_id)
            word_list_l = [trg_idx2word[id] for id in test_ds[idx][0]][1:-1]
            word_list_r = [trg_idx2word[id] for id in id_list]
            sequence = "上联: "+" ".join(word_list_l)+"\t下联: "+" ".join(word_list_r) + "\n"
            print(sequence)
            idx += 1
            break
    break
        

参考

本项目代码主要参考
https://blog.csdn.net/qqGHJ/article/details/113761399 PaddleNLP_基于seq2seq的对联生成

理论部分
https://blog.csdn.net/weixin_45259896/article/details/124624751?spm=1001.2014.3001.5502 文本生成(一)—Seq2Seq理论笔记

aistudio项目
https://aistudio.baidu.com/aistudio/projectdetail/3957608 Seq2Seq 用于对联生成

 类似资料: