
Using MySQL from Scala and Spark

左丘成天
2023-12-01

Using MySQL from Scala

1. JAR dependencies

	<!-- MySQL connector -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>
    <!-- Scala JDBC (ScalikeJDBC) -->
    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc_2.12</artifactId>
      <version>3.3.2</version>
    </dependency>
    <!-- Automatically parses *.conf configuration files -->
    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc-config_2.12</artifactId>
      <version>3.3.2</version>
    </dependency>

2. Create the configuration

♥♥♥ Note:
1. The configuration must be in a .conf file, otherwise ScalikeJDBC cannot parse it automatically.
2. The key format is db.<database name>.driver, …
3. Unlike the properties used for a plain Java JDBC connection, the values after the equals sign are wrapped in double quotes.

db.schooldb.driver="com.mysql.jdbc.Driver"
db.schooldb.url="jdbc:mysql://localhost:3306/schooldb?useSSL=false&characterEncoding=utf-8"
db.schooldb.username="root"
db.schooldb.password="chenwei0302"
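A minimal loading sketch, assuming the settings above are saved as src/main/resources/application.conf (the default file read by scalikejdbc-config):

import scalikejdbc.config.DBs

object ConfigCheck {
  def main(args: Array[String]): Unit = {
    //Load only the db.schooldb.* entries; the Symbol must match the name used in the .conf file
    DBs.setup(Symbol("schooldb"))
    //To load every db.<name>.* block in the file at once, use:
    //DBs.setupAll()
  }
}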

3. Connect to MySQL and run insert/update/delete operations

import scalikejdbc.config.DBs
import scalikejdbc.{NamedDB, SQL}

object JdbcTest {
  def main(args: Array[String]): Unit = {
    //Load the configuration: automatically parses all db.schooldb.* entries from the *.conf file
    val db: Symbol = Symbol.apply("schooldb")
    DBs.setup(db)


    /**
     * readOnly: queries
     * autoCommit: non-transactional insert/update/delete
     * localTx: transactional operations
     */

    //Query helper
    def select(where: String = "") = {
      val builder = new StringBuilder("select * from studentinfo")
      if (where.trim.length > 0) {
        builder.append(" ").append(where)
      }
      NamedDB(db).readOnly{
        //SQL() parses the sql statement
        //bind() supplies values for the ? placeholders
        //map() runs the query and processes it row by row
        //list() collects the map() results into a list
        //apply() executes and extracts the result
        implicit session => SQL(builder.toString).map(
          //After the query runs, each Row is extracted; a Row is one record of the table
          row => (
            row.string("StuId"),
            row.string("StuName"),
            row.string("StuAge"),
            row.string("StuSex")
          )
        ).list().apply()
      }
    }
    //Single-row insert
    def add(TeacherName: String) = {
      NamedDB(db).autoCommit {
        implicit session => SQL("insert into teacherinfo (TeacherName) values (?)")
            .bind(TeacherName).update().apply()
      }
    }

    //Delete helper
    def delete(where: String) = {
      val builder = new StringBuilder("delete from teacherinfo where ")
      builder.append(where)
      NamedDB(db).autoCommit {
        implicit session => SQL(builder.toString()).update().apply()
      }
    }

    //Update helper
    def update(TeacherName: String, TeacherId: Int) = {
      val builder = new StringBuilder("update teacherinfo set TeacherName = ? ")
      val where = "where TeacherId = "
      builder.append(where)
      builder.append(TeacherId)
      println(builder.toString())
      NamedDB(db).autoCommit {
        implicit session => SQL(builder.toString()).bind(TeacherName).update().apply()
      }
    }


    //Batch insert; localTx runs everything in one transaction
    def batchInsert(arr: Array[String]) = {
      NamedDB(db).localTx {
        implicit session => {
          var affectedRows = 0
          arr.foreach(x => {
            //SQL() parses the sql statement
            //bind() supplies values for the ? placeholders
            //update() runs the insert/update/delete
            //apply() executes and returns the affected-row count
            affectedRows = affectedRows + SQL("insert into teacherinfo(TeacherName) values(?)")
              .bind(x).update().apply()
          })
          affectedRows
        }
      }
    }

    println(update("小欣", 3))
    //println(delete("TeacherId = 6"))
    // println(add("小花"))
    //println(batchInsert(Array("小明", "小红")))
    // select().foreach(println)


  }
}
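The example above sets up the connection pool but never releases it. A small follow-up sketch (for the same schooldb configuration) closes the pool when the program is done:

import scalikejdbc.config.DBs

//Release the pool created by DBs.setup(Symbol("schooldb")), e.g. at the end of main()
DBs.close(Symbol("schooldb"))
//Or, if DBs.setupAll() was used to load every db.* block:
//DBs.closeAll()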

Using MySQL from Spark

1. JAR dependencies

	<dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.48</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.30</version>
    </dependency>

2. Connecting to and operating on MySQL

2.1 Operating on the database with SQL statements

♥♥♥ Note:
In PRO.setProperty("user", "root"), the property key is "user", not "userName".

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object Task01sql {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01")
      .getOrCreate()

    val URL = "jdbc:mysql://192.168.64.180:3306/school?useSSL=false"
    val TABLE = "teacherinfo"
    val PRO = new Properties()
    PRO.setProperty("driver","com.mysql.jdbc.Driver")
    PRO.setProperty("user","root")
    PRO.setProperty("password","******")

    val frame: DataFrame = spark.read.jdbc(URL, TABLE, PRO)
    frame.createTempView("tv_cmm_style")

    spark.sql(
      """
        |select * from tv_cmm_style
        |""".stripMargin)
        //.printSchema()  //print the table schema
        //.show()         //show the data (at most 20 rows by default)
        .select("*")
        .toJavaRDD
        .saveAsTextFile("")  //fill in an output path here

    spark.close()

  }
}
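Writing a DataFrame back to MySQL uses the same connection properties. A minimal sketch, reusing frame, URL, and PRO from the example above, with a hypothetical target table teacherinfo_copy:

import org.apache.spark.sql.SaveMode

//Append the rows of `frame` to a MySQL table through the same JDBC properties.
//"teacherinfo_copy" is a hypothetical table name used only for illustration.
frame.write
  .mode(SaveMode.Append)   //or SaveMode.Overwrite to replace the table contents
  .jdbc(URL, "teacherinfo_copy", PRO)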
2.2 Operating on the database with Dataset/DataFrame SQL operators

♥♥♥ Note:
Using the operators requires two imports:
import org.apache.spark.sql.functions._ for the SQL functions (e.g. aggregate functions, window functions, the window clause)
import spark.implicits._ for the SparkSession's implicit conversions (this import depends on the instance, so it must come after the SparkSession has been created)

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
//SQL functions (e.g. aggregate functions, window functions, the window clause)
import org.apache.spark.sql.functions._

object Task01sql {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01")
      .getOrCreate()
    //SparkSession implicit conversions (must be imported after the SparkSession instance is created)
    import spark.implicits._

    val URL = "jdbc:mysql://192.168.64.180:3306/school?useSSL=false"
    val TABLE = "teacherinfo"
    val PRO = new Properties()
    PRO.setProperty("driver","com.mysql.jdbc.Driver")
    PRO.setProperty("user","root")
    PRO.setProperty("password","********")

    val frame: DataFrame = spark.read.jdbc(URL, TABLE, PRO)
  
    frame.select($"TeacherId",
    concat_ws(",",$"TeacherName",$"TeacherId").as("name_id")) //写$是为了使用函数
      .where($"TeacherId".between(2,4))
      .show()*/
    spark.close()

  }
}

Basic pattern and operators

 frame.select($"field",
              $"field".as("alias"),
              $"field".cast("type"), ...)    //$ turns a column name into a Column so functions and operators can be used
      .where(condition)                      //simple conditions:
                                             //  $"field".equalTo|gt|lt|leq|geq|notEqual(value)
                                             //  $"field".between(min, max)
                                             //  $"field".like(pattern)
                                             //  $"field".isNull
                                             //  $"field".isNotNull
                                             //  $"field".isNaN             //not a number
                                             //  $"field".isin(list: Any*)
                                             //  $"field".isInCollection(values: Iterable[_])
                                             //  $"field" === value
                                             //logical operators: (...).and(...), (...).or(...), not(...)
      .groupBy(...)
      .agg(sql_func)                         //e.g. sum($"field").as("alias"), ...
      .sort($"field".desc, $"field".asc, ...)
      .limit(num: Int)
      .show()
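As a concrete sketch of the pattern above (assuming the teacherinfo DataFrame frame and the imports from section 2.2):

frame.select($"TeacherId".cast("int"), $"TeacherName")
  .where($"TeacherName".isNotNull.and($"TeacherId".between(1, 10)))  //simple condition plus logical and
  .groupBy($"TeacherName")
  .agg(count($"TeacherId").as("cnt"))  //aggregate function from org.apache.spark.sql.functions._
  .sort($"cnt".desc)
  .limit(5)
  .show()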

3. Spark extensions

Data sources are not limited to databases such as MySQL or Hive; they can also be files, JSON, sequence files, and so on.
You can also create a DataFrame (or Dataset) from an RDD by way of a case class, and operate on the data that way:

val value: DataFrame = spark.createDataFrame(rdd)
val value: Dataset[Record] = spark.createDataset(rdd)
import org.apache.spark.sql.{DataFrame, SparkSession}
//SQL functions (e.g. aggregate functions, window functions, the window clause)
import org.apache.spark.sql.functions._

object Task01sql {

  case class Record(shopId:String,date:String,volume:String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("spark_sql_01")
      .getOrCreate()
    //SparkSession implicit conversions
    import spark.implicits._
    
    //A SparkSession wraps a SparkContext
    val sc = spark.sparkContext
    val rdd = sc.textFile("file:///D:\\Note\\SPARK\\cha01\\file\\sales5.txt", 5)
      .mapPartitions(p => {
        p.map(line => {
          val ps = line.split(",")
          Record(ps(0), ps(1), ps(2))
        })
      })

    val rx = "(.*?)-(.*?)-(.*?) .*"
    spark.createDataFrame(rdd)
      .select($"shopId".cast("int"),
        regexp_extract($"date",rx,1).as("year"),
        regexp_extract($"date",rx,2).as("month"),
        regexp_extract($"date",rx,3).as("day"),
        $"volume")
      //daily aggregation
      .groupBy($"shopId",$"year",$"month",$"day")
      .agg(sum($"volume").as("sumVolume")
        ,count($"volume").as("cntVolume"))
      //monthly aggregation
      .groupBy($"shopId",$"year",$"month")
      .agg(round(sum($"sumVolume"),2).as("sumVolume")
        ,sum($"cntVolume").as("cntVolume"))
      .filter($"sumVolume".geq(100000))
      .sort($"sumVolume".desc,$"cntVolume".asc)
      .limit(10)
      .show()
      
    sc.stop()
    spark.close()

  }
}
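The same operator chain also works when the source is a file instead of MySQL, as mentioned above. A sketch with hypothetical local paths, reusing the spark session from the example:

//Hypothetical paths; the JSON and CSV readers are built into spark.read
val jsonDf = spark.read.json("file:///D:/data/sales.json")

val csvDf = spark.read
  .option("header", "true")        //first line contains column names
  .option("inferSchema", "true")   //let Spark infer column types
  .csv("file:///D:/data/sales.csv")

jsonDf.printSchema()
csvDf.show()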
