当前位置: 首页 > 工具软件 > Stacked View > 使用案例 >

pytorch根据特征图训练LSTM Stacked AutoEncoder

湛钊
2023-12-01

步骤一:构造训练数据

def get_train_data(cluster_shape=(2000, 50)):
    """得到训练数据,这里使用随机数生成训练数据,由此导致最终结果并不好"""

    def get_tensor_from_pd(dataframe_series) -> torch.Tensor:
        return torch.tensor(data=dataframe_series.values)

    # 生成训练数据x并做归一化后,构造成dataframe格式,再转换为tensor格式
    df = pd.DataFrame(data=preprocessing.MinMaxScaler().fit_transform(np.random.randint(0, 10, size=cluster_shape)))
    y = pd.Series(np.random.randint(0, 10, cluster_shape[0]))
    return get_tensor_from_pd(df).float(), get_tensor_from_pd(y).float()

步骤二:构造LSTM模型

class LstmStackedAutoEncoder(nn.Module):

    def __init__(self, embedding_size, feature_length, batch_size, hidden_layer_size=500, encode_size=200):
        super().__init__()
        self.embedding_size = embedding_size  # number of hidden states
        self.feature_length = feature_length
        self.hidden_layer_size = hidden_layer_size  # 自定义
        self.n_layers = 1
        self.batch_size = batch_size

        self.lstm_en = nn.LSTM(embedding_size, hidden_layer_size, batch_first=True)
        self.linear_en = nn.Linear(self.hidden_layer_size * self.feature_length, encode_size)

        self.linear_de = nn.Linear(encode_size, self.hidden_layer_size * self.feature_length)
        self.lstm_de = nn.LSTM(hidden_layer_size, embedding_size, batch_first=True)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()

    def forward(self, input_x):
        # encoder
        en_lstm, (n, c) = self.lstm_en(input_x,  # input_x:[30,1480,25]
                                       (torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size),
                                        torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size)))
        en_lstm = en_lstm.contiguous().view(batch_size, -1)
        en_linear = self.linear_en(en_lstm)  # [30,1480,200]
        en_out = self.relu(en_linear)
        # decoder
        de_linear = self.linear_de(en_out)
        de_sigmoid = self.sigmoid(de_linear)  # [30,740000]
        # shape: (n_layers, batch, hidden_size)
        de_sigmoid = de_sigmoid.view([self.batch_size, self.feature_length, self.hidden_layer_size])  # [30,1480,500]
        de_out, (n, c) = self.lstm_de(de_sigmoid,
                                      # 隐层的最后一个维度的与输出的维度相同
                                      (torch.zeros(self.n_layers, self.batch_size, self.embedding_size),
                                       torch.zeros(self.n_layers, self.batch_size, self.embedding_size)))
        return de_out

