当前位置: 首页 > 工具软件 > Spark Kernel > 使用案例 >

Spark之Spark-Sql

公孙芷阳
2023-12-01

Spark Sql简介

       Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

        我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

************** Spark SQL *******************

1、Spark SQL 是Spark套件中的一个模块,他将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。
2、Spark SQL的特点:
      1、和Spark Core的无缝集成,我可以在写整个RDD应用的时候,配置Spark SQL来实现我的逻辑。
      2、统一的数据访问方式,Spark SQL提供标准化的SQL查询。
      3、Hive的继承,Spark SQL通过内嵌Hive或者连接外部已经部署好的hive实例,实现了对Hive语法的继承和操作。
      4、标准化的连接方式,Spark SQL可以通过启动thrift Server来支持JDBC、ODBC的访问,将自己作为一个BI Server使用。

*************  Spark SQL 数据抽象  ***********

0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6)
1、Spark SQL提供了DataFrame和DataSet的数据抽象。
2、DataFrame就是RDD + Schema,可以认为是一张二维表格。他的劣势是在编译器不进行表格中的字段的类型检查。在运行期进行检查。
3、DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
4、DataFrame = DataSet[Row]
5、DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。

************** Spark SQL 客户端查询 **********
1、你可以通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名
2、可以通过Spark提供的方法读取JSOn文件,将JSON文件转换成DataFrame
3、可以通过DataFrame提供的API来操作DataFrame里面的数据。
4、你可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。


************** DataFrame 查询方式

1、DataFrame支持两种查询方式一种是DSL风格,另外一种是SQL风格

1、DSL风格:
      1、你需要引入  import spark.implicit._  这个隐式转换,可以将DataFrame隐式转换成RDD。

2、SQL风格:
      1、你需要将DataFrame注册成一张表格,如果你通过CreateTempView这种方式来创建,那么该表格Session有效,如果你通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀  global_temp
      2、你需要通过sparkSession.sql 方法来运行你的SQL语句。

**************  DataSet 
1、首先定义一个DataSet,你需要先定义一个Case类。

**************  RDD、DataSet、DataFrame之间的转换总结  *************

1、 RDD -> DataFrame :   rdd.map(para=>(para(0).trim(),para(1).trim().toInt)).toDF("name","age")
                         //通过反射来设置
                         rdd.map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()    
                             
                             //通过编程方式来设置Schema,适合于编译期不能确定列的情况
                         schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))  
                 val schema = StructType(fields)
                         val rdd[Row] = rdd.map(attributes => Row(attributes(0), attributes(1).trim))
                         val peopeDF = spark.createDataFrame(rdd[Row],schema)

2、 DataFrame -> RDD :   dataFrame.rdd      注意输出:Array([Michael,29], [Andy,30], [Justin,19])

1、 RDD -> DataSet   :   rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS
2、 DataSet ->  RDD  :  dataSet.rdd        注意输出: Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))

1、 DataFrame -> DataSet:  dataFrame.as[Person]  
2、 DataSet -> DataFrame: dataSet.toDF      

**************  对于DataFrame Row对象的访问方式
1、DataFrame = DataSet[Row], DataFrame里面每一行都是Row对象
2、如果需要访问Row对象中的每一个元素,你可以通过下标 row(0);你也可以通过列名 row.getAs[String]("name")


**************  应用UDF函数  ***************
1、通过spark.udf.register(name,func)来注册一个UDF函数,name是UDF调用时的标识符,fun是一个函数,用于处理字段。
2、你需要将一个DF或者DS注册为一个临时表。
3、通过spark.sql去运行一个SQL语句,在SQL语句中可以通过 name(列名) 方式来应用UDF函数。

*************  UDAF 用户自定义聚合函数
1、弱类型用户自定义聚合函数
      1、新建一个Class 继承UserDefinedAggregateFunction  ,然后复写方法:
              //聚合函数需要输入参数的数据类型
          override def inputSchema: StructType = ???

        //可以理解为保存聚合函数业务逻辑数据的一个数据结构
        override def bufferSchema: StructType = ???

        // 返回值的数据类型
        override def dataType: DataType = ???

        // 对于相同的输入一直有相同的输出
        override def deterministic: Boolean = true

         //用于初始化你的数据结构
        override def initialize(buffer: MutableAggregationBuffer): Unit = ???

         //用于同分区内Row对聚合函数的更新操作
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???

        //用于不同分区对聚合结果的聚合。
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???

        //计算最终结果
        override def evaluate(buffer: Row): Any = ???
     2、你需要通过spark.udf.resigter去注册你的UDAF函数。
     3、需要通过spark.sql去运行你的SQL语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。

