Spark Sql简介
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
************** Spark SQL *******************
1、Spark SQL 是Spark套件中的一个模块,他将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。
2、Spark SQL的特点:
1、和Spark Core的无缝集成,我可以在写整个RDD应用的时候,配置Spark SQL来实现我的逻辑。
2、统一的数据访问方式,Spark SQL提供标准化的SQL查询。
3、Hive的继承,Spark SQL通过内嵌Hive或者连接外部已经部署好的hive实例,实现了对Hive语法的继承和操作。
4、标准化的连接方式,Spark SQL可以通过启动thrift Server来支持JDBC、ODBC的访问,将自己作为一个BI Server使用。
************* Spark SQL 数据抽象 ***********
0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6)
1、Spark SQL提供了DataFrame和DataSet的数据抽象。
2、DataFrame就是RDD + Schema,可以认为是一张二维表格。他的劣势是在编译器不进行表格中的字段的类型检查。在运行期进行检查。
3、DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
4、DataFrame = DataSet[Row]
5、DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。
************** Spark SQL 客户端查询 **********
1、你可以通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名
2、可以通过Spark提供的方法读取JSOn文件,将JSON文件转换成DataFrame
3、可以通过DataFrame提供的API来操作DataFrame里面的数据。
4、你可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。
************** DataFrame 查询方式
1、DataFrame支持两种查询方式一种是DSL风格,另外一种是SQL风格
1、DSL风格:
1、你需要引入 import spark.implicit._ 这个隐式转换,可以将DataFrame隐式转换成RDD。
2、SQL风格:
1、你需要将DataFrame注册成一张表格,如果你通过CreateTempView这种方式来创建,那么该表格Session有效,如果你通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀 global_temp
2、你需要通过sparkSession.sql 方法来运行你的SQL语句。
************** DataSet
1、首先定义一个DataSet,你需要先定义一个Case类。
************** RDD、DataSet、DataFrame之间的转换总结 *************
1、 RDD -> DataFrame : rdd.map(para=>(para(0).trim(),para(1).trim().toInt)).toDF("name","age")
//通过反射来设置
rdd.map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
//通过编程方式来设置Schema,适合于编译期不能确定列的情况
schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rdd[Row] = rdd.map(attributes => Row(attributes(0), attributes(1).trim))
val peopeDF = spark.createDataFrame(rdd[Row],schema)
2、 DataFrame -> RDD : dataFrame.rdd 注意输出:Array([Michael,29], [Andy,30], [Justin,19])
1、 RDD -> DataSet : rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS
2、 DataSet -> RDD : dataSet.rdd 注意输出: Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))
1、 DataFrame -> DataSet: dataFrame.as[Person]
2、 DataSet -> DataFrame: dataSet.toDF
************** 对于DataFrame Row对象的访问方式
1、DataFrame = DataSet[Row], DataFrame里面每一行都是Row对象
2、如果需要访问Row对象中的每一个元素,你可以通过下标 row(0);你也可以通过列名 row.getAs[String]("name")
************** 应用UDF函数 ***************
1、通过spark.udf.register(name,func)来注册一个UDF函数,name是UDF调用时的标识符,fun是一个函数,用于处理字段。
2、你需要将一个DF或者DS注册为一个临时表。
3、通过spark.sql去运行一个SQL语句,在SQL语句中可以通过 name(列名) 方式来应用UDF函数。
************* UDAF 用户自定义聚合函数
1、弱类型用户自定义聚合函数
1、新建一个Class 继承UserDefinedAggregateFunction ,然后复写方法:
//聚合函数需要输入参数的数据类型
override def inputSchema: StructType = ???
//可以理解为保存聚合函数业务逻辑数据的一个数据结构
override def bufferSchema: StructType = ???
// 返回值的数据类型
override def dataType: DataType = ???
// 对于相同的输入一直有相同的输出
override def deterministic: Boolean = true
//用于初始化你的数据结构
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
//用于同分区内Row对聚合函数的更新操作
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
//用于不同分区对聚合结果的聚合。
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
//计算最终结果
override def evaluate(buffer: Row): Any = ???
2、你需要通过spark.udf.resigter去注册你的UDAF函数。
3、需要通过spark.sql去运行你的SQL语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。
2、强类型的用户自定义聚合函数
1、新建一个class,继承Aggregator[Employee, Average, Double],其中Employee是在应用聚合函数的时候传入的对象,Average是聚合函数在运行的时候内部需要的数据结构,Double是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。复写想对应的方法:
//用于定义一个聚合函数内部需要的数据结构
override def zero: Average = ???
//针对每个分区内部每一个输入来更新你的数据结构
override def reduce(b: Average, a: Employee): Average = ???
//用于对于不同分区的结构进行聚合
override def merge(b1: Average, b2: Average): Average = ???
//计算输出
override def finish(reduction: Average): Double = ???
//用于数据结构他的转换
override def bufferEncoder: Encoder[Average] = ???
//用于最终结果的转换
override def outputEncoder: Encoder[Double] = ???
2、新建一个UDAF实例,通过DF或者DS的DSL风格语法去应用。
Spark SQL查询
创建json文件
vi people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
将文件传输到其他节点
scala> val people = spark.read.json("/opt/datas/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> people.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> people.filter($"age" > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> people.createOrReplaceTempView("pls")
scala> spark.sql("select * from pls where age >20").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
-------IDEA创建SparkSQL程序-------------
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
package com.zhangbk.sparksql
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object PeopleSelect {
val logger = LoggerFactory.getLogger(PeopleSelect.getClass)
def main(agrs : Array[String]) {
//SparkSession.builder 用于创建一个SparkSession;如果需要Hive支持,则需要以下创建语句:.enableHiveSupport()
val spark = SparkSession.builder().appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value").getOrCreate()
// For implicit conversions like converting RDDs to DataFrames.import spark.implicits._的引入是
// 用于将DataFrames隐式转换成RDD,使df能够使用RDD中的方法。
import spark.implicits._
val pl = spark.read.json("/opt/datas/people.json")
// Displays the content of the DataFrame to stdout
pl.show()
pl.filter($"age" > 21).show()
pl.createOrReplaceTempView("persons")
spark.sql("SELECT * FROM persons where age > 21").show()
spark.stop()
}
}
Spark Sql和Hive的继承
======= 内置Hive =====
1、Spark内置有Hive,Spark2.1.1 内置的Hive是1.2.1。
2、需要将core-site.xml和hdfs-site.xml 拷贝到spark的conf目录下。如果Spark路径下发现metastore_db,需要删除【仅第一次启动的时候】。
3、在你第一次启动创建metastore的时候,你需要指定spark.sql.warehouse.dir这个参数,
比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
4、注意,如果你在load数据的时候,需要将数据放到HDFS上。
======= 外部Hive =====
1、需要将hive-site.xml 拷贝到spark的conf目录下。
2、如果hive的metestore使用的是mysql数据库,那么需要将mysql的jdbc驱动包放到spark的jars目录下。
3、可以通过spark-sql或者spark-shell来进行sql的查询。完成和hive的连接。
Spark Sql输入输出
1、对于Spark SQL的输入需要使用 sparkSession.read方法
1、通用模式 sparkSession.read.format("json").load("path") 支持类型:parquet、json、text、csv、orc、jdbc
2、专业模式 sparkSession.read.json、 csv 直接指定类型。
2、对于Spark SQL的输出需要使用 sparkSession.write方法
1、通用模式 dataFrame.write.format("json").save("path") 支持类型:parquet、json、text、csv、orc、
2、专业模式 dataFrame.write.csv("path") 直接指定类型
3、如果你使用通用模式,spark默认parquet是默认格式,sparkSession.read.load 他加载的默认是parquet格式。dataFrame.write.save也是默认保存成parquet格式。
4、如果需要保存成一个text文件,那么需要dataFrame里面只有一列。
案例 加载数据
BYSL00000893,ZHAO,2007-8-23
BYSL00000897,ZHAO,2007-8-24
BYSL00000898,ZHAO,2007-8-25
BYSL00000899,ZHAO,2007-8-26
scala> case class tbStock(orderid:String, location:String, date:String) extends Serializable
scala> val tbStockRdd = spark.sparkContext.textFile("hdfs://ns1:8020/user/spark/tbStock.txt")
scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(para => tbStock(para(0),para(1),para(2))).toDS
scala> tbStockDS.show
+------------+--------+---------+
| orderid|location| date|
+------------+--------+---------+
|BYSL00000893| ZHAO|2007-8-23|
|BYSL00000897| ZHAO|2007-8-24|
|BYSL00000898| ZHAO|2007-8-25|
scala> tbStockDS.createOrReplaceTempView("tbSock")
scala> spark.sql("select * from tbSock limit 10").show
+------------+--------+---------+
| orderid|location| date|
+------------+--------+---------+
|BYSL00000893| ZHAO|2007-8-23|
|BYSL00000897| ZHAO|2007-8-24|
|BYSL00000898| ZHAO|2007-8-25|
|BYSL00000899| ZHAO|2007-8-26|
|BYSL00000900| ZHAO|2007-8-26|
|BYSL00000901| ZHAO|2007-8-27|
|BYSL00000902| ZHAO|2007-8-27|
|BYSL00000904| ZHAO|2007-8-28|
|BYSL00000905| ZHAO|2007-8-28|
|BYSL00000906| ZHAO|2007-8-28|
+------------+--------+---------+