构造三:构建训练数据

    logging.basicConfig(format='%(asctime)s - [line:%(lineno)d] - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    x, y = get_train_data()
    epochs = 2
    batch_size = 30
    packet_code = 10
    packet_length = 5
    train_loader = Data.DataLoader(
        dataset=Data.TensorDataset(x, y),  # 封装进Data.TensorDataset()类的数据,可以为任意维度
        batch_size=batch_size,  # 每块的大小
        shuffle=True,  # 要不要打乱数据 (打乱比较好)
        num_workers=6,  # 多进程(multiprocess)来读数据
        drop_last=True
    )

构造建模三件套

    # 建模三件套:loss,优化,epochs
    model = LstmStackedAutoEncoder(packet_length, packet_code, batch_size)  # lstm
    loss_function = nn.MSELoss()  # loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # 优化器

步骤四:训练模型并保存

    # 开始训练
    logging.debug("begin train")
    model.train()
    for ep in range(epochs):
        i = 0
        for seq, labels in train_loader:
            optimizer.zero_grad()
            input_seq = seq.view([batch_size, packet_code, packet_length])
            y_pred = model(input_seq).squeeze()  # 压缩维度:得到输出,并将维度为1的去除
            single_loss = loss_function(y_pred, input_seq)
            single_loss.backward()  # 进入到这一行,报错
            optimizer.step()
            logging.debug("TRAIN Finish" + str(i) + " LOSS " + str(single_loss))
            i += 1

保存模型:

    logging.debug("train finish")
    torch.save(model, 'stacked_auto_encoder.pt')
    logging.debug("save finish")

全部代码

import torch
import torch.nn as nn
import torch.utils.data as Data
import os
import logging
import numpy as np
import pandas as pd
from sklearn import preprocessing


def get_train_data(cluster_shape=(2000, 50)):
    """得到训练数据,这里使用随机数生成训练数据,由此导致最终结果并不好"""

    def get_tensor_from_pd(dataframe_series) -> torch.Tensor:
        return torch.tensor(data=dataframe_series.values)

    # 生成训练数据x并做归一化后,构造成dataframe格式,再转换为tensor格式
    df = pd.DataFrame(data=preprocessing.MinMaxScaler().fit_transform(np.random.randint(0, 10, size=cluster_shape)))
    y = pd.Series(np.random.randint(0, 10, cluster_shape[0]))
    return get_tensor_from_pd(df).float(), get_tensor_from_pd(y).float()


class LstmStackedAutoEncoder(nn.Module):

    def __init__(self, embedding_size, feature_length, batch_size, hidden_layer_size=500, encode_size=200):
        super().__init__()
        self.embedding_size = embedding_size  # number of hidden states
        self.feature_length = feature_length
        self.hidden_layer_size = hidden_layer_size  # 自定义
        self.n_layers = 1
        self.batch_size = batch_size

        self.lstm_en = nn.LSTM(embedding_size, hidden_layer_size, batch_first=True)
        self.linear_en = nn.Linear(self.hidden_layer_size * self.feature_length, encode_size)

        self.linear_de = nn.Linear(encode_size, self.hidden_layer_size * self.feature_length)
        self.lstm_de = nn.LSTM(hidden_layer_size, embedding_size, batch_first=True)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()

    def forward(self, input_x):
        # encoder
        en_lstm, (n, c) = self.lstm_en(input_x,  # input_x:[30,1480,25]
                                       (torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size),
                                        torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size)))
        en_lstm = en_lstm.contiguous().view(batch_size, -1)
        en_linear = self.linear_en(en_lstm)  # [30,1480,200]
        en_out = self.relu(en_linear)
        # decoder
        de_linear = self.linear_de(en_out)
        de_sigmoid = self.sigmoid(de_linear)  # [30,740000]
        # shape: (n_layers, batch, hidden_size)
        de_sigmoid = de_sigmoid.view([self.batch_size, self.feature_length, self.hidden_layer_size])  # [30,1480,500]
        de_out, (n, c) = self.lstm_de(de_sigmoid,
                                      # 隐层的最后一个维度的与输出的维度相同
                                      (torch.zeros(self.n_layers, self.batch_size, self.embedding_size),
                                       torch.zeros(self.n_layers, self.batch_size, self.embedding_size)))
        return de_out


if __name__ == '__main__':
    # 通过原始的训练数据,训练 Stacked Lstm 并将其保存在本地
    logging.basicConfig(format='%(asctime)s - [line:%(lineno)d] - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    x, y = get_train_data()
    epochs = 2
    batch_size = 30
    packet_code = 10
    packet_length = 5
    train_loader = Data.DataLoader(
        dataset=Data.TensorDataset(x, y),  # 封装进Data.TensorDataset()类的数据,可以为任意维度
        batch_size=batch_size,  # 每块的大小
        shuffle=True,  # 要不要打乱数据 (打乱比较好)
        num_workers=6,  # 多进程(multiprocess)来读数据
        drop_last=True
    )
    # 建模三件套:loss,优化,epochs
    model = LstmStackedAutoEncoder(packet_length, packet_code, batch_size)  # lstm
    loss_function = nn.MSELoss()  # loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # 优化器
    # 开始训练
    logging.debug("begin train")
    model.train()
    for ep in range(epochs):
        i = 0
        for seq, labels in train_loader:
            optimizer.zero_grad()
            input_seq = seq.view([batch_size, packet_code, packet_length])
            y_pred = model(input_seq).squeeze()  # 压缩维度:得到输出,并将维度为1的去除
            single_loss = loss_function(y_pred, input_seq)
            single_loss.backward()  # 进入到这一行,报错
            optimizer.step()
            logging.debug("TRAIN Finish" + str(i) + " LOSS " + str(single_loss))
            i += 1
    logging.debug("train finish")
    torch.save(model, 'stacked_auto_encoder.pt')
    logging.debug("save finish")
 类似资料: