spark-3 Spark SQL

亢胤运
2023-12-01

三、Spark SQL

3.1 Spark SQL与Hive

Spark SQL实际上并不能完全替代Hive,因为Hive是一种基于HDFS的数据仓库,并且提供了基于SQL模型的查询,针对存储了大数据的数据仓库,进行分布式交互查询的查询引擎。Spark SQL所替代的,是Hive的查询引擎,而不是Hive本身。在生产环境下,Spark SQL 是针对Hive数据仓库中的数据进行查询,Spark本身是不提供存储,自然也不可能替代Hive作为数据仓库的这个功能。

Spark SQL的一个优点,相较于Hive查询引擎来说,就是速度快,同样的SQL语句,可能使用Hive的查询引擎,由于其底层基于 MapReduce,必须经过 shuffle过程走磁盘,因此速度是非常缓慢的。很多复杂的SQL语句,在Hive中执行都需要一个小时以上的时间。而 Spark SQL由于其底层基于内存的特点,因此速度达到了Hive查询引擎的数倍以上。

Spark SQL相较于Hive的另外一个优点,就是支持大量不同的数据源,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。此外,Spark SQL由于身处技术堆栈内,也是基于RDD来工作,因此可以与 Spark的其他组件无缝整合使用,配合起来实现许多复杂的功能。比如 Spark SQL支持可以直接针对hdfs文件执行SQL语句。

3.2 Shark和Spark SQL

Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高,但Shark的设计导致了两个问题:
(1)执行计划优化完全依赖于Hive,不方便添加新的优化策略;
(2)因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。
Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,而从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

3.3 DataFrame和RDD

Spark SQL增加了DataFrame(即带有Schema信息的RDD),DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,且获得了更高的计算性能;Spark可轻松实现从MySQL到DataFrame的转化,且支持SQL查询。

RDD是分布式的Java对象的集合,但是对象内部结构对于RDD而言却是不可知的;DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息。RDD就像一个屋子,找东西要把这个屋子翻遍才能找到;DataFrame相当于在你的屋子里面打上了货架,只要告诉他你是在第几个货架的第几个位置,DataFrame就是在RDD基础上加入了列,处理数据就像处理二维表一样。

DataFrame与RDD的主要区别在于,DataFrame带schema元信息,即数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而可以对DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,达到大幅提升运行效率的目标。而RDD由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

3.4 DataFrame的创建

Spark2.0版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口,来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession也提供了HiveQL以及其他依赖于Hive的功能的支持。
可以通过如下语句创建一个SparkSession对象:

import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().getOrCreate()

在创建DataFrame前,为支持RDD转换为DataFrame及后续的SQL操作,需通过import语句(即import spark.implicits._)导入相应包,启用隐式转换。
在创建DataFrame时,可使用spark.read操作从不同类型的文件中加载数据创建DataFrame,如:spark.read.json(“people.json”):读取people.json文件创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径;spark.read.csv(“people.csv”):读取people.csv文件创建DataFrame;
读取hdfs上的json文件,并打印,json文件为:

{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}

读取代码:

import org.apache.spark.sql.SparkSession 
val spark=SparkSession.builder().getOrCreate() 
import spark.implicits._ 
val df =spark.read.json("hdfs://172.22.241.183:8020/user/spark/json_sparksql.json") df.show()

3.5 RDD转换DataFrame

Spark提供了两种方法来实现从RDD转换得到DataFrame:
① 利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;
② 使用编程接口,构造一个schema并将其应用在已知的RDD上,该种方式编写的代码更冗长,但在不知道colum及其type的情况下,可以使用这种方式。

3.6 SparkSQL即席查询

SparkSQL的元数据的状态有两种:
① in_memory,用完了元数据也就丢了;
② 通过hive保存,hive的元数据存在哪儿,它的元数据也就存在哪。
SparkSQL命令行提供了即席查询能力,可以使用类SQL方式操作数据源,效率高于hive,同时SparkSQL支持将数据导入到数据仓库。

 类似资料: