<!-- MySQL JDBC connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- ScalikeJDBC core -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Automatically parses *.conf configuration -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.12</artifactId>
<version>3.3.2</version>
</dependency>
♥♥♥Note:
1. The configuration must be a 【.conf】 file, otherwise ScalikeJDBC cannot parse it automatically.
2. The keys follow the format 【db.<database name>.driver】…
3. Unlike a Java MySQL connection config, the values after the equals sign are wrapped in double quotes:
db.schooldb.driver="com.mysql.jdbc.Driver"
db.schooldb.url="jdbc:mysql://localhost:3306/schooldb?useSSL=false&characterEncoding=utf-8"
db.schooldb.username="root"
db.schooldb.password="chenwei0302"
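A minimal setup/teardown sketch, assuming the block above is saved as src/main/resources/application.conf so it is found on the classpath; the Symbol passed to DBs.setup must match the <dbName> segment of the keys (schooldb here):

import scalikejdbc.config.DBs

object ConfSetupSketch {
  def main(args: Array[String]): Unit = {
    DBs.setup(Symbol("schooldb"))   // reads every db.schooldb.* key from the .conf file
    // ... run queries through NamedDB(Symbol("schooldb")) ...
    DBs.close(Symbol("schooldb"))   // release the connection pool when finished
  }
}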
import scalikejdbc.config.DBs
import scalikejdbc.{NamedDB, SQL}

object JdbcTest {
  def main(args: Array[String]): Unit = {
    // Load the configuration: automatically parses every db.schooldb.* entry in the *.conf file
    // (the Symbol name must match the <dbName> segment of the conf keys)
    val db: Symbol = Symbol("schooldb")
    DBs.setup(db)

    /**
     * readOnly:   queries
     * autoCommit: non-transactional insert/update/delete
     * localTx:    transactional operations
     */

    // Query helper
    def select(where: String = "") = {
      val builder = new StringBuilder("select * from studentinfo")
      if (where.trim.length > 0) {
        builder.append(" ").append(where.trim)
      }
      NamedDB(db).readOnly {
        // SQL()   parses the SQL statement
        // bind()  supplies values for the ? placeholders
        // map()   runs the query and processes it row by row
        // list()  collects the map() results into a List
        // apply() extracts the execution result
        implicit session => SQL(builder.toString).map(
          // After the query runs, each Row is extracted in turn; a Row is one record of the table
          row => (
            row.string("StuId"),
            row.string("StuName"),
            row.string("StuAge"),
            row.string("StuSex")
          )
        ).list().apply()
      }
    }

    // Single-row insert
    def add(TeacherName: String) = {
      NamedDB(db).autoCommit {
        implicit session => SQL("insert into teacherinfo (TeacherName) values (?)")
          .bind(TeacherName).update().apply()
      }
    }

    // Delete
    def delete(where: String) = {
      val builder = new StringBuilder("delete from teacherinfo where ")
      builder.append(where)
      NamedDB(db).autoCommit {
        implicit session => SQL(builder.toString()).update().apply()
      }
    }

    // Update
    def update(TeacherName: String, TeacherId: Int) = {
      val builder = new StringBuilder("update teacherinfo set TeacherName = ? ")
      builder.append("where TeacherId = ").append(TeacherId)
      println(builder.toString())
      NamedDB(db).autoCommit {
        implicit session => SQL(builder.toString()).bind(TeacherName).update().apply()
      }
    }

    // Batch insert; localTx: transactional
    def batchInsert(arr: Array[String]) = {
      NamedDB(db).localTx {
        implicit session => {
          var affectedRows = 0
          arr.foreach(x => {
            // SQL()    parses the SQL statement
            // bind()   supplies values for the ? placeholders
            // update() runs the insert/update/delete
            // apply()  extracts the execution result (affected row count)
            affectedRows = affectedRows + SQL("insert into teacherinfo(TeacherName) values(?)")
              .bind(x).update().apply()
          })
          affectedRows
        }
      }
    }

    println(update("小欣", 3))
    //println(delete("TeacherId = 6"))
    //println(add("小花"))
    //println(batchInsert(Array("小明", "小红")))
    //select().foreach(println)
  }
}
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
♥♥♥Note:
In PRO.setProperty("user", "root") the key is 【user】, not userName (an option-based sketch follows the program below).
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object Task01sql {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01")
      .getOrCreate()

    val URL = "jdbc:mysql://192.168.64.180:3306/school?useSSL=false"
    val TABLE = "teacherinfo"
    val PRO = new Properties()
    PRO.setProperty("driver", "com.mysql.jdbc.Driver")
    PRO.setProperty("user", "root")
    PRO.setProperty("password", "******")

    val frame: DataFrame = spark.read.jdbc(URL, TABLE, PRO)
    frame.createTempView("tv_cmm_style")
    spark.sql(
      """
        |select * from tv_cmm_style
        |""".stripMargin)
      //.printSchema() // inspect the schema
      //.show()        // inspect the data (at most 20 rows)
      .select("*")
      .toJavaRDD
      .saveAsTextFile("")
    spark.close()
  }
}
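For reference, a minimal sketch of the same read expressed with the option-based DataFrameReader API (connection details are the same placeholder values as above); the credential key is likewise user, not userName:

import org.apache.spark.sql.{DataFrame, SparkSession}

object Task01sqlOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01_options")
      .getOrCreate()

    // spark.read.format("jdbc").option(...) is equivalent to passing a Properties object
    val frame: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.64.180:3306/school?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "teacherinfo")
      .option("user", "root")
      .option("password", "******")
      .load()

    frame.show()
    spark.close()
  }
}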
♥♥♥Note:
Using the column operators requires two imports:
import org.apache.spark.sql.functions._   // SQL functions (aggregate functions, window functions, the window clause)
import spark.implicits._                   // SparkSession implicit conversions (this import depends on the instance, so it must come after the SparkSession is created)
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
// SQL functions (aggregate functions, window functions, the window clause)
import org.apache.spark.sql.functions._

object Task01sql {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01")
      .getOrCreate()
    // SparkSession implicit conversions (depends on the instance, so import after creating the SparkSession)
    import spark.implicits._

    val URL = "jdbc:mysql://192.168.64.180:3306/school?useSSL=false"
    val TABLE = "teacherinfo"
    val PRO = new Properties()
    PRO.setProperty("driver", "com.mysql.jdbc.Driver")
    PRO.setProperty("user", "root")
    PRO.setProperty("password", "********")

    val frame: DataFrame = spark.read.jdbc(URL, TABLE, PRO)
    frame.select($"TeacherId",
        concat_ws(",", $"TeacherName", $"TeacherId").as("name_id")) // $ turns the name into a Column so the functions can be used
      .where($"TeacherId".between(2, 4))
      .show()
    spark.close()
  }
}
Basic format and operators
frame.select($"field",
             $"field".as("alias"),
             $"field".cast("type (Scala | SQL)") ...)   // $ turns the name into a Column so the functions can be used
     .where(condition)                                  // simple conditions:
         $"field".equalTo|gt|lt|leq|geq|notEqual(value)
         $"field".between(min, max)
         $"field".like(pattern)
         $"field".isNull
         $"field".isNotNull
         $"field".isNaN                                 // not a number?
         $"field".isin(list: Any*)
         $"field".isInCollection(list: Iterable[_])
         $"field" === value
         // logical operators: AND (...).and(...), OR (...).or(...), NOT not(...)
     .groupBy(...)
     .agg(sql_func)                                     // e.g. sum($"field").as("alias") ...
     .sort($"field".desc | .asc ...)
     .limit(num: Int)
     .show()
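A minimal runnable sketch that chains several of these operators on a small in-memory DataFrame (the table and column names here are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object OperatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("operator_demo")
      .getOrCreate()
    import spark.implicits._

    // hypothetical in-memory data standing in for a real table
    val frame = Seq(
      (1, "Tom", 21, 88.5),
      (2, "Lily", 23, 92.0),
      (3, "Jack", 22, 75.0),
      (4, "Rose", 24, 66.0)
    ).toDF("id", "name", "age", "score")

    frame.select($"id".cast("string").as("id_str"), $"name", $"age", $"score")
      .where($"age".between(21, 23).and($"score".geq(70)))   // condition plus logical operator
      .groupBy($"age")
      .agg(sum($"score").as("sumScore"), count($"name").as("cnt"))
      .sort($"sumScore".desc)
      .limit(10)
      .show()

    spark.close()
  }
}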
Data sources can come from databases such as MySQL and Hive, or from files, JSON, SequenceFiles, and so on.
You can also work with the data by creating a DataFrame from a 【case class + RDD】 (a createDataset sketch follows the sample program below):
val value: DataFrame = spark.createDataFrame(rdd)
val value: Dataset[Record] = spark.createDataset(rdd)
import org.apache.spark.sql.{DataFrame, SparkSession}
// SQL functions (aggregate functions, window functions, the window clause)
import org.apache.spark.sql.functions._

object Task01sql {
  case class Record(shopId: String, date: String, volume: String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01")
      .getOrCreate()
    // SparkSession implicit conversions
    import spark.implicits._
    // a SparkSession wraps a SparkContext
    val sc = spark.sparkContext

    val rdd = sc.textFile("file:///D:\\Note\\SPARK\\cha01\\file\\sales5.txt", 5)
      .mapPartitions(p => {
        p.map(line => {
          val ps = line.split(",")
          Record(ps(0), ps(1), ps(2))
        })
      })

    val rx = "(.*?)-(.*?)-(.*?) .*"
    spark.createDataFrame(rdd)
      .select($"shopId".cast("int"),
        regexp_extract($"date", rx, 1).as("year"),
        regexp_extract($"date", rx, 2).as("month"),
        regexp_extract($"date", rx, 3).as("day"),
        $"volume")
      // daily aggregation
      .groupBy($"shopId", $"year", $"month", $"day")
      .agg(sum($"volume").as("sumVolume"),
        count($"volume").as("cntVolume"))
      // monthly aggregation
      .groupBy($"shopId", $"year", $"month")
      .agg(round(sum($"sumVolume"), 2).as("sumVolume"),
        sum($"cntVolume").as("cntVolume"))
      .filter($"sumVolume".geq(100000))
      .sort($"sumVolume".desc, $"cntVolume".asc)
      .limit(10)
      .show()

    sc.stop()
    spark.close()
  }
}
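The createDataset variant mentioned before the sample program is nearly identical; a minimal sketch under the same assumptions (same Record case class and hypothetical file path), with the required Encoder coming from spark.implicits._:

import org.apache.spark.sql.{Dataset, SparkSession}

object Task01dataset {
  case class Record(shopId: String, date: String, volume: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("spark_sql_ds")
      .getOrCreate()
    import spark.implicits._   // provides the Encoder[Record] that createDataset needs
    val sc = spark.sparkContext

    val rdd = sc.textFile("file:///D:\\Note\\SPARK\\cha01\\file\\sales5.txt", 5)
      .map(line => {
        val ps = line.split(",")
        Record(ps(0), ps(1), ps(2))
      })

    // A Dataset[Record] keeps the case-class type, so typed transformations work alongside the column API
    val ds: Dataset[Record] = spark.createDataset(rdd)
    ds.filter(r => r.volume.nonEmpty)
      .show()

    spark.close()
  }
}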