2、强类型的用户自定义聚合函数
     1、新建一个class,继承Aggregator[Employee, Average, Double],其中Employee是在应用聚合函数的时候传入的对象,Average是聚合函数在运行的时候内部需要的数据结构,Double是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。复写想对应的方法:
     //用于定义一个聚合函数内部需要的数据结构
        override def zero: Average = ???

         //针对每个分区内部每一个输入来更新你的数据结构
        override def reduce(b: Average, a: Employee): Average = ???

        //用于对于不同分区的结构进行聚合
        override def merge(b1: Average, b2: Average): Average = ???

        //计算输出
        override def finish(reduction: Average): Double = ???

        //用于数据结构他的转换
        override def bufferEncoder: Encoder[Average] = ???

        //用于最终结果的转换
        override def outputEncoder: Encoder[Double] = ???
      2、新建一个UDAF实例,通过DF或者DS的DSL风格语法去应用。

 

Spark SQL查询

创建json文件
vi people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
将文件传输到其他节点

scala> val people = spark.read.json("/opt/datas/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]            

scala> people.show()
+----+-------+                                                                  
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> people.filter($"age" > 20).show()
+---+----+                                                                      
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> people.createOrReplaceTempView("pls")

scala> spark.sql("select * from pls where age >20").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

-------IDEA创建SparkSQL程序-------------

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
package com.zhangbk.sparksql

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory


object PeopleSelect {

  val logger = LoggerFactory.getLogger(PeopleSelect.getClass)

  def main(agrs : Array[String]) {
    //SparkSession.builder 用于创建一个SparkSession;如果需要Hive支持,则需要以下创建语句:.enableHiveSupport()
    val spark = SparkSession.builder().appName("Spark SQL basic example")
                .config("spark.some.config.option", "some-value").getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames.import spark.implicits._的引入是
    // 用于将DataFrames隐式转换成RDD,使df能够使用RDD中的方法。

    import spark.implicits._

    val pl = spark.read.json("/opt/datas/people.json")

    // Displays the content of the DataFrame to stdout
    pl.show()

    pl.filter($"age" > 21).show()

    pl.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()

  }
}

Spark Sql和Hive的继承

======= 内置Hive =====
1、Spark内置有Hive,Spark2.1.1 内置的Hive是1.2.1。
2、需要将core-site.xml和hdfs-site.xml 拷贝到spark的conf目录下。如果Spark路径下发现metastore_db,需要删除【仅第一次启动的时候】。
3、在你第一次启动创建metastore的时候,你需要指定spark.sql.warehouse.dir这个参数,
   比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
4、注意,如果你在load数据的时候,需要将数据放到HDFS上。


======= 外部Hive =====
1、需要将hive-site.xml 拷贝到spark的conf目录下。
2、如果hive的metestore使用的是mysql数据库,那么需要将mysql的jdbc驱动包放到spark的jars目录下。
3、可以通过spark-sql或者spark-shell来进行sql的查询。完成和hive的连接。

 

Spark Sql输入输出
1、对于Spark SQL的输入需要使用  sparkSession.read方法

     1、通用模式   sparkSession.read.format("json").load("path")   支持类型:parquet、json、text、csv、orc、jdbc
     2、专业模式   sparkSession.read.json、 csv  直接指定类型。

2、对于Spark SQL的输出需要使用  sparkSession.write方法

     1、通用模式   dataFrame.write.format("json").save("path")  支持类型:parquet、json、text、csv、orc、  
     2、专业模式   dataFrame.write.csv("path")  直接指定类型

3、如果你使用通用模式,spark默认parquet是默认格式,sparkSession.read.load 他加载的默认是parquet格式。dataFrame.write.save也是默认保存成parquet格式。
4、如果需要保存成一个text文件,那么需要dataFrame里面只有一列。

 

案例 加载数据
BYSL00000893,ZHAO,2007-8-23
BYSL00000897,ZHAO,2007-8-24
BYSL00000898,ZHAO,2007-8-25
BYSL00000899,ZHAO,2007-8-26

scala> case class tbStock(orderid:String, location:String, date:String) extends Serializable
scala> val tbStockRdd = spark.sparkContext.textFile("hdfs://ns1:8020/user/spark/tbStock.txt")
scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(para => tbStock(para(0),para(1),para(2))).toDS
scala> tbStockDS.show
+------------+--------+---------+                                               
|     orderid|location|     date|
+------------+--------+---------+
|BYSL00000893|    ZHAO|2007-8-23|
|BYSL00000897|    ZHAO|2007-8-24|
|BYSL00000898|    ZHAO|2007-8-25|
scala> tbStockDS.createOrReplaceTempView("tbSock")

scala> spark.sql("select * from tbSock limit 10").show
+------------+--------+---------+                                               
|     orderid|location|     date|
+------------+--------+---------+
|BYSL00000893|    ZHAO|2007-8-23|
|BYSL00000897|    ZHAO|2007-8-24|
|BYSL00000898|    ZHAO|2007-8-25|
|BYSL00000899|    ZHAO|2007-8-26|
|BYSL00000900|    ZHAO|2007-8-26|
|BYSL00000901|    ZHAO|2007-8-27|
|BYSL00000902|    ZHAO|2007-8-27|
|BYSL00000904|    ZHAO|2007-8-28|
|BYSL00000905|    ZHAO|2007-8-28|
|BYSL00000906|    ZHAO|2007-8-28|
+------------+--------+---------+

 

 类似资料: