SparkML-note-PCA

华锦
2023-12-01

(This article is a summary of my own study and work. Any resemblance to other material is coincidental; contact me and I will promptly amend or remove it.)

Spark ML machine learning

Question: What is Principal Component Analysis (PCA)?

Answer:

P.S.: This uses the ml package, not MLlib. The ml API is more abstract: it works on Dataset (a DataFrame being the special case Dataset[Row]) and feeds data through a Pipeline of stages; a minimal sketch of that style follows.
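For reference, here is a minimal sketch of the Pipeline style (the trainingDF DataFrame and its features column are illustrative placeholders, not part of the worked example below):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.PCA

// A single PCA stage, declared once and run through a Pipeline.
// Assumes a DataFrame trainingDF with a vector column named "features".
val pcaStage = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)

val pipeline = new Pipeline().setStages(Array(pcaStage))
// val model  = pipeline.fit(trainingDF)      // fits every stage in order
// val result = model.transform(trainingDF)   // appends the "pcaFeatures" column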

PCA explained

PCA reduces high-dimensional data to a lower dimension: it projects the data onto the k orthogonal directions along which the variance is largest (the principal components), preserving as much of the original variation as any k linear features can.
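In matrix terms (the standard formulation, not Spark-specific): with n samples as the rows of a centered matrix X of size n x d,

    \Sigma = \frac{1}{n-1} X^\top X, \qquad \Sigma = W \Lambda W^\top, \qquad Z = X W_k

where the columns of W are the eigenvectors of the covariance matrix \Sigma ordered by decreasing eigenvalue, W_k keeps the first k of them, and Z is the n x k projection that the transform step below produces.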

Implementation (adapted from the official docs):

import org.apache.spark.ml.feature.PCA
import org.apache.spark.sql._
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Load the data with SQL as a DataFrame, then map each row to a dense vector.
val df = sql("select c1,c2,c3 from default.pcademo").map {
  case Row(c1: Int, c2: Int, c3: Int) => Vectors.dense(c1, c2, c3)
}.collect().map(Tuple1.apply)
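As an aside (my own variant, not in the original): the collect-and-recreate round trip above pulls all rows to the driver. VectorAssembler builds the features column directly and keeps the data distributed; the commented line below would replace the map/collect and createDataFrame steps:

import org.apache.spark.ml.feature.VectorAssembler

// Assemble c1, c2, c3 into a single vector column named "features".
val assembler = new VectorAssembler()
  .setInputCols(Array("c1", "c2", "c3"))
  .setOutputCol("features")
// val dfPCA = assembler.transform(sql("select c1,c2,c3 from default.pcademo"))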

// Fit the PCA model; k is the number of dimensions to keep.
val dfPCA = sqlContext.createDataFrame(df).toDF("features")
val pca = new PCA()
    .setInputCol("features")
    .setOutputCol("pcaFeatures")
    .setK(2)
    .fit(dfPCA)
val dfPcaResult = pca.transform(dfPCA)

val dfResult = dfPcaResult.select("pcaFeatures")
val n = dfResult.first.getAs[org.apache.spark.mllib.linalg.Vector](0).size
// Convert the vector column to an array, then split it into one column per
// component; alias names the resulting columns f0, f1, ...
val vecToSeq = udf((v: Vector) => v.toArray)
val exprs = (0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i"))
val saveDF = dfResult.select(vecToSeq($"pcaFeatures").alias("_tmp")).select(exprs: _*)

// Save the result to a table.
import org.apache.spark.sql.SaveMode
saveDF.write.mode(SaveMode.Overwrite).saveAsTable("pcatmpTable")
sql("alter table pcatmpTable set lifecycle 28")

/*
spark-shell output:
import org.apache.spark.ml.feature.PCA
import org.apache.spark.sql._
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector
df: Array[(org.apache.spark.mllib.linalg.Vector,)] = Array(([2.0,0.0,4.0],), ([4.0,0.0,6.0],))
dfPCA: org.apache.spark.sql.DataFrame = [features: vector]
pca: org.apache.spark.ml.feature.PCAModel = pca_3675926d0a62
dfPcaResult: org.apache.spark.sql.DataFrame = [features: vector, pcaFeatures: vector]
dfResult: org.apache.spark.sql.DataFrame = [pcaFeatures: vector]
n: Int = 2
vecToSeq: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(DoubleType,false),List(org.apache.spark.mllib.linalg.VectorUDT@f71b0bce))
exprs: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(_tmp[0] AS f0#28, _tmp[1] AS f1#29)
saveDF: org.apache.spark.sql.DataFrame = [f0: double, f1: double]
import org.apache.spark.sql.SaveMode
res3: org.apache.spark.sql.DataFrame = [result: string]
*/
/*
0: jdbc:hive2://192.168.66.01:10000> select * from pcademo;
+-----+-----+-----+--+
| c1  | c2  | c3  |
+-----+-----+-----+--+
| 2   | 0   | 4   |
| 4   | 0   | 6   |
+-----+-----+-----+--+
2 rows selected (0.989 seconds)

0: jdbc:hive2://192.168.66.01:10000> select * from pcatmptable;
+---------------------+--------------------+--+
|         f0          |         f1         |
+---------------------+--------------------+--+
| -4.242640687119284  | 1.414213562373095  |
| -7.071067811865474  | 1.414213562373095  |
+---------------------+--------------------+--+
2 rows selected (0.327 seconds)
*/
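A quick sanity check on these numbers (my own arithmetic, not part of the original output): the two rows differ by (2, 0, 2), so all of the sample variance lies along (1, 0, 1), and the fitted components are v_1 = -(1,0,1)/\sqrt{2} and v_2 = (-1,0,1)/\sqrt{2} up to sign. Projecting the raw rows:

    (2,0,4) \cdot \tfrac{1}{\sqrt{2}}(1,0,1) = \tfrac{6}{\sqrt{2}} \approx 4.243, \qquad
    (4,0,6) \cdot \tfrac{1}{\sqrt{2}}(1,0,1) = \tfrac{10}{\sqrt{2}} \approx 7.071

which matches f0 up to sign, and both rows project onto v_2 as 2/\sqrt{2} = \sqrt{2} \approx 1.414, matching f1. The values also show that transform multiplies the raw (uncentered) vectors by the component matrix rather than centering them first.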

References

1. https://www.zhihu.com/question/35225203
2. http://spark.apache.org/docs/latest/ml-features.html#pca
