1. Reading data from Hive into a DataFrame
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.1</version>
</dependency>
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object sparkSQL06 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("test06").enableHiveSupport().getOrCreate()
    session.sql("use spark")
    // Create the table
    session.sql("create table if not exists student_infos (id int, name string, age int) row format delimited fields terminated by '\t'")
    // Load the data
    session.sql("load data local inpath '/root/test/student_infos' into table student_infos")
    // Write the query result back into a Hive table
    val df: DataFrame = session.sql("select * from student_infos")
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
  }
}
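To check the result of saveAsTable, the written table can be read back in the same Hive-enabled session; a minimal sketch (these lines would sit inside the same main method, assuming the tables above exist):

// Read the table written by saveAsTable back as a DataFrame and inspect it
session.table("good_student_infos").show()
// Or query it directly through SQL
session.sql("select count(*) from good_student_infos").show()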
2. UDF (user-defined functions)
import org.apache.spark.sql.SparkSession

object sparkSQL07 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test07").getOrCreate()
    val nameList = List[String]("zjc", "lff", "ly")
    import session.implicits._
    // Name the column explicitly so it can be referenced as "name" in SQL
    val df = nameList.toDF("name")
    df.createTempView("temp")
    /**
     * Custom function requirement: return the length of each name.
     */
    session.udf.register("STRLEN", (name: String) => name.length)
    session.sql("select name, STRLEN(name) from temp").show()
  }
}
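The same logic can also be applied through the DataFrame API instead of SQL; a minimal sketch using org.apache.spark.sql.functions.udf (the strLen and name_len names are illustrative, not part of the original example):

import org.apache.spark.sql.functions.{col, udf}

// Wrap the same function as a column expression
val strLen = udf((name: String) => name.length)
df.select(col("name"), strLen(col("name")).as("name_len")).show()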
3. UDAF (user-defined aggregate functions)
import org.apache.spark.sql.SparkSession

object sparkSQL08 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test08").getOrCreate()
    val nameList = List[String]("zjc", "lff", "ergou")
    import session.implicits._
    val df = nameList.toDF("name")
    df.createTempView("mytable")
    session.udf.register("MyCount", new MyCount)
    // Aggregate functions need a GROUP BY: count the rows per name
    session.sql("select name, MyCount(name) from mytable group by name").show()
  }
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Custom aggregate function: counts the rows in each group
class MyCount extends UserDefinedAggregateFunction {
  // Type of the input data
  override def inputSchema: StructType = StructType(List[StructField](StructField("xx", StringType, true)))
  // Type of the intermediate buffer used during aggregation (a running count, so IntegerType)
  override def bufferSchema: StructType = StructType(List[StructField](StructField("count", IntegerType, true)))
  // Type of the final return value; must match what evaluate returns
  override def dataType: DataType = IntegerType
  // Whether the function always returns the same result for the same input
  override def deterministic: Boolean = true
  // Initial buffer value for each group within a partition
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, 0)
  // Within a partition, fold each row of a group into the buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = buffer.update(0, buffer.getInt(0) + 1)
  // Merge buffers of the same key coming from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
  // Final value returned for each group; type must match dataType
  override def evaluate(buffer: Row): Any = buffer.getInt(0)
}
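UserDefinedAggregateFunction is untyped; the same count can also be written as a typed Aggregator on a Dataset. This is an alternative sketch, not part of the original example (MyCountAgg is an illustrative name):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Typed alternative: count input strings with an Aggregator
object MyCountAgg extends Aggregator[String, Int, Int] {
  def zero: Int = 0                              // initial buffer value
  def reduce(b: Int, a: String): Int = b + 1     // fold one input row into the buffer
  def merge(b1: Int, b2: Int): Int = b1 + b2     // combine partial counts from different partitions
  def finish(reduction: Int): Int = reduction    // final result per group
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Usage on a single-column Dataset[String], e.g. df.as[String].select(MyCountAgg.toColumn).show()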
4. Spark SQL window functions
row_number() over (partition by XXX order by XXX): assigns consecutive numbers within each partition, starting from 1; rows that compare equal in the ordering still get different numbers.
rank() over (partition by XXX order by XXX): assigns numbers within each partition, starting from 1; rows with equal values share the same rank, and gaps appear after ties, so the numbering is not consecutive.
dense_rank() over (partition by XXX order by XXX): assigns numbers within each partition, starting from 1; rows with equal values share the same rank, and the numbering stays consecutive after ties. The difference between the three is easiest to see on sample data (see the sketch below).
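A small contrast of the three functions on toy data; a sketch with illustrative column and view names:

import org.apache.spark.sql.SparkSession

object windowCompare {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("windowCompare").getOrCreate()
    import session.implicits._
    // Two groups, with a tie on score 90 in group "a"
    val df = List(("a", 90), ("a", 90), ("a", 80), ("b", 70)).toDF("grp", "score")
    df.createTempView("scores")
    session.sql(
      "select grp, score, " +
        "row_number() over (partition by grp order by score desc) as rn, " +  // group "a": 1, 2, 3
        "rank() over (partition by grp order by score desc) as rk, " +        // group "a": 1, 1, 3
        "dense_rank() over (partition by grp order by score desc) as drk " +  // group "a": 1, 1, 2
        "from scores").show()
  }
}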
import org.apache.spark.sql.SparkSession

/**
 * Find the top 3 sales records (by jine) within each date (riqi).
 */
object sparkSQL09 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("test09").enableHiveSupport().getOrCreate()
    session.sql("use spark")
    session.sql("create table if not exists sales (riqi string,leibie string,jine Int) " +
      "row format delimited fields terminated by '\t'")
    session.sql("load data local inpath '/root/test/sales' into table sales")
    session.sql("select riqi, leibie, jine from " +
      "(select riqi, leibie, jine, row_number() over (partition by riqi order by jine desc) as rank from sales) t " +
      "where t.rank <= 3").show()
  }
}
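The same top-N-per-group query can be expressed with the DataFrame API; a minimal sketch using org.apache.spark.sql.expressions.Window, assuming the sales table above already exists:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Window spec: one partition per date, ordered by amount descending
val w = Window.partitionBy("riqi").orderBy(col("jine").desc)
session.table("sales")
  .withColumn("rank", row_number().over(w))
  .where(col("rank") <= 3)
  .show()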