I previously built a Spark-driving computation framework with Jetty and RESTful services. Recently I wanted to redo it with Spring Boot + Scala + Spark, so below are the pom.xml and the code, shared here.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.i-tudou.bd</groupId>
  <artifactId>spring-spark-demo</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>A Camel Scala Route</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <spark.version>2.4.0</spark.version>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spark core; exclude log4j/slf4j-log4j12 so they do not clash with Spring Boot's logback -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- janino is used by Spark SQL's code generation -->
    <dependency>
      <groupId>org.codehaus.janino</groupId>
      <artifactId>janino</artifactId>
      <version>3.0.8</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>
</project>
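Note: the pom above stops at the dependencies and has no <build> section, so a plain mvn package will neither compile the Scala sources nor produce an executable Boot jar. A minimal sketch of what is needed (the scala-maven-plugin version here is an assumption, use whatever is current):
<build>
  <plugins>
    <!-- compile src/main/scala with the Scala compiler (plugin version is an assumption) -->
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.4.6</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <!-- package the app as an executable Spring Boot jar; version comes from the Boot parent -->
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
  </plugins>
</build>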
application.scala
package com.itudou.bd
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication

// @SpringBootApplication already combines @Configuration, @EnableAutoConfiguration
// and @ComponentScan, so the three separate annotations are not needed
@SpringBootApplication
class Config

object springsparkdemoApplication extends App {
  // forward the command-line arguments provided by the App trait
  SpringApplication.run(classOf[Config], args: _*)
}
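SpringApplication.run returns a ConfigurableApplicationContext, so for a quick smoke test of the Spark wiring you can pull the SparkContext bean out of the context right after startup and run a trivial job. This is only a sketch on top of the classes above (the object name is made up):
import com.itudou.bd.Config
import org.apache.spark.SparkContext
import org.springframework.boot.SpringApplication

object SpringSparkSmokeTest extends App {
  // start the Boot application and keep a handle on the context
  val ctx = SpringApplication.run(classOf[Config], args: _*)
  // fetch the SparkContext bean declared in Sparkconfig
  val sc = ctx.getBean(classOf[SparkContext])
  // trivial local job to prove the bean is usable
  println("sum 1..100 = " + sc.parallelize(1 to 100).reduce(_ + _))
}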
sparkconfig.scala
package com.itudou.bd.config
import org.apache.spark.{SparkConf, SparkContext}
import org.springframework.context.annotation.{Bean, Configuration}
@Configuration
class Sparkconfig {

  private val sparkHome = "." // currently unused; only needed when pointing at a local Spark install
  private val appName = "sparkTest"
  // "local" runs everything in a single thread; use "local[*]" for all cores,
  // or a real master URL (spark://..., yarn) when not running locally
  private val master = "local"

  @Bean
  def sparkConf: SparkConf =
    new SparkConf().setAppName(appName).setMaster(master)

  @Bean
  def sparkContext: SparkContext = new SparkContext(sparkConf)
}
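On Spark 2.x, SparkSession supersedes SQLContext, so instead of constructing a SQLContext inside every controller you could also expose the session itself as a bean and inject it where needed. A minimal sketch, building on the SparkContext bean above (the class and method names here are mine, not part of the original code):
package com.itudou.bd.config

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.springframework.context.annotation.{Bean, Configuration}

@Configuration
class SparksessionConfig {
  // build the session on top of the SparkContext bean from Sparkconfig;
  // getOrCreate reuses the already-running context instead of starting a new one
  @Bean
  def sparkSession(sc: SparkContext): SparkSession =
    SparkSession.builder().config(sc.getConf).getOrCreate()
}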
DataController.scala
package com.itudou.bd.Controller
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation._

import scala.util.parsing.json.JSON

@RestController
@RequestMapping(value = Array("DataController/data/"))
@CrossOrigin
class DataController {

  @Autowired
  var sc: SparkContext = _

  @GetMapping(value = Array("test"))
  def test = {
    // credentials travel in the JDBC URL, so the Properties object can stay empty
    val url = "jdbc:mysql://10.1.3.49:3309/tdgistaskDB?useUnicode=true&characterEncoding=UTF-8&user=root&password=123"
    val prop = new Properties()
    // SQLContext still works on Spark 2.4, although SparkSession is the preferred entry point
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.jdbc(url, "t_task", prop)
    df.createOrReplaceTempView("t_task")
    // query the registered view with SQL
    val df1 = sqlContext.sql("select * from t_task where parent_id = 0")
    // show() returns Unit, so print the table on its own rather than concatenating it
    df1.show()
    //println("1.------------->" + df1.rdd.partitions.size)
    // parseFull needs valid JSON (quoted keys) and returns an Option[Any]
    JSON.parseFull("""{"lige":1}""")
  }
}
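As written, test returns whatever Option JSON.parseFull produces, which is not very useful to the caller. If the endpoint should hand back the query result itself, one option is to serialize the DataFrame rows with toJSON. Below is a hypothetical extra handler that could sit next to test inside DataController (reusing the same url and sc):
  // hypothetical variant: return the matching rows as a JSON array string
  @GetMapping(value = Array("testJson"))
  def testJson: String = {
    val url = "jdbc:mysql://10.1.3.49:3309/tdgistaskDB?useUnicode=true&characterEncoding=UTF-8&user=root&password=123"
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.jdbc(url, "t_task", new Properties())
    // Dataset.toJSON emits one JSON document per row; collect and join them into an array
    df.filter("parent_id = 0").toJSON.collect().mkString("[", ",", "]")
  }
With the default server port, both handlers end up under http://localhost:8080/DataController/data/, e.g. GET http://localhost:8080/DataController/data/test.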