当前位置: 首页 > 工具软件 > udf-sample > 使用案例 >

schema.UDF

贡建修
2023-12-01

一.定义schema的三种方法

//1.编程法(复杂不易维护)
val schema = StructType(
List(
StructField("id",StringType,true),
StructField("type",StringType,true),
StructField("loation",StringType(List(
StructField("lititude",DoubleType,false),
StructField("id",StringType,false)
)),false)
)
)
//2.从样例类中引用(推荐)
  //定义样例类结构
import org.apache.spark.sql.Encoders
 case class Coordinates(latitude:Double,Longitude:Double)
 case class Vehicle(id:String,type:String,location:Coordinates)
//3通过Encoder定义schema
 val schema = Encoders.product[Vehicle].schema
//3.从数据集中提取
val sample = spark.read.parquet(<path-to-sample>)
val schema = sample.schema

二.自定义UDF函数,将id转化为int

//自定义UDF函数,将id转化为int
 val id2int= udf(
          (student_id:String) =>{
            student_id.split("_")(1).toInt
          }
        )

val studentIdDF: DataFrame = answerDF.select(id2int('student_id) ).alias("student_id")

三.解析json

/**
 * 学生答题信息样例类
 */
case class Answer(student_id: String, //学生ID
                  textbook_id: String, //教材ID
                  grade_id: String, //年级ID
                  subject_id: String, //科目ID
                  chapter_id: String,//章节ID
                  question_id: String,//题目ID
                  score: Int,//题目得分,0~10分
                  answer_time: String,//答题提交时间,yyyy-MM-dd HH:mm:ss字符串形式
                  ts: Timestamp //答题提交时间,时间戳形式
                 ) extends Serializable

 import com.google.gson.Gson
//---数据预处理
    //解析json-方式1:
    /*valueDS.select(
      get_json_object($"value", "$.student_id").as("student_id"),
      //.....
    )*/

    //解析json-方式2:将每一条json字符串解析为一个样例类对象
    val answerDS: Dataset[Answer] = valueDS.map(josnStr => {
      val gson = new Gson()
      //json--->对象
      gson.fromJson(josnStr, classOf[Answer])
    })
 // val ds: Dataset[Row] =
  wordsDF.select(functions.from_json(functions.col("value").cast("string"), schema).alias("parse_value"))

//指定的Json反序列化为指定类的对象。
   val answerDF: DataFrame = rdd.coalesce(1).map(jsonStr => {
          val gson = new Gson()
          gson.fromJson(jsonStr, classOf[Answer])
        }).toDF()

四.字符串拼接

val resultDf: DataFrame = recommendDF.as[(Int, Array[(Int, Float)])].map(t => {
          val studentIdStr: String = "学生ID_" + t._1
          val questionIdStr: String = t._2.map("题目ID_" + _._2).mkString(",")
          (studentIdStr, questionIdStr)
        }).toDF("student_id", "recommendations")```



 类似资料:

相关阅读

相关文章

相关问答