当前位置: 首页 > 工具软件 > regression > 使用案例 >

回归 regression

谢泽语
2023-12-01
# -*-coding:utf-8
import numpy as np
import matplotlib.pyplot as plt

def loadDataSet(fileName):
    f = open(fileName)
    _numFeat = len(f.readline().split('\t')) - 1    # 训练数据的特征总数
    dataMat = []    
    labelMat = []
    for _line_ in f.readlines():
        _lineArr = []    # 每个训练数据
        _curLine = _line_.strip().split('\t')
        for i in range(_numFeat):
            _lineArr.append(float(_curLine[i]))   # 数据特征
        dataMat.append(_lineArr)     # 训练X
        labelMat.append(float(_curLine[-1]))     # 分类号
    return dataMat,labelMat

def standRegres(xArr,yArr):
    '''
        最小二乘法求W,计算最佳拟合直线
        在用内积预测y的时候,第一维乘以前面的参数x0,第二维乘以输入的变量x1
        最终会得到 y = w[0]*x0 + w[1]*x1
    '''
    _xMat = np.mat(xArr)   # 数据矩阵 m * n  
    _yMat = np.mat(yArr).T # 分类列向量 m * 1
    _xTx = _xMat.T * _xMat # x.T * x
    if np.linalg.det(_xTx) == 0.0:   # 根据矩阵行列式的值判断是否可逆
        print "This matrix is singular, cannot do inverse"
        return 
    ws = _xTx.I * (_xMat.T * _yMat) # (x.T * x)(-1) * x.T * y 
    return ws
    
def lwlr(testPoint,xArr,yArr,k=1.0):
    '''
           局部加权线性回归
           测试数据、训练数据、训练值,参数K
    '''
    _xMat = np.mat(xArr)
    _yMat = np.mat(yArr).T
    m,n = np.shape(_xMat)    # 训练数据数目,特征个数
    _weights = np.mat(np.eye((m)))   # 初始化对角线为1,其余为0的权重向量
    '''
        计算每个样本点对应的权重值
        随着样本点与待预测点距离的递增,权重将以指数级衰减
        参数K控制衰减的速度
    '''
    for i in range(m):  
        _diffMat = testPoint - _xMat[i,:] # 矩阵:测试数据 - 训练数据
        _weights[i,i] = np.exp(_diffMat*_diffMat.T/(-2.0*k**2))    # 权重向量,是一个对角矩阵
    _xTx = _xMat.T * _weights * _xMat     # x.T * w * x --- 2*2   
    if np.linalg.det(_xTx) == 0.0:   # 行列式判断是否可逆
        print "This matrix is singular, cannot do inverse"
        return
    ws = _xTx.I * (_xMat.T * _weights * _yMat) # (2 * 1) 回归系数
    return  testPoint * ws  # 预测值yHat

def lwlrTest(testArr,xArr,yArr,k=1.0):
    '''
        数据集使用加权线性回归
    '''
    m,n = np.shape(testArr)
    yHat = np.zeros(m)
    for i in range(m): # 遍历每个样本点,计算权重值
        yHat[i] = lwlr(testArr[i],xArr,yArr,k)  
    return yHat

def ridgeRegres(xMat,yMat,lam=0.2):
    '''
        岭回归计算回归系数
    '''
    _xTx = xMat.T*xMat   # x.T * x
    _denom = _xTx + np.eye(np.shape(xMat)[1])*lam # x.T * x + lam*A
    if np.linalg.det(_denom) == 0.0: # 行列式判断是否可逆
        print "this matrix is singular, cannot do inverse"
        return
    ws = _denom.I * (xMat.T * yMat)  # 计算回归系数
    return ws

def ridgeTest(xArr,yArr):
    '''
        在一组lam上测试结果
    '''
    _xMat = np.mat(xArr)    
    _yMat = np.mat(yArr).T
    _yMean = np.mean(_yMat,0) # 列求均值
    _yMat = _yMat - _yMean     # 真实值与均值的差值
    _xMeans = np.mean(_xMat,0)# 每个训练特征的均值 
    _xVar = np.var(_xMat,0)   # m每个训练数据特征的方差  (x - xMeans)**2 / n
    _xMat = (_xMat - _xMeans)/_xVar # 训练数据归一化处理
    _numTestPts_= 30 
    wMat = np.zeros((_numTestPts_,np.shape(_xMat)[1]))
    for i in range(_numTestPts_):
        ws = ridgeRegres(_xMat,_yMat,np.exp(i-10))  # 循环30次,lam的值随计算次数指数变化
        wMat[i,:] = ws.T
    return wMat

def regularize(xMat):
    '''
            标准化数据:均值为0,方差为1
     (x-mean(x))/方差
    '''
    inMat = xMat.copy()
    inMeans = np.mean(inMat,0)
    inVar = np.var(inMat,0)
    inMat = (inMat - inMeans)/inVar
    return inMat

def stageWise(xArr,yArr,eps=0.01,numIt=100):
    '''
        前向逐步线性回归
        训练数据、分类标号,每次迭代调整的步长、迭代次数
    '''
    _xMat = np.mat(xArr)
    _yMat = np.mat(yArr).T    # 分类标号列向量
    _yMean = np.mean(_yMat,0) # 结果列均值
    _yMat = _yMat - _yMean    # y - mean(y)
    _xMat = regularize(_xMat) # 标准化训练数据特征
    m,n = np.shape(_xMat)
    returnMat = np.zeros((numIt,n)) # numIt * n
    ws = np.zeros((n,1))    # 初始化系数
    wsTest = ws.copy()
    wsMax = ws.copy()
    for i in range(numIt):
        print ws.T  # 1 * n
        lowestError = np.inf # 初始化误差为无穷大
        '''
                    贪心算法在所有的特征上运行两次for循环,分别计算增加或减少该特征对误差的影响
        '''
        for j in range(n):  # 遍历每一维数据特征
            for sign in [-1,1]: # -1:按步长减小w,+1:按步长增加w
                wsTest = ws.copy()
                wsTest[j] += eps*sign   # -1*eps / +1*eps
                yTest = _xMat*wsTest    # 新的预测结果
                rssE = rssError(_yMat.A,yTest.A) # 计算平方误差
                if rssE < lowestError:  # 如果误差<当前最小误差
                    lowestError = rssE  # 将计算的误差替换为最小误差
                    wsMax = wsTest  # 当前系数
        ws = wsMax.copy()   # 系数保存
        returnMat[i,:]=ws.T # 具有最小误差的系数作为该数据特征的系数
    return returnMat    
    
def rssError(yArr,yHatArr):
    return ((yArr-yHatArr)**2).sum()
    
def configshow(yHat):

    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.plot(yHat)
    plt.show()

def plotline(xArr,yArr):
    '''
            展示数据点和回归直线
            每组训练数据的第一个是偏移量,第二个是X
    '''
    yHat1 = lwlrTest(xArr,xArr,yArr,k=1.0)
    yHat2 = lwlrTest(xArr,xArr,yArr,k=0.01)
    yHat3 = lwlrTest(xArr,xArr,yArr,k=0.003)
    _xMat = np.mat(xArr)
    _yMat = np.mat(yArr)

    fig = plt.figure()
    srtInd = _xMat[:,1].argsort(0)
    xSort = _xMat[srtInd][:,0,:]
    ax1 = fig.add_subplot(311)
    ax1.scatter(_xMat[:,1].flatten().A[0],_yMat.T[:,0].flatten().A[0],s=2,c='red')
    ax1.plot(xSort[:,1],yHat1[srtInd])
    
    ax2 = fig.add_subplot(312)
    ax2.scatter(_xMat[:,1].flatten().A[0],_yMat.T[:,0].flatten().A[0],s=1,c='red')
    ax2.plot(xSort[:,1],yHat2[srtInd])
    
    ax3 = fig.add_subplot(313)
    ax3.scatter(_xMat[:,1].flatten().A[0],_yMat.T[:,0].flatten().A[0],s=2,c='red')
    ax3.plot(xSort[:,1],yHat3[srtInd])

    plt.show()
    
fileName = 'abalone.txt'
xArr,yArr = loadDataSet(fileName)
print stageWise(xArr,yArr,0.01,200)




 类似资料: