Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类路径下。
bin/spark-shell
--jars mysql-connector-java-5.1.27-bin.jar
我们这里只演示在 Idea 中通过 JDBC 对 Mysql 进行操作
1)导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2)读取数据
package com.sql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object Spark_JDBC {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_JDBC")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//读取数据
val df = spark.read.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/huanhuan")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123")
.option("dbtable", "emp")
.load()
df.show()
spark.stop()
}
}
3)写数据
package com.sql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object Spark_JDBC {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_JDBC")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//写数据
df.write.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/huanhuan")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123")
.option("dbtable", "emp1")
.mode(SaveMode.Append)
.save()
spark.stop()
}
}