# -*-coding:utf-8
import numpy as np
import matplotlib.pyplot as plt
def loadDataSet(fileName):
f = open(fileName)
_numFeat = len(f.readline().split('\t')) - 1 # 训练数据的特征总数
dataMat = []
labelMat = []
for _line_ in f.readlines():
_lineArr = [] # 每个训练数据
_curLine = _line_.strip().split('\t')
for i in range(_numFeat):
_lineArr.append(float(_curLine[i])) # 数据特征
dataMat.append(_lineArr) # 训练X
labelMat.append(float(_curLine[-1])) # 分类号
return dataMat,labelMat
def standRegres(xArr,yArr):
'''
最小二乘法求W,计算最佳拟合直线
在用内积预测y的时候,第一维乘以前面的参数x0,第二维乘以输入的变量x1
最终会得到 y = w[0]*x0 + w[1]*x1
'''
_xMat = np.mat(xArr) # 数据矩阵 m * n
_yMat = np.mat(yArr).T # 分类列向量 m * 1
_xTx = _xMat.T * _xMat # x.T * x
if np.linalg.det(_xTx) == 0.0: # 根据矩阵行列式的值判断是否可逆
print "This matrix is singular, cannot do inverse"
return
ws = _xTx.I * (_xMat.T * _yMat) # (x.T * x)(-1) * x.T * y
return ws
def lwlr(testPoint,xArr,yArr,k=1.0):
'''
局部加权线性回归
测试数据、训练数据、训练值,参数K
'''
_xMat = np.mat(xArr)
_yMat = np.mat(yArr).T
m,n = np.shape(_xMat) # 训练数据数目,特征个数
_weights = np.mat(np.eye((m))) # 初始化对角线为1,其余为0的权重向量
'''
计算每个样本点对应的权重值
随着样本点与待预测点距离的递增,权重将以指数级衰减
参数K控制衰减的速度
'''
for i in range(m):
_diffMat = testPoint - _xMat[i,:] # 矩阵:测试数据 - 训练数据
_weights[i,i] = np.exp(_diffMat*_diffMat.T/(-2.0*k**2)) # 权重向量,是一个对角矩阵
_xTx = _xMat.T * _weights * _xMat # x.T * w * x --- 2*2
if np.linalg.det(_xTx) == 0.0: # 行列式判断是否可逆
print "This matrix is singular, cannot do inverse"
return
ws = _xTx.I * (_xMat.T * _weights * _yMat) # (2 * 1) 回归系数
return testPoint * ws # 预测值yHat
def lwlrTest(testArr,xArr,yArr,k=1.0):
'''
数据集使用加权线性回归
'''
m,n = np.shape(testArr)
yHat = np.zeros(m)
for i in range(m): # 遍历每个样本点,计算权重值
yHat[i] = lwlr(testArr[i],xArr,yArr,k)
return yHat
def ridgeRegres(xMat,yMat,lam=0.2):
'''
岭回归计算回归系数
'''
_xTx = xMat.T*xMat # x.T * x
_denom = _xTx + np.eye(np.shape(xMat)[1])*lam # x.T * x + lam*A
if np.linalg.det(_denom) == 0.0: # 行列式判断是否可逆
print "this matrix is singular, cannot do inverse"
return
ws = _denom.I * (xMat.T * yMat) # 计算回归系数
return ws
def ridgeTest(xArr,yArr):
'''
在一组lam上测试结果
'''
_xMat = np.mat(xArr)
_yMat = np.mat(yArr).T
_yMean = np.mean(_yMat,0) # 列求均值
_yMat = _yMat - _yMean # 真实值与均值的差值
_xMeans = np.mean(_xMat,0)# 每个训练特征的均值
_xVar = np.var(_xMat,0) # m每个训练数据特征的方差 (x - xMeans)**2 / n
_xMat = (_xMat - _xMeans)/_xVar # 训练数据归一化处理
_numTestPts_= 30
wMat = np.zeros((_numTestPts_,np.shape(_xMat)[1]))
for i in range(_numTestPts_):
ws = ridgeRegres(_xMat,_yMat,np.exp(i-10)) # 循环30次,lam的值随计算次数指数变化
wMat[i,:] = ws.T
return wMat
def regularize(xMat):
'''
标准化数据:均值为0,方差为1
(x-mean(x))/方差
'''
inMat = xMat.copy()
inMeans = np.mean(inMat,0)
inVar = np.var(inMat,0)
inMat = (inMat - inMeans)/inVar
return inMat
def stageWise(xArr,yArr,eps=0.01,numIt=100):
'''
前向逐步线性回归
训练数据、分类标号,每次迭代调整的步长、迭代次数
'''
_xMat = np.mat(xArr)
_yMat = np.mat(yArr).T # 分类标号列向量
_yMean = np.mean(_yMat,0) # 结果列均值
_yMat = _yMat - _yMean # y - mean(y)
_xMat = regularize(_xMat) # 标准化训练数据特征
m,n = np.shape(_xMat)
returnMat = np.zeros((numIt,n)) # numIt * n
ws = np.zeros((n,1)) # 初始化系数
wsTest = ws.copy()
wsMax = ws.copy()
for i in range(numIt):
print ws.T # 1 * n
lowestError = np.inf # 初始化误差为无穷大
'''
贪心算法在所有的特征上运行两次for循环,分别计算增加或减少该特征对误差的影响
'''
for j in range(n): # 遍历每一维数据特征
for sign in [-1,1]: # -1:按步长减小w,+1:按步长增加w
wsTest = ws.copy()
wsTest[j] += eps*sign # -1*eps / +1*eps
yTest = _xMat*wsTest # 新的预测结果
rssE = rssError(_yMat.A,yTest.A) # 计算平方误差
if rssE < lowestError: # 如果误差<当前最小误差
lowestError = rssE # 将计算的误差替换为最小误差
wsMax = wsTest # 当前系数
ws = wsMax.copy() # 系数保存
returnMat[i,:]=ws.T # 具有最小误差的系数作为该数据特征的系数
return returnMat
def rssError(yArr,yHatArr):
return ((yArr-yHatArr)**2).sum()
def configshow(yHat):
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(yHat)
plt.show()
def plotline(xArr,yArr):
'''
展示数据点和回归直线
每组训练数据的第一个是偏移量,第二个是X
'''
yHat1 = lwlrTest(xArr,xArr,yArr,k=1.0)
yHat2 = lwlrTest(xArr,xArr,yArr,k=0.01)
yHat3 = lwlrTest(xArr,xArr,yArr,k=0.003)
_xMat = np.mat(xArr)
_yMat = np.mat(yArr)
fig = plt.figure()
srtInd = _xMat[:,1].argsort(0)
xSort = _xMat[srtInd][:,0,:]
ax1 = fig.add_subplot(311)
ax1.scatter(_xMat[:,1].flatten().A[0],_yMat.T[:,0].flatten().A[0],s=2,c='red')
ax1.plot(xSort[:,1],yHat1[srtInd])
ax2 = fig.add_subplot(312)
ax2.scatter(_xMat[:,1].flatten().A[0],_yMat.T[:,0].flatten().A[0],s=1,c='red')
ax2.plot(xSort[:,1],yHat2[srtInd])
ax3 = fig.add_subplot(313)
ax3.scatter(_xMat[:,1].flatten().A[0],_yMat.T[:,0].flatten().A[0],s=2,c='red')
ax3.plot(xSort[:,1],yHat3[srtInd])
plt.show()
fileName = 'abalone.txt'
xArr,yArr = loadDataSet(fileName)
print stageWise(xArr,yArr,0.01,200)