当前位置: 首页 > 面试题库 >

在使用Keras的深度学习中如何利用多处理和多线程的优势?

洪和平
2023-03-14
问题内容

我以为大多数框架(例如keras / tensorflow /
…)会自动使用所有CPU内核,但实际上似乎并非如此。我只是发现很少的资源可以导致我们在深度学习过程中使用CPU的全部容量。我找到了一篇有关

from multiprocessing import Pool 
import psutil
import ray

另一方面,基于在多个过程中使用keras模型的答案,没有上述库的踪迹。是否有更优雅的方式利用Keras的
多处理功能 ,因为它在实施中非常受欢迎。

  • 例如,如何在学习过程中通过简单的RNN实施进行修改,以达到至少50%的CPU容量?

  • 我应该使用第二模型作为多任务处理(如LSTM)吗?我的意思是我们可以通过使用更多的CPU能力同时管理运行多模型吗?

    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt

    from keras.layers.normalization import BatchNormalization
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.metrics import mean_squared_error
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from keras.layers import Dense
    from keras.layers import Dropout
    from keras.layers import LSTM,SimpleRNN
    from keras.models import Sequential
    from keras.optimizers import Adam, RMSprop

    df = pd.read_csv(“D:\Train.csv”, header=None)

    index = [i for i in list(range(1440)) if i%3==2]

    Y_train= df[index]
    df = df.values

    def create_dataset(dataset,data_train,look_back=1):
    dataX,dataY = [],[]
    print(“Len:”,len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
    a = dataset[i:(i+look_back), :]
    dataX.append(a)
    dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)

    Y_train=np.array(Y_train)
    df=np.array(df)

    look_back = 10
    trainX,trainY = create_dataset(df,Y_train, look_back=look_back)

    Split data into train & test

    trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)

    Shape of train and test data

    trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
    print(“train size: {}”.format(trainX.shape))
    print(“train Label size: {}”.format(trainY.shape))
    print(“test size: {}”.format(testX.shape))
    print(“test Label size: {}”.format(testY.shape))

    train size: (23, 10, 1440)

    train Label size: (23, 960)

    test size: (6, 10, 1440)

    test Label size: (6, 960)

    model_RNN = Sequential()
    model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    model_RNN.add(Dense(960))
    model_RNN.add(BatchNormalization())
    model_RNN.add(Activation(‘tanh’))

    Compile model

    model_RNN.compile(loss=’mean_squared_error’, optimizer=’adam’)
    callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]

    Fit the model

    hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

    predict

    Y_train=np.array(trainY)
    Y_test=np.array(testX)

    Y_RNN_Train_pred=model_RNN.predict(trainX)
    Y_RNN_Test_pred=model_RNN.predict(testX)

    train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
    test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)

    create and fit the Simple LSTM model as 2nd model for multi-tasking

    model_LSTM = Sequential()

    model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model_LSTM.add(Dense(units = 960))

    model_LSTM.add(BatchNormalization())

    model_LSTM.add(Activation(‘tanh’))

    model_LSTM.compile(loss=’mean_squared_error’, optimizer=’adam’)

    hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

    Y_train=np.array(trainY)

    Y_test=np.array(testX)

    Y_LSTM_Train_pred=model_LSTM.predict(trainX)

    Y_LSTM_Test_pred=model_LSTM.predict(testX)

    train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)

    test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

    plot losses for RNN + LSTM

    f, ax = plt.subplots(figsize=(20, 15))
    plt.subplot(1, 2, 1)
    ax=plt.plot(hist_RNN.history[‘loss’] ,label=’Train loss’)
    ax=plt.plot(hist_RNN.history[‘val_loss’],label=’Test/Validation/Prediction loss’)
    plt.xlabel(‘Training steps (Epochs = 50)’)
    plt.ylabel(‘Loss (MSE) for Sx-Sy & Sxy’)
    plt.title(‘ RNN Loss on Train and Test data’)
    plt.legend()
    plt.subplot(1, 2, 2)
    ax=plt.plot(hist_LSTM.history[‘loss’] ,label=’Train loss’)
    ax=plt.plot(hist_LSTM.history[‘val_loss’],label=’Test/Validation/Prediction loss’)
    plt.xlabel(‘Training steps (Epochs = 50)’)
    plt.ylabel(‘Loss (MSE) for Sx-Sy & Sxy’)
    plt.title(‘LSTM Loss on Train and Test data’)
    plt.legend()

    plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
    #plt.savefig('All_Losses_history_.png')
    plt.show()
    

