Spark SQL (5): Spark SQL Data Sources

越信鸥
2023-12-01

Spark SQL can work with a wide variety of data sources.

1. Using load (to read) and save (to write)

The default data source is the Parquet file, a columnar storage format.

Loading with load:

// When reading a Parquet file there is no need to specify the format, because Parquet is the default.
scala> val userDF = spark.read.load("/usr/local/tmp_files/users.parquet")
userDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> userDF.printSchema
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

scala> userDF.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

// When reading a JSON file, the format must be specified (or use read.json), because the default format is Parquet.
scala> val testResult = spark.read.json("/usr/local/tmp_files/emp.json")
testResult: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]

scala> val testResult = spark.read.format("json").load("/usr/local/tmp_files/emp.json")
testResult: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]

scala> val testResult = spark.read.load("/usr/local/tmp_files/emp.json") //会报错
 
// You do not have to point at a specific file; since file names are often unpredictable, you can read the directory that contains them instead.
scala> val testResult = spark.read.load("/usr/local/tmp_files/parquet/part-00000-77d38cbb-ec43-439a-94e2-9a0382035552.snappy.parquet")
testResult: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string]

scala> testResult.show
+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+

scala> val testResult = spark.read.load("/usr/local/tmp_files/parquet")
testResult: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string]

scala> testResult.show
+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+

Saving with save:

write.save("path")
write.mode("overwrite").save("path")
write.mode("overwrite").parquet("path")
write.saveAsTable("tableName")

Save Modes

  • A SaveMode can be supplied when storing data; it defines how existing data at the target is handled. Note that these save modes do not use any locking and are not atomic. In particular, with Overwrite the existing data is deleted before the new data is written.
  • The available modes are ErrorIfExists, Append, Overwrite and Ignore.
// When using the save function, the save mode can be specified: append or overwrite.
scala> userDF.select("name").write.save("/usr/local/tmp_files/parquet")

// If the target already exists, a plain write fails with an "already exists" error; use mode("overwrite") instead.
scala> userDF.select("name").write.mode("overwrite").save("/usr/local/tmp_files/parquet")

// Save the result as a table
scala> userDF.select("name").write.saveAsTable("table0821")

scala> spark.sql("select * from table0821").show
+------+
|  name|
+------+
|Alyssa|
|   Ben|
+------+

After closing the Spark shell and restarting it:

scala> userDF.show
<console>:24: error: not found: value userDF
       userDF.show
       ^

scala> spark.sql("select * from table0821").show
+------+
|  name|
+------+
|Alyssa|
|   Ben|
+------+

Why the DataFrame is gone but table0821 can still be read:
The directory in which the Spark shell was started contains a spark-warehouse folder, and saveAsTable writes its data there. Starting the Spark shell from a different directory means the data cannot be read. For example, starting it from the Spark home directory with # ./bin/spark-shell --master spark://node3:7077 cannot access the data, because there is no spark-warehouse folder (with this table) under the Spark home directory.
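
One way to avoid this (a suggestion, not part of the original notes) is to start every shell with the same absolute warehouse directory via spark.sql.warehouse.dir; the path below is only an example:

# example warehouse path; adjust to a shared absolute directory
./bin/spark-shell --master spark://node3:7077 --conf spark.sql.warehouse.dir=/usr/local/tmp_files/spark-warehouse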

2. Parquet Files

A Parquet file is a columnar storage file and is Spark SQL's default data source.

In essence it is just an ordinary file.
Parquet is one kind of columnar storage format, and columnar storage has the following key points:

  • Data that does not match the query can be skipped, so only the needed data is read, reducing I/O.
  • Compression encodings reduce disk usage. Because all values in a column share the same type, more efficient encodings (such as Run Length Encoding and Delta Encoding) can be used to save further space.
  • Only the required columns are read, and vectorized operations are supported, giving better scan performance.
  • Parquet is Spark SQL's default data source and can be configured via spark.sql.sources.default (see the sketch after this list).
  • When Parquet files are written, all columns are automatically made nullable for compatibility reasons.
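
The default source can be checked and changed from the running session; a minimal sketch (switching the default to JSON is only an illustration):

scala> spark.conf.get("spark.sql.sources.default")
scala> spark.conf.set("spark.sql.sources.default", "json")   // after this, read.load / write.save default to JSON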

2.1 Converting other files to Parquet

Very simple: call the save function, and the default output format is Parquet.
The idea: read the data in, then write it back out; the result is a Parquet file. (A CSV variant is sketched at the end of this subsection.)

scala> val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
empDF: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]

scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    20| 7369| SMITH|1980/12/17|    CLERK|7902| 800|
| 300|    30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500|    30| 7521|  WARD| 1981/2/22| SALESMAN|7698|1250|
|    |    20| 7566| JONES|  1981/4/2|  MANAGER|7839|2975|
|1400|    30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
|    |    30| 7698| BLAKE|  1981/5/1|  MANAGER|7839|2850|
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    20| 7788| SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|   0|    30| 7844|TURNER|  1981/9/8| SALESMAN|7698|1500|
|    |    20| 7876| ADAMS| 1987/5/23|    CLERK|7788|1100|
|    |    30| 7900| JAMES| 1981/12/3|    CLERK|7698| 950|
|    |    20| 7902|  FORD| 1981/12/3|  ANALYST|7566|3000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

// Two ways to write
scala> empDF.write.mode("overwrite").save("/usr/local/tmp_files/parquet") 
scala> empDF.write.mode("overwrite").parquet("/usr/local/tmp_files/parquet")

// Read it back
scala> val emp1 = spark.read.parquet("/usr/local/tmp_files/parquet")
emp1: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]

scala> emp1.createOrReplaceTempView("emp1")

scala> spark
res4: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@71e6bac0

scala> spark.sql("select * from emp1")
res5: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]

scala> spark.sql("select * from emp1").show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    20| 7369| SMITH|1980/12/17|    CLERK|7902| 800|
| 300|    30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500|    30| 7521|  WARD| 1981/2/22| SALESMAN|7698|1250|
|    |    20| 7566| JONES|  1981/4/2|  MANAGER|7839|2975|
|1400|    30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
|    |    30| 7698| BLAKE|  1981/5/1|  MANAGER|7839|2850|
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    20| 7788| SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|   0|    30| 7844|TURNER|  1981/9/8| SALESMAN|7698|1500|
|    |    20| 7876| ADAMS| 1987/5/23|    CLERK|7788|1100|
|    |    30| 7900| JAMES| 1981/12/3|    CLERK|7698| 950|
|    |    20| 7902|  FORD| 1981/12/3|  ANALYST|7566|3000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
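
The same read-then-write pattern works for other formats as well, for example CSV; a minimal sketch (the output directory parquet_from_csv is a made-up name, and emp.csv is assumed to have no header row, matching how it is loaded into Hive later):

scala> val csvDF = spark.read.option("inferSchema", true).csv("/usr/local/tmp_files/emp.csv")
scala> csvDF.write.mode("overwrite").parquet("/usr/local/tmp_files/parquet_from_csv")   // hypothetical output path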

2.2 Schema merging support

A project usually starts with a simple table and a simple schema; as the project grows, the table becomes more complex and new columns are added step by step.
In this way you end up with multiple Parquet files whose schemas differ but are mutually compatible, and Spark SQL can merge them into one.

spark.read.option("mergeSchema", true).parquet("path")

scala> val df1 = sc.makeRDD(1 to 5).map(i => (i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")

scala> df1.show
+------+------+
|single|double|
+------+------+
|     1|     2|
|     2|     4|
|     3|     6|
|     4|     8|
|     5|    10|
+------+------+
 
scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

scala> df2.show
+------+------+
|single|triple|
+------+------+
|     6|    18|
|     7|    21|
|     8|    24|
|     9|    27|
|    10|    30|
+------+------+


scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")
 
// Merge the schemas
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.printSchema
root
 |-- single: integer (nullable = true)
 |-- double: integer (nullable = true)
 |-- triple: integer (nullable = true)
 |-- key: integer (nullable = true)

scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
|     8|  null|    24|  2|
|     9|  null|    27|  2|
|    10|  null|    30|  2|
|     3|     6|  null|  1|
|     4|     8|  null|  1|
|     5|    10|  null|  1|
|     6|  null|    18|  2|
|     7|  null|    21|  2|
|     1|     2|  null|  1|
|     2|     4|  null|  1|
+------+------+------+---+
 
// Test: using a different partition column name (hehe instead of key) also merges:
scala> val df1 = sc.makeRDD(1 to 5).map(i => (i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

scala> df1.write.parquet("/usr/local/tmp_files/test_table/hehe=1")

scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

scala> df2.write.parquet("/usr/local/tmp_files/test_table/hehe=2")

scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.printSchema
root
 |-- single: integer (nullable = true)
 |-- double: integer (nullable = true)
 |-- triple: integer (nullable = true)
 |-- hehe: integer (nullable = true)
  
// Test: writing with two different partition column names fails, and the files cannot be merged:
scala> df1.write.parquet("/usr/local/tmp_files/test_table/hehe=1")
                                                                     
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")

scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:

    Partition column name list #0: key
    Partition column name list #1: hehe
   
// After renaming so that both directories use the same partition column name, the merge succeeds:
# mv key\=2/ hehe\=2/
 
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.show
+------+------+------+----+
|single|double|triple|hehe|
+------+------+------+----+
|     8|  null|    24|   2|
|     9|  null|    27|   2|
|    10|  null|    30|   2|
|     3|     6|  null|   1|
|     4|     8|  null|   1|
|     5|    10|  null|   1|
|     6|  null|    18|   2|
|     7|  null|    21|   2|
|     1|     2|  null|   1|
|     2|     4|  null|   1|
+------+------+------+----+
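
Instead of passing the option on every read, schema merging can also be switched on globally through the spark.sql.parquet.mergeSchema configuration; a minimal sketch (df4 is just an illustrative name):

scala> spark.conf.set("spark.sql.parquet.mergeSchema", "true")
scala> val df4 = spark.read.parquet("/usr/local/tmp_files/test_table")   // merged without the explicit option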

3. JSON Files

Note that these JSON files are not in the usual multi-line JSON layout: each line must contain a single, self-contained, valid JSON object. Spreading one JSON object over several lines causes the read to fail.

emp.json

{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"1980/12/17","sal":800,"comm":"","deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":"7698","hiredate":"1981/2/20","sal":1600,"comm":"300","deptno":30}
scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> peopleDF.createOrReplaceTempView("people")

scala> spark.sql("select name from people where age=19").show
+------+
|  name|
+------+
|Justin|
+------+

4. JDBC

Relational databases, such as MySQL, can be accessed through JDBC. The JDBC driver JAR must be supplied on the command line (--jars and --driver-class-path) in order to connect to MySQL.

./spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
./spark-shell --jars /opt/top_resources/test/mysql-connector-java-5.1.38.jar --driver-class-path /opt/top_resources/test/mysql-connector-java-5.1.38.jar

4.1 Method 1: read.format("jdbc")

scala> val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","TOPtop123456").option("driver","com.mysql.jdbc.Driver").option("dbtable","test_table").load  
scala> mysqlDF.show

For Oracle, use:
.option("url", "jdbc:oracle:thin:@topnpl200:1521/orcl")
.option("driver", "oracle.jdbc.OracleDriver")
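
For large tables the read can be parallelized by adding the standard JDBC partitioning options; a sketch (the partition column id and the bounds are assumptions about test_table, not taken from the original session):

val partitionedDF = spark.read.format("jdbc").
  option("url", "jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8").
  option("user", "root").option("password", "TOPtop123456").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "test_table").
  option("partitionColumn", "id").option("lowerBound", "1").      // "id" and the bounds are assumed
  option("upperBound", "100000").option("numPartitions", "4").
  load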

4.2 Method 2: defining a Properties object

scala> import java.util.Properties
scala> val mysqlProps = new Properties()
scala> mysqlProps.setProperty("user","root")
scala> mysqlProps.setProperty("password","TOPtop123456")
scala> mysqlProps.setProperty("driver","com.mysql.jdbc.Driver")
scala> val mysqlDF1 = spark.read.jdbc("jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8","test_table",mysqlProps)
scala> mysqlDF1.show
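
Writing a DataFrame back through JDBC follows the same pattern; a sketch (the target table test_table_copy is an assumption):

scala> mysqlDF1.write.mode("append").jdbc("jdbc:mysql://topnpl200:3306/topdb_dev?serverTimezone=UTC&characterEncoding=utf-8", "test_table_copy", mysqlProps)   // test_table_copy is an assumed table name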

5. Hive

Data stored in Hive can be read into Spark SQL and processed there. This is a fairly common pattern.

5.1 Configuring Spark SQL to support Hive

  1. Set up Hive.

  2. Copy the Hive and Hadoop configuration files into the Spark configuration:
    hive-site.xml from Hive
    core-site.xml and hdfs-site.xml from Hadoop
    into the SPARK_HOME/conf directory.
    This Hive deployment uses several Hive clients and a single Hive server, and only the server connects to MySQL. The benefit is that the MySQL connection details are known only to the Hive server and are never exposed to the Hive clients.

    <!-- Hive server-side configuration -->
    <configuration> 
      <property> 
        <name>hive.metastore.warehouse.dir</name>  
        <value>/user/yibo/hive/warehouse</value> 
      </property>  
      <property> 
        <name>javax.jdo.option.ConnectionURL</name>  
        <value>jdbc:mysql://192.168.109.1:3306/hive?serverTimezone=UTC</value> 
      </property>  
      <property> 
        <name>javax.jdo.option.ConnectionDriverName</name>  
        <value>com.mysql.jdbc.Driver</value> 
      </property>  
      <property> 
        <name>javax.jdo.option.ConnectionUserName</name>  
        <value>root</value> 
      </property>  
      <property> 
        <name>javax.jdo.option.ConnectionPassword</name>  
        <value>123456</value> 
      </property>  
      <property> 
        <name>hive.querylog.location</name>  
        <value>/data/hive/iotmp</value> 
      </property>  
      <property> 
        <name>hive.server2.logging.operation.log.location</name>  
        <value>/data/hive/operation_logs</value> 
      </property>  
      <property> 
        <name>datanucleus.readOnlyDatastore</name>  
        <value>false</value> 
      </property>  
      <property> 
        <name>datanucleus.fixedDatastore</name>  
        <value>false</value> 
      </property>  
      <property> 
        <name>datanucleus.autoCreateSchema</name>  
        <value>true</value> 
      </property>  
      <property> 
        <name>datanucleus.autoCreateTables</name>  
        <value>true</value> 
      </property>  
      <property> 
        <name>datanucleus.autoCreateColumns</name>  
        <value>true</value> 
      </property> 
    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
      </property>
    </configuration>
    
    <!-- Hive client-side configuration -->
    <configuration> 
      <property> 
        <name>hive.metastore.warehouse.dir</name>  
        <value>/user/yibo/hive/warehouse</value> 
      </property>  
      <property> 
        <name>hive.metastore.local</name>  
        <value>false</value> 
      </property>  
      <property> 
        <name>hive.metastore.uris</name>  
        <value>thrift://192.168.109.132:9083</value> 
      </property> 
    </configuration>
    
  3. Start Spark and the related components.

    # Start HDFS and YARN
    start-all
    # If the server and client are separated, start the Hive server with:
    hive-server
    ./hive --service metastore
    # Start the Spark cluster
    start-all
    # Start the Spark shell with the MySQL driver (the Hive metadata is stored in MySQL)
    ./spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
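
    In a standalone application (rather than the spark-shell, where this is already done) Hive support has to be enabled explicitly when the session is built; a minimal sketch (the application name is arbitrary):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("SparkSQLHive")     // arbitrary name
      .enableHiveSupport()         // picks up hive-site.xml from SPARK_HOME/conf
      .getOrCreate()

    spark.sql("show tables").show()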
    

5.2 Using Spark SQL to work with Hive

// Show the tables in Hive
scala> spark.sql("show tables").show
+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
| default|emp_default|      false|
+--------+-----------+-----------+

scala> spark.sql("select * from company.emp limit 10").show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
+-----+------+---------+----+----------+----+----+------+

// Create a Hive table through Spark SQL
scala> spark.sql("create table company.emp_0823( empno Int, ename String, job String, mgr String, hiredate String, sal Int, comm String, deptno Int ) row format delimited fields terminated by ','")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("load data local inpath '/usr/local/tmp_files/emp.csv' overwrite into table company.emp_0823")
19/08/23 05:14:50 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
res3: org.apache.spark.sql.DataFrame = []

scala> spark.sql("load data local inpath '/usr/local/tmp_files/emp.csv' into table company.emp_0823")
res4: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select * from company.emp_0823 limit 10").show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
+-----+------+---------+----+----------+----+----+------+
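
Query results can also be written back to Hive as a new table via saveAsTable; a sketch (the table name company.emp_backup is an assumption):

scala> spark.sql("select * from company.emp_0823").write.mode("overwrite").saveAsTable("company.emp_backup")   // emp_backup is an assumed table name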