一.定义schema的三种方法
//1. Programmatic schema definition (verbose, hard to maintain)
// FIX: a nested struct field must be typed StructType(List(...)), not
// StringType(List(...)) — StringType takes no arguments, so the original
// did not compile. Also corrected the field-name typos
// ("loation" -> "location", "lititude" -> "latitude").
val schema = StructType(
  List(
    StructField("id", StringType, true),
    StructField("type", StringType, true),
    // "location" is a nested struct with its own sub-fields
    StructField("location", StructType(List(
      StructField("latitude", DoubleType, false),
      StructField("id", StringType, false)
    )), false)
  )
)
//2. Derive the schema from a case class via an Encoder (recommended)
// Define the case-class structure that mirrors the data
import org.apache.spark.sql.Encoders
// NOTE: "longitude" lowercased to match "latitude" (consistent camelCase column names)
case class Coordinates(latitude: Double, longitude: Double)
// FIX: `type` is a Scala keyword, so the field name must be escaped with
// backticks — the original `type: String` did not compile.
case class Vehicle(id: String, `type`: String, location: Coordinates)
// Ask Spark's Encoder machinery to reflect the schema from the case class
val schema = Encoders.product[Vehicle].schema
//3. Extract the schema from a sample dataset
// FIX: the path must be a String literal — the bare <path-to-sample>
// placeholder did not compile. Replace the placeholder with a real path.
val sample = spark.read.parquet("<path-to-sample>")
val schema = sample.schema
二.自定义UDF函数,将id转化为int
// Custom UDF: converts a "prefix_<number>" student id to its Int suffix
val id2int = udf(
  (student_id: String) => {
    // NOTE(review): assumes the id always contains "_" followed by digits —
    // malformed input will throw (ArrayIndexOutOfBounds / NumberFormatException);
    // consider a defensive parse if the data is untrusted.
    student_id.split("_")(1).toInt
  }
)
// FIX: alias must be applied to the Column INSIDE select(); the original
// called .alias(...) on the resulting DataFrame, which aliases the dataset
// and leaves the column with its generated name like "UDF(student_id)".
val studentIdDF: DataFrame = answerDF.select(id2int('student_id).alias("student_id"))
三.解析json
/**
 * Case class describing one student-answer record
 * (the shape of each JSON message to be deserialized).
 */
case class Answer(student_id: String, //student ID
textbook_id: String, //textbook ID
grade_id: String, //grade ID
subject_id: String, //subject ID
chapter_id: String,//chapter ID
question_id: String,//question ID
score: Int,//question score, 0~10 points
answer_time: String,//answer submission time as a "yyyy-MM-dd HH:mm:ss" string
ts: Timestamp //answer submission time as a timestamp
) extends Serializable
import com.google.gson.Gson
//--- Data pre-processing
// Parse JSON - option 1: pull individual fields out with get_json_object
/*valueDS.select(
get_json_object($"value", "$.student_id").as("student_id"),
//.....
)*/
// Parse JSON - option 2: deserialize each JSON string into an Answer object.
// FIX: build one Gson per partition (mapPartitions) instead of one per
// record — the original allocated a new Gson inside map for every element.
// Also fixed the "josnStr" typo.
val answerDS: Dataset[Answer] = valueDS.mapPartitions { iter =>
  val gson = new Gson() // created on the executor, reused for the whole partition
  iter.map(jsonStr => gson.fromJson(jsonStr, classOf[Answer]))
}
// Parse JSON - option 3: parse the raw value column with from_json against
// the schema, producing a single struct column named "parse_value".
// val ds: Dataset[Row] =
wordsDF.select(functions.from_json(functions.col("value").cast("string"), schema).alias("parse_value"))
// Deserialize each JSON string in the RDD into an Answer object, then
// convert the result to a DataFrame.
// FIX: instantiate Gson once per partition (mapPartitions) rather than once
// per record as the original map-based version did.
val answerDF: DataFrame = rdd.coalesce(1).mapPartitions { iter =>
  val gson = new Gson()
  iter.map(jsonStr => gson.fromJson(jsonStr, classOf[Answer]))
}.toDF()
四.字符串拼接
// Build display strings: "学生ID_<id>" plus a comma-joined list of "题目ID_<qid>".
// FIX: the question id is the tuple's FIRST element (_._1, an Int); the
// original used _._2 (the Float score), producing "题目ID_<score>" instead of
// the question id. Also removed the stray markdown ``` fence at end of file.
val resultDf: DataFrame = recommendDF.as[(Int, Array[(Int, Float)])].map(t => {
  val studentIdStr: String = "学生ID_" + t._1
  val questionIdStr: String = t._2.map("题目ID_" + _._1).mkString(",")
  (studentIdStr, questionIdStr)
}).toDF("student_id", "recommendations")