1.满足下面条件:
The SPARK_HOME environment variable must point to your local Spark installation.
2.安装对应版本的h2o-sparkling
2.3.0为对应已安装的spark版本,保持版本一致,以防报错。
pip install h2o_pysparkling_2.3==2.3.0
3.完整代码:
from pysparkling import *
from h2o.grid.grid_search import H2OGridSearch
from h2o.estimators.gbm import H2OGradientBoostingEstimator

# External mode: the H2O cluster runs outside of Spark and does not consume
# Spark executor resources; spark.dynamicAllocation.enabled must be false.
# NOTE(review): this listing assigns `conf` twice (external mode here,
# internal mode below) -- only the last assignment takes effect; the tutorial
# says to pick exactly one of the two.
conf = (H2OConf(ss)
        .set_external_cluster_mode()
        .set_user_name('jerry')
        .use_auto_cluster_start()
        .set_num_of_external_h2o_nodes(1)
        .set('HADOOP_USER_NAME', 'jerry')
        .set_h2o_cluster("10.111.23.70", 54321)
        .set_client_ip("10.111.23.70")
        .set_h2o_driver_path("/home/dp/h2odriver/h2odriver-sw2.3.18-hdp2.6-extended.jar")
        .set_mapper_xmx("2G")
        .set_yarn_queue("default")
        .set_cloud_name("h2o_gbm"))

# Internal mode: H2O workers run inside the Spark executors and share the
# Spark cluster's resources. Default settings are sufficient.
conf = H2OConf(ss)
conf.set_num_h2o_workers(2)

# Create the H2OContext from the SparkSession and the chosen configuration.
hc = H2OContext.getOrCreate(ss, conf)
# Downsample overly large frames to ~30k rows before handing them to H2O,
# then convert the Spark DataFrame into an H2OFrame.
if df_count and df_count > 30000:
    percent = 30000 / df_count
    # randomSplit was the earlier approach; sample() is simpler and cheaper.
    # df = df.randomSplit(weights=[1-percent, percent], seed=1234)[1]
    df = df.sample(fraction=percent, seed=1234)
# NOTE(review): in the garbled original this conversion was fused onto the
# sampling line; it must run for both the sampled and unsampled case.
h2o_df = hc.as_h2o_frame(df, framename="df_h2o")
# Split off the label column, then enum-encode every string feature column.
y_lable_h2o_df = h2o_df[self.y_col]
rest_h2o_df = h2o_df.drop(self.y_col)
# All feature column names (label excluded).
h2o_rest_allcols_no_label = rest_h2o_df.columns

# -------------------- encode string columns --------------------
# H2OFrame holding just the string columns (None when there are none).
str_h2o_df = None
# Names of the string-typed feature columns.
str_list = list()
if len(h2o_df.types) > 0:
    str_list = [k for k, v in rest_h2o_df.types.items() if v == 'string']
    if len(str_list) > 0:
        # Convert every string column to an enum (categorical) column.
        str_h2o_df = rest_h2o_df[str_list].asfactor()

if str_h2o_df:
    # Numeric columns are whatever is left after removing the string ones.
    rest_list = [i for i in h2o_rest_allcols_no_label if i not in str_list]
    if len(rest_list) > 0:
        # Both kinds present: join numeric columns with the encoded strings.
        rest_h2o_df = rest_h2o_df[rest_list].concat(str_h2o_df)
    else:
        # Only string columns exist: the encoded frame is the feature frame.
        rest_h2o_df = str_h2o_df
# else: only numeric columns exist -- nothing to re-encode or reassign.
# -------------------- encode string columns --------------------

# The label must be an enum (discrete) column for classification.
y_lable_h2o_df = y_lable_h2o_df.asfactor()
h2o_df = y_lable_h2o_df.concat(rest_h2o_df)
predictors = h2o_df.names[:]
# 70/30 train/validation split with a fixed seed for reproducibility.
# ratios = [0.6, 0.2]
frs = h2o_df.split_frame(ratios=[.7], seed=12345)
train = frs[0]
train.frame_id = "Train"
valid = frs[1]
valid.frame_id = "Validation"

# The label column must not appear in the predictor list.
if self.y_col and self.y_col in predictors:
    predictors.remove(self.y_col)
# ====================== modified =========================
# Previous fixed settings for reference:
# 'learn_rate': 0.1, 'max_depth': 15, 'ntrees': 100, 'stopping_metric': 'AUTO'
# Hyper-parameter combinations for the grid search (cross-validated).
gbm_param_grid = {
    'learn_rate': 0.06,  # default = 0.1
    'max_depth': [5, 9],
    # 'max_depth': [7],
    'stopping_metric': 'logloss',
}
gbm_grid1 = H2OGridSearch(model=H2OGradientBoostingEstimator,
                          hyper_params=gbm_param_grid)
gbm_grid1.train(x=predictors, y=self.y_col, training_frame=train,
                validation_frame=valid, ntrees=100, seed=1)

# Rank the trained models by AUC, best first.
model_rf = gbm_grid1.get_grid(sort_by='auc', decreasing=True)
best_rf1 = model_rf.models[0]
best_rf1_perg1 = best_rf1.model_performance(valid)
print('================= 校验集表现指标 =================', best_rf1_perg1)
print('================= 最优参数组合 ================= ', model_rf.get_hyperparams_dict(model_rf.model_ids[0]))
# print(best_rf1.varimp())importance_cols = dict()
for i in best_rf1.varimp():
# ======================modified =========================# importance_cols.setdefault(i[0],i[3])
# value = i[3]
# if 'e' in str(i[3]):
# arr = str(i[3]).split('e')
# value = float(arr[0]) * math.pow(math.e, int(arr[1]))
importance_cols.setdefault(i[0], i[3])if len(importance_cols) > 0:
no_nan_len = 0
for k, v in importance_cols.items():
if v > 0:
no_nan_len = no_nan_len + 1cumulative_importance_threshold = 0
cumulative_importance_list = list()
for k, v in sorted(importance_cols.items(), key=lambda x: x[1], reverse=True):
if cumulative_importance_threshold <= no_nan_len * threshold:
cumulative_importance_list.append(k)
cumulative_importance_threshold = cumulative_importance_threshold + 1self.low_importance_list = cumulative_importance_list
# print('============重要度========',no_nan_len,'===== ',len(cumulative_importance_list))
initalImportanceList = importance_cols
return initalImportanceList