Spark SQL can work with a wide variety of data sources.
The default data source is the Parquet file, a columnar storage format.
Loading with load:
// When reading a Parquet file, no format needs to be specified, because Parquet is the default.
scala> val userDF = spark.read.load("/usr/local/tmp_files/users.parquet")
userDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> userDF.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)
scala> userDF.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
// When reading a JSON file, specify the format explicitly or use read.json, because the default format is Parquet.
scala> val testResult = spark.read.json("/usr/local/tmp_files/emp.json")
testResult: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]
scala> val testResult = spark.read.format("json").load("/usr/local/tmp_files/emp.json")
testResult: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]
scala> val testResult = spark.read.load("/usr/local/tmp_files/emp.json") // throws an error: the file is not Parquet
// You do not have to point at a specific file. File names are often unpredictable, so you can read the directory that contains the files instead.
scala> val testResult = spark.read.load("/usr/local/tmp_files/parquet/part-00000-77d38cbb-ec43-439a-94e2-9a0382035552.snappy.parquet")
testResult: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string]
scala> testResult.show
+------+--------------+
| name|favorite_color|
+------+--------------+
|Alyssa| null|
| Ben| red|
+------+--------------+
scala> val testResult = spark.read.load("/usr/local/tmp_files/parquet")
testResult: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string]
scala> testResult.show
+------+--------------+
| name|favorite_color|
+------+--------------+
|Alyssa| null|
| Ben| red|
+------+--------------+
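The default format that load and save fall back on comes from the spark.sql.sources.default setting, which is parquet out of the box. A minimal sketch of switching it for the current session, assuming an open spark-shell where spark is predefined:

// Assumption: run inside spark-shell, where `spark` is already defined.
// spark.sql.sources.default decides which format load()/save() use when none is given.
spark.conf.set("spark.sql.sources.default", "json")

// Now load() parses JSON, so the earlier failing call works without format("json").
val empFromJson = spark.read.load("/usr/local/tmp_files/emp.json")
empFromJson.printSchema

// Restore the built-in default so the Parquet examples below behave as shown.
spark.conf.set("spark.sql.sources.default", "parquet")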
Saving with save:
write.save("path")
write.mode("overwrite").save("path")
write.mode("overwrite").parquet("path")
write.saveAsTable("tableName")
Save Modes
// When calling the save function, a save mode such as append or overwrite can be specified.
scala> userDF.select("name").write.save("/usr/local/tmp_files/parquet")
// If the path already exists, a plain write fails with an "already exists" error; use the "overwrite" mode instead.
scala> userDF.select("name").write.mode("overwrite").save("/usr/local/tmp_files/parquet")
// Save the result as a table
scala> userDF.select("name").write.saveAsTable("table0821")
scala> spark.sql("select * from table0821").show
+------+
| name|
+------+
|Alyssa|
| Ben|
+------+
After closing and restarting the Spark Shell:
scala> userDF.show
<console>:24: error: not found: value userDF
userDF.show
^
scala> spark.sql("select * from table0821").show
+------+
| name|
+------+
|Alyssa|
| Ben|
+------+
Why the DataFrame is gone but table0821 can still be read:
The directory from which the Spark Shell was started contains a spark-warehouse folder, and saveAsTable writes its data into that folder. Starting the Spark Shell from a different directory therefore cannot see the data. For example, starting it from the Spark installation directory with ./bin/spark-shell --master spark://node3:7077 cannot access the table data, because there is no spark-warehouse folder under the Spark directory.
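To make saveAsTable output independent of where the shell is launched, the warehouse location can be pinned to an absolute path. A minimal sketch; the warehouse path below is illustrative:

# Assumption: the warehouse path is illustrative; point it at a stable local or HDFS directory.
./bin/spark-shell --master spark://node3:7077 \
  --conf spark.sql.warehouse.dir=/usr/local/tmp_files/spark-warehouse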
Parquet files: columnar storage files, and Spark SQL's default data source.
Think of them as ordinary files on disk.
Parquet is a file type in a columnar storage format. Writing one is very simple: call the save function, and Parquet is already the default format.
The approach: read the data in, then write it back out; the result is a Parquet file.
scala> val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
empDF: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]
scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename| hiredate| job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
| | 20| 7369| SMITH|1980/12/17| CLERK|7902| 800|
| 300| 30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500| 30| 7521| WARD| 1981/2/22| SALESMAN|7698|1250|
| | 20| 7566| JONES| 1981/4/2| MANAGER|7839|2975|
|1400| 30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
| | 30| 7698| BLAKE| 1981/5/1| MANAGER|7839|2850|
| | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450|
| | 20| 7788| SCOTT| 1987/4/19| ANALYST|7566|3000|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| 0| 30| 7844|TURNER| 1981/9/8| SALESMAN|7698|1500|
| | 20| 7876| ADAMS| 1987/5/23| CLERK|7788|1100|
| | 30| 7900| JAMES| 1981/12/3| CLERK|7698| 950|
| | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000|
| | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
// Two ways to write
scala> empDF.write.mode("overwrite").save("/usr/local/tmp_files/parquet")
scala> empDF.write.mode("overwrite").parquet("/usr/local/tmp_files/parquet")
// Read it back
scala> val emp1 = spark.read.parquet("/usr/local/tmp_files/parquet")
emp1: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]
scala> emp1.createOrReplaceTempView("emp1")
scala> spark
res4: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@71e6bac0
scala> spark.sql("select * from emp1")
res5: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]
scala> spark.sql("select * from emp1").show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename| hiredate| job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
| | 20| 7369| SMITH|1980/12/17| CLERK|7902| 800|
| 300| 30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500| 30| 7521| WARD| 1981/2/22| SALESMAN|7698|1250|
| | 20| 7566| JONES| 1981/4/2| MANAGER|7839|2975|
|1400| 30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
| | 30| 7698| BLAKE| 1981/5/1| MANAGER|7839|2850|
| | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450|
| | 20| 7788| SCOTT| 1987/4/19| ANALYST|7566|3000|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| 0| 30| 7844|TURNER| 1981/9/8| SALESMAN|7698|1500|
| | 20| 7876| ADAMS| 1987/5/23| CLERK|7788|1100|
| | 30| 7900| JAMES| 1981/12/3| CLERK|7698| 950|
| | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000|
| | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
When a project starts, the table structure and schema are simple; as the project grows, the tables become more complex and new columns are added step by step. In this way users end up with multiple Parquet files whose schemas differ but are mutually compatible, and Spark can merge them on read:
spark.read.option("mergeSchema", true).parquet("path")
scala> val df1 = sc.makeRDD(1 to 5).map(i => (i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")
scala> df1.show
+------+------+
|single|double|
+------+------+
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
| 5| 10|
+------+------+
scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
scala> df2.show
+------+------+
|single|triple|
+------+------+
| 6| 18|
| 7| 21|
| 8| 24|
| 9| 27|
| 10| 30|
+------+------+
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")
// Merge the schemas
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]
scala> df3.printSchema
root
|-- single: integer (nullable = true)
|-- double: integer (nullable = true)
|-- triple: integer (nullable = true)
|-- key: integer (nullable = true)
scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
| 8| null| 24| 2|
| 9| null| 27| 2|
| 10| null| 30| 2|
| 3| 6| null| 1|
| 4| 8| null| 1|
| 5| 10| null| 1|
| 6| null| 18| 2|
| 7| null| 21| 2|
| 1| 2| null| 1|
| 2| 4| null| 1|
+------+------+------+---+
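Instead of writing to the key=1 / key=2 directories by hand, the same partitioned layout can be produced with partitionBy. A minimal sketch, assuming a spark-shell session; the output path is illustrative:

// Assumption: run inside spark-shell; the output path is illustrative.
import spark.implicits._

val part1 = (1 to 5).map(i => (i, i * 2, 1)).toDF("single", "double", "key")
val part2 = (6 to 10).map(i => (i, i * 3, 2)).toDF("single", "triple", "key")

// partitionBy turns the "key" column into key=1 / key=2 subdirectories, like the manual layout above.
part1.write.partitionBy("key").parquet("/usr/local/tmp_files/test_table_partitioned")
part2.write.mode("append").partitionBy("key").parquet("/usr/local/tmp_files/test_table_partitioned")

spark.read.option("mergeSchema", true).parquet("/usr/local/tmp_files/test_table_partitioned").show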
// Test: replacing key with another partition column name (here hehe) also merges correctly:
scala> val df1 = sc.makeRDD(1 to 5).map(i => (i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
scala> df1.write.parquet("/usr/local/tmp_files/test_table/hehe=1")
scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
scala> df2.write.parquet("/usr/local/tmp_files/test_table/hehe=2")
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]
scala> df3.printSchema
root
|-- single: integer (nullable = true)
|-- double: integer (nullable = true)
|-- triple: integer (nullable = true)
|-- hehe: integer (nullable = true)
// Test: using two different partition column names in the same directory fails; the schemas cannot be merged:
scala> df1.write.parquet("/usr/local/tmp_files/test_table/hehe=1")
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
Partition column name list #0: key
Partition column name list #1: hehe
// After renaming the directory so both use the same partition column name, the merge succeeds:
# mv key\=2/ hehe\=2/
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]
scala> df3.show
+------+------+------+----+
|single|double|triple|hehe|
+------+------+------+----+
| 8| null| 24| 2|
| 9| null| 27| 2|
| 10| null| 30| 2|
| 3| 6| null| 1|
| 4| 8| null| 1|
| 5| 10| null| 1|
| 6| null| 18| 2|
| 7| null| 21| 2|
| 1| 2| null| 1|
| 2| 4| null| 1|
+------+------+------+----+
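The per-read option can also be switched on for the whole session through spark.sql.parquet.mergeSchema (it defaults to false because merging schemas is relatively expensive). A minimal sketch, assuming a spark-shell session:

// Assumption: run inside spark-shell; enables schema merging for every Parquet read in this session.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

// With the session-wide setting, option("mergeSchema", true) is no longer needed on each read.
val merged = spark.read.parquet("/usr/local/tmp_files/test_table")
merged.printSchema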
JSON files: note that the JSON files read here are not regular, pretty-printed JSON. Each line of the file must contain a single, self-contained, valid JSON object; if one object is spread over multiple lines, reading fails.
emp.json
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"1980/12/17","sal":800,"comm":"","deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":"7698","hiredate":"1981/2/20","sal":1600,"comm":"300","deptno":30}
scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> peopleDF.createOrReplaceTempView("people")
scala> spark.sql("select name from people where age=19").show
+------+
| name|
+------+
|Justin|
+------+
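If a file really does contain pretty-printed, multi-line JSON, Spark 2.2 and later can still read it with the multiLine option. A minimal sketch; the path is illustrative:

// Assumption: the path points at a file holding one pretty-printed JSON object or a JSON array.
val multiDF = spark.read.option("multiLine", true).json("/usr/local/tmp_files/people_multiline.json")
multiDF.show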
JDBC: relational databases, such as MySQL, can be accessed through JDBC. The MySQL connector jar and driver class path must be added on the command line before the shell can connect to the database.
./spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
./spark-shell --jars /opt/top_resources/test/mysql-connector-java-5.1.38.jar --driver-class-path /opt/top_resources/test/mysql-connector-java-5.1.38.jar
scala> val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","TOPtop123456").option("driver","com.mysql.jdbc.Driver").option("dbtable","test_table").load
scala> mysqlDF.show
For Oracle, swap in the Oracle URL and driver:
.option("url", "jdbc:oracle:thin:@topnpl200:1521/orcl")
.option("driver", "oracle.jdbc.OracleDriver")
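Putting the Oracle options together, a read would look like the sketch below, assuming the Oracle JDBC jar is passed with --jars/--driver-class-path just like the MySQL one; the table name and credentials are placeholders:

// Assumption: the table name and credentials are placeholders; the Oracle JDBC jar is on the classpath.
val oracleDF = spark.read.format("jdbc").
  option("url", "jdbc:oracle:thin:@topnpl200:1521/orcl").
  option("driver", "oracle.jdbc.OracleDriver").
  option("user", "scott").
  option("password", "tiger").
  option("dbtable", "emp").
  load()
oracleDF.show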
scala> import java.util.Properties
scala> val mysqlProps = new Properties()
scala> mysqlProps.setProperty("user","root")
scala> mysqlProps.setProperty("password","TOPtop123456")
scala> mysqlProps.setProperty("driver","com.mysql.jdbc.Driver")
scala> val mysqlDF1 = spark.read.jdbc("jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8","test_table",mysqlProps)
scala> mysqlDF1.show
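Writing a DataFrame back over JDBC uses the same URL and properties. A minimal sketch, assuming mysqlDF1 and mysqlProps from above are still in scope; the target table name is illustrative:

// Assumption: mysqlDF1 and mysqlProps exist as created above; the target table name is illustrative.
// mode("append") adds rows; "overwrite" would drop and recreate the table instead.
mysqlDF1.write.mode("append").
  jdbc("jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8",
    "test_table_copy", mysqlProps)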
Hive: data stored in Hive can be read into Spark SQL and processed there. This is a fairly common pattern.
Set up Hive first.
Copy the Hive and Hadoop configuration files into the Spark configuration:
hive-site.xml from Hive
core-site.xml and hdfs-site.xml from Hadoop
into the SPARK_HOME/conf directory.
The Hive deployment here uses several Hive clients and one Hive server, and only the server connects to MySQL. The benefit is that the MySQL connection details are known only to the Hive server and are never exposed to the Hive clients.
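The spark-shell picks these configuration files up automatically. In a standalone application, Hive support also has to be enabled explicitly when building the SparkSession; a minimal sketch, with an illustrative application name:

// Assumption: hive-site.xml / core-site.xml / hdfs-site.xml are on the classpath (e.g. SPARK_HOME/conf).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().
  appName("HiveOnSparkSQL").    // illustrative application name
  enableHiveSupport().          // connect to the Hive metastore described by hive-site.xml
  getOrCreate()

spark.sql("show tables").show()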
<!-- Hive server-side configuration -->
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/yibo/hive/warehouse</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.109.1:3306/hive?serverTimezone=UTC</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/data/hive/iotmp</value>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/data/hive/operation_logs</value>
</property>
<property>
<name>datanucleus.readOnlyDatastore</name>
<value>false</value>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>false</value>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoCreateTables</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoCreateColumns</name>
<value>true</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
</configuration>
<!-- Hive client-side configuration -->
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/yibo/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.109.132:9083</value>
</property>
</configuration>
Start Spark and the related components.
# Start HDFS and YARN
start-all.sh
# If the Hive server and client are separate, start the Hive metastore service on the server node:
./hive --service metastore
# Start the Spark cluster (from SPARK_HOME)
./sbin/start-all.sh
# Start the Spark shell with the MySQL driver jar (Hive's metadata is stored in MySQL)
./spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
// Show the tables in Hive
scala> spark.sql("show tables").show
+--------+-----------+-----------+
|database| tableName|isTemporary|
+--------+-----------+-----------+
| default|emp_default| false|
+--------+-----------+-----------+
scala> spark.sql("select * from company.emp limit 10").show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30|
+-----+------+---------+----+----------+----+----+------+
// Create a Hive table through Spark SQL
scala> spark.sql("create table company.emp_0823( empno Int, ename String, job String, mgr String, hiredate String, sal Int, comm String, deptno Int ) row format delimited fields terminated by ','")
res2: org.apache.spark.sql.DataFrame = []
scala> spark.sql("load data local inpath '/usr/local/tmp_files/emp.csv' overwrite into table company.emp_0823")
19/08/23 05:14:50 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
res3: org.apache.spark.sql.DataFrame = []
scala> spark.sql("load data local inpath '/usr/local/tmp_files/emp.csv' into table company.emp_0823")
res4: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from company.emp_0823 limit 10").show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30|
+-----+------+---------+----+----------+----+----+------+
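Results computed in Spark SQL can also be written back as Hive tables directly, without a separate create/load step. A minimal sketch, assuming the session above is still connected to the Hive metastore; the target table name is illustrative:

// Assumption: the session is connected to the Hive metastore; the target table name is illustrative.
val highPaid = spark.sql("select empno, ename, sal from company.emp_0823 where sal >= 3000")

// saveAsTable registers the result in the metastore and writes the data under the warehouse directory.
highPaid.write.mode("overwrite").saveAsTable("company.emp_high_paid")

spark.sql("select * from company.emp_high_paid").show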