(This post is a summary of my own study and work; any resemblance to other material would be an honor. Contact me and I will revise or delete it immediately.)
PS: This uses the ML package, not MLlib. The difference is that ML is the more abstract API: it works on Datasets (a DataFrame is just a Dataset[Row]) and chains processing stages through Pipelines.
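For reference, a minimal sketch of that Pipeline style, assuming Spark 2.x where the shell provides a SparkSession named `spark` (the walkthrough below instead uses the older sqlContext style with mllib vectors):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{PCA, VectorAssembler}

// Assemble the numeric columns into one vector column, then reduce to k = 2.
val raw = spark.table("default.pcademo")
val assembler = new VectorAssembler()
  .setInputCols(Array("c1", "c2", "c3"))
  .setOutputCol("features")
val pcaStage = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)
val model = new Pipeline().setStages(Array(assembler, pcaStage)).fit(raw)
val reduced = model.transform(raw).select("pcaFeatures")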
The task: reduce high-dimensional data to a lower dimension.
import org.apache.spark.ml.feature.PCA
import org.apache.spark.sql._
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Load the data with SQL as a DataFrame and map each row to a dense vector.
// Note: collect() pulls all rows to the driver, which is fine for this small demo.
val df = sqlContext.sql("select c1, c2, c3 from default.pcademo").map {
  case Row(c1: Int, c2: Int, c3: Int) => Vectors.dense(c1, c2, c3)
}.collect().map(Tuple1.apply)
// Wrap the vectors in a single-column DataFrame named "features".
val dfPCA = sqlContext.createDataFrame(df).toDF("features")
// Define and fit the PCA model; k is the target number of dimensions.
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)
  .fit(dfPCA)
val dfPcaResult = pca.transform(dfPCA)
val dfResult = dfPcaResult.select("pcaFeatures")
val n = dfResult.first.getAs[org.apache.spark.mllib.linalg.Vector](0).size // number of PCA dimensions (= k)
// Convert the vector column to an array, then split it into one double column per dimension.
val vecToSeq = udf((v: Vector) => v.toArray)
// alias gives each element a column name: f0, f1, ...
val exprs = (0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i"))
val saveDF = dfResult.select(vecToSeq($"pcaFeatures").alias("_tmp")).select(exprs: _*)
// Save the result to a table.
import org.apache.spark.sql.SaveMode
saveDF.write.mode(SaveMode.Overwrite).saveAsTable("pcatmpTable")
// Set a 28-day lifecycle on the table (platform-specific DDL, e.g. MaxCompute).
sqlContext.sql("alter table pcatmpTable set lifecycle 28")
/*
##output
import org.apache.spark.ml.feature.PCA
import org.apache.spark.sql._
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector
df: Array[(org.apache.spark.mllib.linalg.Vector,)] = Array(([2.0,0.0,4.0],), ([4.0,0.0,6.0],))
dfPCA: org.apache.spark.sql.DataFrame = [features: vector]
pca: org.apache.spark.ml.feature.PCAModel = pca_3675926d0a62
dfPcaResult: org.apache.spark.sql.DataFrame = [features: vector, pcaFeatures: vector]
dfResult: org.apache.spark.sql.DataFrame = [pcaFeatures: vector]
n: Int = 2
vecToSeq: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(DoubleType,false),List(org.apache.spark.mllib.linalg.VectorUDT@f71b0bce))
exprs: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(_tmp[0] AS f0#28, _tmp[1] AS f1#29)
saveDF: org.apache.spark.sql.DataFrame = [f0: double, f1: double]
import org.apache.spark.sql.SaveMode
res3: org.apache.spark.sql.DataFrame = [result: string]
*/
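Side note: on Spark 3.0+ the hand-rolled vecToSeq UDF is unnecessary, since the built-in org.apache.spark.ml.functions.vector_to_array performs the same conversion. A sketch, assuming the same dfResult and n as above:

import org.apache.spark.ml.functions.vector_to_array

// vector_to_array turns the vector column into array<double>; no UDF needed.
val flat = dfResult
  .select(vector_to_array($"pcaFeatures").alias("_tmp"))
  .select((0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i")): _*)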
/*
0: jdbc:hive2://192.168.66.01:10000> select * from pcademo;
+-----+-----+-----+--+
| c1 | c2 | c3 |
+-----+-----+-----+--+
| 2 | 0 | 4 |
| 4 | 0 | 6 |
+-----+-----+-----+--+
2 rows selected (0.989 seconds)
0: jdbc:hive2://192.168.66.01:10000> select * from pcatmptable;
+---------------------+--------------------+--+
| f0 | f1 |
+---------------------+--------------------+--+
| -4.242640687119284 | 1.414213562373095 |
| -7.071067811865474 | 1.414213562373095 |
+---------------------+--------------------+--+
2 rows selected (0.327 seconds)
*/
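A quick sanity check on these numbers: the two rows (2,0,4) and (4,0,6) differ only along the direction (1,0,1)/√2, so the first principal component is ±(1,0,1)/√2. Spark's PCAModel projects the raw (uncentered) rows onto the components, giving -6/√2 ≈ -4.243 and -10/√2 ≈ -7.071 for f0. The second component here is evidently ±(-1,0,1)/√2, which sends both rows to 2/√2 ≈ 1.414, the constant f1 column. The overall sign of a principal component is arbitrary, so the negative f0 values are expected.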
References:
1. https://www.zhihu.com/question/35225203
2. http://spark.apache.org/docs/latest/ml-features.html#pca