【Spark】Spark SQL总结

常宸
2023-12-01

一、SparkSQL介绍

Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。

1.spark sql的特点

1)引入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD。

2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。

3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算。

2.spark sql的种类

Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。

Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。

3. Dataset与DataFrame

1)Dataset是一个分布式数据容器,与RDD类似。除了数据以外,还掌握数据的结构信息即schema,Dataset的底层封装的是RDD。

通过org.apache.spark.sql.SparkSession来创建Spark上下文
SparkSession中有很多封装好的方法可以来读取文件、创建DataSet,甚至是直接用来写sql语句
Dataset.registerTempTable("表名")可以将DataSet转化成一张临时表,便于sql语句查询

2)Dataset<Row> = DataFrame

4.SparkSQL的底层架构

提交sql——>解析一批未被解决的逻辑计划——>分析后的逻辑计划——>优化规则(谓词下推)转化最佳优化的逻辑计划——>SparkPlanner转化为物理计划——>Spark任务执行

二、创建dataset的方式

1.读取json格式的文件创建Dataset

Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
或者
Dataset<Row> ds = sparkSession.read().json("data/json");

2.通过json格式的RDD创建Dataset

直接创建json格式的RDD,然后可以将它直接转化DataSet

3.通过反射的方式将RDD转换成Dataset

Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class);

上例中:

1)person类要可序列化、访问级别是public

2)RDD转成Dataset后会根据映射将字段按Assci码排序

3)将Dataset转换成RDD时获取字段两种方式,一种是ds.getInt(0)下标获取(不推荐使用),另一种是ds.getAs(“列名”)获取(推荐使用)

4.动态创建Schema转换成Dataset

可以动态的对字段修改

List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("id", DataTypes.StringType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true)
        );


		StructType schema = DataTypes.createStructType(asList);

		Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);

5.读取parquet文件创建Dataset

可以将Dataset存储成parquet文件,然后在转化成DataSet

df.write().mode(SaveMode.Overwrite)format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");

SaveMode指定文件保存时的模式:

Overwrite:覆盖

Append:追加

ErrorIfExists:如果存在就报错

Ignore:如果存在就忽略

5.读取JDBC中的数据创建Dataset

第一种: 
Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "root");
        options.put("dbtable", "person");

        Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load();
第二种:
 DataFrameReader reader = sparkSession.read().format("jdbc");
        reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("user", "root");
        reader.option("password", "root");
        reader.option("dbtable", "score");

        Dataset<Row> score = reader.load();

6. 读取Hive中的数据加载成Dataset

SparkSession sparkSession = SparkSession
                .builder()
                .master("local")
                .appName("hvie")
                //开启hive的支持,接下来就可以操作hive表了
                // 前提需要是需要开启hive metastore 服务
                .enableHiveSupport()
                .getOrCreate();

创建SparkSession时需要指定支持hive,然后调用sql方法就可以直接对Hive写操作语句

三、存储DataSet

1. 将DataSet存储为parquet文件。

2. 将DataSet存储到JDBC数据库。

3. 将DataSet存储到Hive表。

四、自定义函数UDF和UDAF

UDF一进一出

	/**
	 * 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx
	*/

	sparkSession.udf().register("StrLen",new UDF2<String, Integer, Integer>() {

		private static final long serialVersionUID = 1L;

		@Override
		public Integer call(String t1, Integer t2) throws Exception {
			return t1.length() + t2;
		}
	} ,DataTypes.IntegerType );

UDAF一进多出(聚合函数)

sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {

     。。。这里面有很多方法,需要根据业务逻辑自定义
}

五、开窗函数

row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于分组取topN

开窗函数格式:

row_number() over (partitin by XXX order by XXX)

("select riqi,leibie,jine,rank from (select riqi,leibie,jine,row_number() over" 
                +"(partition by leibie order by jine desc) rank "
		+ "from sales) t where t.rank<=3")
 类似资料: