一、SparkSQL介绍
Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
1.spark sql的特点
1)引入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD。
2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。
3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算。
2.spark sql的种类
Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。
Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。
3. Dataset与DataFrame
1)Dataset是一个分布式数据容器,与RDD类似。除了数据以外,还掌握数据的结构信息即schema,Dataset的底层封装的是RDD。
通过org.apache.spark.sql.SparkSession来创建Spark上下文
SparkSession中有很多封装好的方法可以来读取文件、创建DataSet,甚至是直接用来写sql语句
Dataset.registerTempTable("表名")可以将DataSet转化成一张临时表,便于sql语句查询
2)Dataset<Row> = DataFrame
4.SparkSQL的底层架构
提交sql——>解析一批未被解决的逻辑计划——>分析后的逻辑计划——>优化规则(谓词下推)转化最佳优化的逻辑计划——>SparkPlanner转化为物理计划——>Spark任务执行
二、创建dataset的方式
1.读取json格式的文件创建Dataset
Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
或者
Dataset<Row> ds = sparkSession.read().json("data/json");
2.通过json格式的RDD创建Dataset
直接创建json格式的RDD,然后可以将它直接转化DataSet
3.通过反射的方式将RDD转换成Dataset
Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class);
上例中:
1)person类要可序列化、访问级别是public
2)RDD转成Dataset后会根据映射将字段按Assci码排序
3)将Dataset转换成RDD时获取字段两种方式,一种是ds.getInt(0)下标获取(不推荐使用),另一种是ds.getAs(“列名”)获取(推荐使用)
4.动态创建Schema转换成Dataset
可以动态的对字段修改
List<StructField> asList = Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
5.读取parquet文件创建Dataset
可以将Dataset存储成parquet文件,然后在转化成DataSet
df.write().mode(SaveMode.Overwrite)format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
SaveMode指定文件保存时的模式:
Overwrite:覆盖
Append:追加
ErrorIfExists:如果存在就报错
Ignore:如果存在就忽略
5.读取JDBC中的数据创建Dataset
第一种:
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "root");
options.put("dbtable", "person");
Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load();
第二种:
DataFrameReader reader = sparkSession.read().format("jdbc");
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "root");
reader.option("dbtable", "score");
Dataset<Row> score = reader.load();
6. 读取Hive中的数据加载成Dataset
SparkSession sparkSession = SparkSession
.builder()
.master("local")
.appName("hvie")
//开启hive的支持,接下来就可以操作hive表了
// 前提需要是需要开启hive metastore 服务
.enableHiveSupport()
.getOrCreate();
创建SparkSession时需要指定支持hive,然后调用sql方法就可以直接对Hive写操作语句
三、存储DataSet
1. 将DataSet存储为parquet文件。
2. 将DataSet存储到JDBC数据库。
3. 将DataSet存储到Hive表。
四、自定义函数UDF和UDAF
UDF一进一出
/**
* 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx
*/
sparkSession.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length() + t2;
}
} ,DataTypes.IntegerType );
UDAF一进多出(聚合函数)
sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {
。。。这里面有很多方法,需要根据业务逻辑自定义
}
五、开窗函数
row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于分组取topN
开窗函数格式:
row_number() over (partitin by XXX order by XXX)
("select riqi,leibie,jine,rank from (select riqi,leibie,jine,row_number() over"
+"(partition by leibie order by jine desc) rank "
+ "from sales) t where t.rank<=3")