import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from merlion.utils import TimeSeries
from merlion.models.factory import ModelFactory
from merlion.models.forecast.arima import Arima, ArimaConfig
from merlion.models.forecast.arima import Sarima, SarimaConfig
from merlion.models.forecast.smoother import MSES, MSESConfig
from merlion.models.forecast.vector_ar import VectorAR,VectorARConfig
from merlion.models.forecast.boostingtrees import LGBMForecasterConfig,LGBMForecaster
from merlion.models.forecast.baggingtrees import RandomForestForecasterConfig,RandomForestForecaster,ExtraTreesForecasterConfig,ExtraTreesForecaster
from merlion.transform.base import Identity
from merlion.transform.resample import TemporalResample
from merlion.evaluate.forecast import ForecastMetric
from sklearn.metrics import accuracy_score
# 参数
target_seq_index = 3 # 标签列索引
max_forecast_steps = [i for i in range(10,130,10)] # 比较不同预测期数时的预测效果
maxlags = 50 # 滞后期数
def read_data(name):
"""
读取数据
"""
df = pd.read_csv(name)
df['datetime'] = pd.to_datetime(df['datetime'])
df = df.set_index('datetime')
return df
def df_to_timeseries(df):
"""
转换格式,划分数据集
"""
train1 = TimeSeries.from_pd(df.iloc[:800,-1:])
train2 = TimeSeries.from_pd(df.iloc[:800,:])
test = TimeSeries.from_pd(df.iloc[800:,:])
return train1,train2,test
def get_target(test,max_forecast_steps):
"""
获取需要预测的目标变量真值
"""
target_univariate = test.univariates[test.names[target_seq_index]]
target = target_univariate[:max_forecast_steps].to_ts()
return target
def build_model(max_forecast_steps,target_seq_index,maxlags):
"""
构造模型
"""
config1 = LGBMForecasterConfig(max_forecast_steps=max_forecast_steps, target_seq_index=target_seq_index,maxlags=maxlags)
model1 = LGBMForecaster(config1)
config2 = RandomForestForecasterConfig(max_forecast_steps=max_forecast_steps, target_seq_index=target_seq_index,maxlags=maxlags)
model2 = RandomForestForecaster(config2)
config3 = ExtraTreesForecasterConfig(max_forecast_steps=max_forecast_steps, target_seq_index=target_seq_index,maxlags=maxlags)
model3 = ExtraTreesForecaster(config3)
config4 = VectorARConfig(max_forecast_steps=max_forecast_steps, target_seq_index=target_seq_index,maxlags=maxlags)
model4 = VectorAR(config4)
return model1,model2,model3,model4
if __name__=='__main__':
result = pd.DataFrame(columns=['model','单/多变量','forecast_steps','rmse','smape'])
data = read_data('EURUSD.csv')
train1,train2,test = df_to_timeseries(data)
# 训练模型
for steps in max_forecast_steps:
target = get_target(test,steps)
model1,model2,model3,model4 = build_model(steps,target_seq_index,maxlags)
for model in [model1,model2,model3,model4]:
print(f"单变量Training {type(model).__name__}...")
for train in [train1,train2]:
train_pred, train_stderr = model.train(train)
forecast, stderr = model.forecast(time_stamps = target.time_stamps)
rmse = ForecastMetric.RMSE.value(ground_truth=target, predict=forecast)
smape = ForecastMetric.sMAPE.value(ground_truth=target, predict=forecast)
print(f"{type(model).__name__}训练变量")
print(f"RMSE: {rmse:.4f}")
print(f"sMAPE: {smape:.4f}")
# result_data,accuracy = assessment(forecast,data)
print()
result = result.append({'model':type(model).__name__,'单/多变量':train.names,'forecast_steps':steps,'rmse':rmse,'smape':smape},ignore_index=True)
# 可视化
fig, ax = model.plot_forecast(time_series=test[:steps])
plt.show()
# fig6.savefig('C:/Users/TANGLINGHUI331/Desktop/Arima预测_6')
print('--------------------------分割线---------------------------')
汇总结果,进行单个模型的单因素/多因素对比
result = result.iloc[:,:5]
result.fillna(method='ffill',inplace=True)
LGBM = result.iloc[np.where(result['model']=='LGBMForecaster')].reset_index()
RF = result.iloc[np.where(result['model']=='RandomForestForecaster')].reset_index()
ETF = result.iloc[np.where(result['model']=='ExtraTreesForecaster')].reset_index()
VAR = result.iloc[np.where(result['model']=='VectorAR')].reset_index()
def draw_figure(data,title):
"""
绘图
"""
plt.rcParams['font.sans-serif']=['SimHei']
plt.rcParams['axes.unicode_minus']=False
plt.rcParams['figure.figsize'] = [10, 8]
plt.rcParams.update({'font.size': 18})
for i in range(0,int(len(data)/2)):
data['单/多变量'][2*i]='单变量'
data['单/多变量'][2*i+1]='多变量'
sns.lineplot(x='forecast_steps',y='smape',style='单/多变量',markers=True,data=data)
plt.title(f'{title}Forecaster')
plt.legend(['单变量','多变量'])
# plt.savefig(f'{title}.jpg')
draw_figure(LGBM,'LGBM')
draw_figure(RF,'RF')
draw_figure(ETF,'ETF')
draw_figure(VAR,'VAR')