请注意 ,仅当我访问没有VGA的功能强大的服务器时,我才访问 CUDA
。我的目标是利用多处理和多线程来最大程度地利用CPU而不是30%的容量,这意味着只有四核才能拥有一个核!任何建议将不胜感激。我已经上传了格式化的csv数据集。

更新: 我的硬件配置如下:

  • 处理器:AMD A8-7650K Radeon R7 10 Compute Cores 4C + 6G 3.30 GHz
  • 内存:16GB
  • 操作系统:Win 7
  • Python版本3.6.6
  • Tensorflow版本1.8.0
  • Keras版本2.2.4

问题答案:

训练一个模型不会占用您全部100%的CPU是一件好事!现在,我们有空间并行训练多个模型,并加快您的总体训练时间。

注意:如果您只是想加速此模型,请查看GPU或更改超参数,例如批大小和神经元数量(层大小)。

这是用于multiprocessing同时训练多个模型的方法(使用在计算机的每个单独CPU内核上并行运行的进程)。

multiprocessing.Pool基本创建的作业池需要做。流程将拾取并运行这些作业。作业完成后,该过程将从池中提取另一个作业。

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)


num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, hyperparams)

print(scores)

输出:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

time.sleep代码中的a很容易说明这一点。您会看到所有3个过程都开始了培训工作,然后它们都几乎同时完成。如果这是单个处理的,则必须等待每个处理完成后才能开始下一个(打哈欠!)。

EDIT OP还需要完整的代码。这在Stack
Overflow上很困难,因为我无法在您的环境和您的代码中进行测试。我已经自由地将代码复制并粘贴到上面的模板中。您可能需要添加一些导入,但这与您获得“可运行的”和“完整的”代码非常接近。

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv("D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

num_workers = 2
model_types = ['rnn', 'lstm']

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, model_types)

print(scores)

程序输出:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]


 类似资料:
  • 译者 bruce1408 作者: Robert Guthrie 本文带您进入pytorch框架进行深度学习编程的核心思想。Pytorch的很多概念(比如计算图抽象和自动求导)并非它所独有的,和其他深度学习框架相关。 我写这篇教程是专门针对那些从未用任何深度学习框架(例如:Tensorflow, Theano, Keras, Dynet)编写代码而从事NLP领域的人。我假设你已经知道NLP领域要解决

  • 问题内容: 我正在使用Keras与Tensorflow作为后端。 我正在尝试在主流程中保存模型,然后在另一个流程中加载/运行(即调用)。 我目前正在尝试从文档中使用天真的方法来保存/加载模型:https : //keras.io/getting-started/faq/#how-can-i-save-a- keras-model 。 所以基本上: 在主要过程中 在子进程中 在子进程中 但是,它只是

  • 本文向大家介绍C#多线程学习之(四)使用线程池进行多线程的自动管理,包括了C#多线程学习之(四)使用线程池进行多线程的自动管理的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#多线程学习之使用线程池进行多线程的自动管理。分享给大家供大家参考。具体如下: 在多线程的程序中,经常会出现两种情况: 一种情况:   应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应

  • 这就是Keras Keras是一个高层神经网络库,Keras由纯Python编写而成并基Tensorflow或Theano。Keras 为支持快速实验而生,能够把你的idea迅速转换为结果,如果你有如下需求,请选择Keras: 简易和快速的原型设计(keras具有高度模块化,极简,和可扩充特性) 支持CNN和RNN,或二者的结合 支持任意的链接方案(包括多输入和多输出训练) 无缝CPU和GPU切换

  • Keras 是一个高层神经网络 API,Keras 由纯 Python 编写而成并基 Tensorflow、Theano 以及 CNTK 后端。Keras 为支持快速实验而生,能够把你的idea迅速转换为结果,如果你有如下需求,请选择 Keras: 简易和快速的原型设计(keras具有高度模块化,极简,和可扩充特性) 支持 CNN 和 RNN,或二者的结合 无缝 CPU 和 GPU 切换 Kera

  • 停止更新通知 Hi all,十分感谢大家对keras-cn的支持,本文档从我读书的时候开始维护,到现在已经快两年了。这个过程中我通过翻译文档,为同学们debug和答疑学到了很多东西,也很开心能帮到一些同学。 从2017年我工作以后,由于工作比较繁忙,更新频率有所下降。到今年早期的时候这种情况更加严重,加之我了解到,keras官方已经出了中文文档,更觉本份文档似乎应该已经基本完成了其历史使命,该到了