[Deployment] Installing and Using Spark-jobserver
公冶元青
2023-12-01
Spark-jobserver provides a RESTful interface for submitting and managing Spark jobs, jars, and job contexts. The repository contains the complete Spark job server project, including unit tests and deployment scripts.
Features
"Spark as a Service": REST-style APIs for managing every aspect of jobs and contexts
Supports SparkSQL, Hive, and Streaming contexts/jobs, as well as custom job contexts! See Contexts for details
Supports LDAP authentication via Apache Shiro integration
Sub-second, low-latency jobs via long-running job contexts
A running job can be stopped by killing its context
Separating the jar-upload step speeds up job startup
Asynchronous and synchronous job APIs; the synchronous API works well for low-latency jobs
Supports Spark Standalone, Mesos, and YARN
Job and jar metadata is persisted through a pluggable DAO interface
RDDs and DataFrames can be named and cached, then retrieved by name, which makes it easy to share and reuse them across jobs
Supports Scala 2.10 and 2.11
Official documentation: https://github.com/spark-jobserver/spark-jobserver#api
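To make the synchronous vs. asynchronous job APIs above concrete, here is a minimal sketch, assuming the example jar has already been uploaded under the name test (this is done in the test section later in this post):
# synchronous: blocks until the job finishes and returns the result in the response
curl -d "input.string = a b c a b" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true'
# asynchronous (default): returns a jobId immediately; poll GET /jobs/<jobId> for status and result
curl -d "input.string = a b c a b" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'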
------------------------------------------ Installing and starting the job server:
(1)
The job server is built with sbt, so install sbt first.
Installing sbt on Ubuntu:
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt
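To sanity-check the installation (optional; the first run downloads sbt's own dependencies, so it may take a while):
sbt sbtVersion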
(2)
Deployment (Spark on YARN, or local mode).
Clone (or download and unpack) the spark-jobserver source from https://github.com/spark-jobserver/spark-jobserver and cd into the resulting directory:
1. Copy config/local.sh.template to local.sh, then edit the following properties:
--------------------------------------------------
DEPLOY_HOSTS="biyuzhe"
APP_USER=raini
APP_GROUP=raini
JMX_PORT=9999
# optional SSH Key to login to deploy server
#SSH_KEY=/path/to/keyfile.pem
INSTALL_DIR=/home/raini/spark/spark-jobserver
LOG_DIR=/home/raini/spark/spark-jobserver/log
PIDFILE=spark-jobserver.pid
JOBSERVER_MEMORY=1G
SPARK_VERSION=1.6.3
MAX_DIRECT_MEMORY=1G
SPARK_HOME=/home/raini/spark
SPARK_CONF_DIR=$SPARK_HOME/conf
# Only needed for Mesos deploys
SPARK_EXECUTOR_URI=/home/raini/spark/spark-1.6.3.tar.gz
# Only needed for YARN running outside of the cluster
# You will need to COPY these files from your cluster to the remote machine
# Normally these are kept on the cluster in /etc/hadoop/conf
# YARN_CONF_DIR=/pathToRemoteConf/conf
# HADOOP_CONF_DIR=/pathToRemoteConf/conf
#
# Also optional: extra JVM args for spark-submit
# export SPARK_SUBMIT_OPTS+="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5433"
SCALA_VERSION=2.10.6 # or 2.11.6
-------------------------------------------------
2. Copy config/shiro.ini.template to shiro.ini.
Note: this step is only needed when authentication = on. (There was no shiro.ini.template in my checkout.)
3. Copy config/local.conf.template to <environment>.conf.
Note: here <environment> is local, i.e. local.conf, which runs Spark in local mode; it could also be yarn. The environment conf holds the web UI settings and related environment variables.
4. Run bin/server_package.sh local. This packages the job server together with its configuration files and pushes everything to the remote server configured above.
5. In the deployment directory on the remote server, start the service with server_start.sh; stop it with server_stop.sh.
(Note: by default, the user who runs server_start.sh is the user under which spark-jobserver submits jobs to YARN.)
Error encountered:
raini@biyuzhe:~/spark/spark-jobserver$ bin/server_start.sh
ls: cannot access '/home/raini/spark/spark-jobserver/bin/*.conf': No such file or directory
No configuration file found
Fix:
6. Copy config/local.conf into INSTALL_DIR (here, the ~/spark/spark-jobserver install directory), keeping the name local.conf (even though the name is the same, this step is still required; otherwise the file-not-found error persists), and edit its master setting and the storage paths shown below.
(1)
# not in the template; added manually
jar-store-rootdir = /home/raini/spark/jars
(2)
filedao {
  rootdir = /home/raini/spark/jars/filedao/data
}
datadao {
  rootdir = /home/raini/spark/jars/upload
}
sqldao {
  rootdir = /home/raini/spark/jars/sqldao/data
}
jar-store-rootdir = /var/lib/spark/jars
rootdir = /var/lib/spark/filedao
7. Check whether spark-job-server is running: ps -ef | grep spark-job-server
raini 10762 10761 6 10:43 pts/20 00:00:22 /home/raini/app/jdk/bin/java ...
raini 10338 7178 0 10:26 pts/20 00:00:00 grep --color=auto spark-job-server
8. Open the web UI: http://localhost:8090/
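Besides the web UI, the REST endpoints themselves make a quick health check; a minimal sketch using the standard listing endpoints (both appear again in the API sections below):
# list active contexts
curl localhost:8090/contexts
# list uploaded jars
curl localhost:8090/jars/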
------------------------------------------------------- Usage tests
curl -X POST "http://localhost:8090/contexts/test-context2"
curl --data-binary @/home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar http://localhost:8090/jars/test-job2
curl -d "file:/home/raini/nfs3.err" 'localhost:8090/jobs?appName=test-job2&classPath=SparkJob.WordCount&context=test-context2&sync=true'
spark-submit --class SparkJob.WordCount --master local[1] /home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar file:///home/raini/文档/wordcount.scala
When spark-jobserver starts, it keeps a long-running SparkSubmit process; creating a context drives that process, so jobs do not have to be submitted manually with spark-submit.
(1) Create a context (named test-context)
raini@biyuzhe:~/spark/spark-jobserver$ curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=2&memory-per-node=1G'
{
"status": "SUCCESS",
"result": "Context initialized"
}
(Note: the POST above can also be issued against http://localhost:8090/contexts/test-context, e.g. via Postman.)
(2) Upload the jar to the job server
raini@biyuzhe:~/spark/spark-jobserver$ curl --data-binary @/home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar localhost:8090/jars/sparkTest
{
"status": "SUCCESS",
"result": "Jar uploaded"
}
(Note: in localhost:8090/jars/appName, appName is the alias under which the jar is registered.) Build the jar with: raini@biyuzhe:~/IdeaProjects/SparkJobServer$ mvn clean package
(3) Invoke the jar (the appName query parameter is the name the jar was uploaded under, not the appName set inside the program)
curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=SparkJobServer.WordCount&context=test-context&sync=true'
curl -d "" 'localhost:8090/jobs?appName=WordCount&classPath=SparkJob.WordCount&context=test-context&sync=true'
{
"result": {
"a": 2,
"b": 2,
"c": 1,
"see": 1
}
}
spark-submit --class SparkJob.WordCount --master local[1] /home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar
----------------------------------------------------------- Testing with the examples bundled with the job server:
(1)
To test job execution, we use the job-server-tests package that ships with the project.
raini@biyuzhe:~/spark/spark-jobserver$ sbt job-server-tests/package
[info] Packaging /home/raini/app/spark-1.6.3-bin-hadoop2.6/spark-jobserver/job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT.jar ...
[success] Total time: 20 s, completed 2017-3-6 14:22:13
(2)
After the build finishes, upload the packaged jar via the REST interface.
The relevant REST endpoints for jobs are:
GET /jobs - list all jobs
POST /jobs - submit a new job
GET /jobs/<jobId> - query the result and status of a job
GET /jobs/<jobId>/config - query the configuration a job was submitted with
$ curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT.jar localhost:8090/jars/test
{
"status": "SUCCESS",
"result": "Jar uploaded"
}
(3)
# list the uploaded jars
SHELL$ curl localhost:8090/jars/
{
"test": "2017-03-06T14:26:26.818+08:00",
"test-job2": "2017-03-06T14:06:40.977+08:00",
"sparkTest": "2017-03-06T13:10:12.281+08:00"
}
(4)
# submit a job
Submit with appName=test and class spark.jobserver.WordCountExample:
SHELL$ curl -d "input.string = hello job server" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
{
"duration": "0.703 secs",
"classPath": "spark.jobserver.WordCountExample",
"startTime": "2017-03-06T14:33:34.250+08:00",
"context": "f02e5922-spark.jobserver.WordCountExample", (Web端查看)
"result": {
"job": 1,
"hello": 1,
"server": 1
},
"status": "FINISHED",
"jobId": "14be72c5-b47d-4023-9af6-a8f4497bd21a"
}
OK
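The jobId returned above can be fed back into the GET endpoints listed under (2) to re-check the job later; a minimal sketch (substitute your own jobId):
# job status and result
curl localhost:8090/jobs/14be72c5-b47d-4023-9af6-a8f4497bd21a
# the configuration the job was submitted with
curl localhost:8090/jobs/14be72c5-b47d-4023-9af6-a8f4497bd21a/config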
-------------------------------------------------------- Code changes required to use the job server:
To create a job-server project, follow the steps below.
1. Add the dependencies: include the jobserver and spark-core packages in build.sbt:
resolvers += "Job Server Bintray" at "http://dl.bintray.com/spark-jobserver/maven"
libraryDependencies += "spark.jobserver" % "job-server-api" % "0.4.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"
With Maven, the equivalent is (the version and provided scope mirror the sbt lines above):
<dependencies>
  <dependency>
    <groupId>spark.jobserver</groupId>
    <artifactId>job-server-api</artifactId>
    <version>0.4.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
</dependencies>
2. A job submitted through the job server must implement the SparkJob trait; this is what lets the job server reuse contexts. Two methods must be overridden:
object SampleJob extends SparkJob {
  override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
}
3. Example
package SparkJob

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValidation
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobInvalid

object WordCount extends SparkJob {
  // Allows running the job standalone, outside the job server
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  // Reject the job early if the required input.string parameter is missing
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  // Count words and return the most frequent one
  override def runJob(sc: SparkContext, config: Config): Any = {
    val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)
    val rsList = dd.map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).collect
    rsList(0)._2
  }
}
------------------------------------------------------------ The following are installation notes gathered from the web
Build the jar
mvn clean package -DskipTests -Pproduct
Upload the jar
curl --data-binary @target/recommendation-model-1.0-SNAPSHOT.jar http://bds-test-004:8090/jars/test
Run the job
curl -d "source=spark_test.wangxiang_fact_user_product_rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ALSRecommend'
curl -d "source=spark_test.movielens,save_to=d_0e276318a87ced54171884ed765e9962.t_8dfe9c53a6cae3d5356984f799f0d685,rec_num=6,columns=user;product;rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ColdStart'
Check job status
curl http://192.168.177.80:8090/jobs/adc26020-f44b-4552-97f7-687202e1497d
View the web UI
http://192.168.177.80:8090/
Contexts
Reference:
https://github.com/spark-jobserver/spark-jobserver#contexts
By default, every Spark job submitted without a context parameter causes the job server to create a new SparkContext, configured from yarn.conf.
Per the API, you can also create a named context up front and reference it when submitting jobs (examples of the less common verbs follow the POST example below):
GET /contexts - lists all current contexts
POST /contexts/<name> - creates a new context
DELETE /contexts/<name> - stops a context and all jobs running in it
PUT /contexts?reset=reboot - kills all contexts and re-loads only the contexts from config
>curl -X POST "http://bds-test-004:8090/contexts/test1"
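For completeness, a context can also be stopped, or all contexts reset from config; a minimal sketch using the test1 context just created (use with care: DELETE also kills any jobs running in that context):
>curl -X DELETE "http://bds-test-004:8090/contexts/test1"
>curl -X PUT "http://bds-test-004:8090/contexts?reset=reboot"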
Specify the context when submitting:
curl -d "source=spark_test.movielens,save_to=test.test123,rec_num=6,columns=user;product;rating" 'http://bds-test-004:8090/jobs?appName=baseAlgo&classPath=com.jd.jddp.dm.model.ALSRecommend&context=test-context2'
Result:
curl -d "source=spark_test.wangxiang_fact_user_product_rating,save_to=dtest.test234,rec_num=6,columns=userid;product_id;rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ColdStart'
{
"status": "STARTED",
"result": {
"jobId": "369670b9-ad45-454e-aeac-52e1265cf889",
"context": "feeaa22e-com.jd.jddp.dm.model.ColdStart"
}
}
----------------------------------------------- YARN configuration
Settings that need to be changed
settings.sh
Path settings
INSTALL_DIR=/export/App/job-server
SPARK_HOME=/export/App/spark-1.6.0-bin-hadoop2.6.1
yarn.conf
yarn.conf configures the Spark run mode, the job server itself, and the default SparkContext settings.
yarn.conf:
spark {
  # spark.master will be passed to each job's JobContext
  # master = "local[4]"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"
  master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  # job-number-cpus = 4

  jobserver {
    port = 8090
    jar-store-rootdir = /tmp/jobserver/jars
    context-per-jvm = true
    jobdao = spark.jobserver.io.JobFileDAO
    filedao {
      rootdir = /tmp/spark-job-server/filedao/data
    }
  }

  context-settings {
    num-cpu-cores = 2      # Number of cores to allocate. Required.
    memory-per-node = 2G   # Executor memory per node, -Xmx style eg 512m, #1G, etc.
    spark.executor.instances = 4
    spark.cassandra.connection.host = 192.168.177.79
    spark.cassandra.auth.username = test
    spark.cassandra.auth.password = test123
    spark.cleaner.ttl = 3600
    # in case spark distribution should be accessed from HDFS (as opposed to being installed on every mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"
    # uris of jars to be loaded into the classpath for this context. Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]
    # If you wish to pass any settings directly to the sparkConf as-is, add them here in passthrough,
    # such as hadoop connection settings that don't use the "spark." prefix
    passthrough {
      # es.nodes = "192.1.1.1"
    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults.
akka {
  remote.netty.tcp {
    # This controls the maximum message size, including job results, that can be sent
    # maximum-frame-size = 10 MiB
  }
}
Changes needed in the Spark job code:
See the test code in the spark-jobserver source (imports added here so the snippet is self-contained):
import com.typesafe.config.Config
import org.apache.spark._
import org.scalactic._
import scala.util.Try
import spark.jobserver.api.{SparkJob => NewSparkJob, _}

object WordCountExampleNewApi extends NewSparkJob {
  type JobData = Seq[String]
  type JobOutput = collection.Map[String, Long]

  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
    sc.parallelize(data).countByValue

  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
    JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
}
API
Reference: https://github.com/spark-jobserver/spark-jobserver#api
API
Jars
Contexts
Jobs
Data
Context configuration
Other configuration settings
Job Result Serialization
spark-jobserver is built with sbt.
sbt download: http://www.scala-sbt.org/download.html (Ubuntu users can install it directly with apt-get, as shown earlier).
For sbt usage, see the official docs: http://www.scala-sbt.org/0.13/docs/Hello.html
or:
https://github.com/CSUG/real_world_scala/blob/master/02_sbt.markdown
spark-jobserver is built on the Akka framework. REST API requests are handled in WebApi.scala; the route that serves job information starts like this:
def jobRoutes: Route =
  pathPrefix("jobs") {
    // GET /jobs/<jobId>
The class that retrieves job information is JobStatusActor.scala.
The build is defined in Build.scala.
To change the Scala version:
scalaVersion := sys.env.getOrElse("SCALA_VERSION", "2.11.8"),
In addition, the Scala version must be updated correspondingly in local.sh and the other shell scripts.
The Spark version is set in Versions.scala.
To compile and package, run:
./server_package.sh local
Note: after Spark is built with Scala 2.11, packaging may fail with an error about missing Akka packages.