
[Deployment] Installing and Using Spark-jobserver

公冶元青
2023-12-01


Spark-jobserver provides a RESTful interface for submitting and managing Spark jobs, jars, and job contexts. The repository contains the complete Spark job server, including unit tests and deployment scripts.


Features


"Spark as a Service": REST-style APIs for managing every aspect of jobs and contexts
Supports SparkSQL, Hive, and Streaming contexts/jobs as well as custom job contexts; see Contexts for details
LDAP authentication through the Apache Shiro integration
Sub-second, low-latency jobs via long-running job contexts
A running job can be stopped by killing its context
Separate jar-upload step for faster job startup
Asynchronous and synchronous job APIs; the synchronous API is well suited to low-latency jobs
Works with standalone Spark as well as Mesos and YARN
Job and jar metadata is persisted through a pluggable DAO interface
RDDs or DataFrames can be named and cached, then retrieved by name, which makes it easy to share and reuse them across jobs (see the sketch right after this list)
Supports Scala 2.10 and 2.11
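
A minimal sketch of the named-RDD feature mentioned above, assuming the NamedRddSupport trait and its namedRdds.getOrElseCreate helper from the job-server API (the object name and data are placeholders):

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{NamedRddSupport, SparkJob, SparkJobValid, SparkJobValidation}

// Sketch: cache an RDD under a name so later jobs in the same context
// can fetch it by that name instead of recomputing it.
object CachedWordsJob extends SparkJob with NamedRddSupport {
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {
    // getOrElseCreate builds and caches the RDD on first use, then reuses it by name
    val words = namedRdds.getOrElseCreate("shared-words", {
      sc.parallelize(Seq("a", "b", "c", "a")).cache()   // placeholder data
    })
    words.countByValue()
  }
}

A later job submitted to the same context can call namedRdds.get[String]("shared-words") to reuse the cached data.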


Official documentation: https://github.com/spark-jobserver/spark-jobserver#api




------------------------------------------ Installing and starting the job server:


(1)
The job server is built with sbt, so sbt must be installed first.
Installing sbt on Ubuntu:
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt




(2)
Deployment (Spark on YARN, or local mode).


Clone the spark-jobserver source from https://github.com/spark-jobserver/spark-jobserver (or download and extract it), then enter the project directory:


1. Copy config/local.sh.template to local.sh and edit the following properties:
--------------------------------------------------
DEPLOY_HOSTS="biyuzhe"
APP_USER=raini
APP_GROUP=raini
JMX_PORT=9999
# optional SSH Key to login to deploy server
#SSH_KEY=/path/to/keyfile.pem
INSTALL_DIR=/home/raini/spark/spark-jobserver
LOG_DIR=/home/raini/spark/spark-jobserver/log
PIDFILE=spark-jobserver.pid
JOBSERVER_MEMORY=1G
SPARK_VERSION=1.6.3
MAX_DIRECT_MEMORY=1G
SPARK_HOME=/home/raini/spark
SPARK_CONF_DIR=$SPARK_HOME/conf
# Only needed for Mesos deploys
SPARK_EXECUTOR_URI=/home/raini/spark/spark-1.6.3.tar.gz
# Only needed for YARN running outside of the cluster
# You will need to COPY these files from your cluster to the remote machine
# Normally these are kept on the cluster in /etc/hadoop/conf
# YARN_CONF_DIR=/pathToRemoteConf/conf
# HADOOP_CONF_DIR=/pathToRemoteConf/conf
#
# Also optional: extra JVM args for spark-submit
# export SPARK_SUBMIT_OPTS+="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5433"
SCALA_VERSION=2.10.6 # or 2.11.6
-------------------------------------------------




2. Copy config/shiro.ini.template to shiro.ini.
Note: this is only needed when authentication = on. (The version used here no longer ships a shiro.ini.template.)




3. Copy config/local.conf.template to <environment>.conf.
Note: here the environment is local, i.e. local.conf, which runs Spark in local mode; it can also be yarn. The <environment>.conf file contains the web UI port and the related environment settings.




4. Run bin/server_package.sh local. This step packages the job server together with its configuration files and pushes them to the remote server configured above.




5. On the remote server, start the service by running server_start.sh in the deployed directory; run server_stop.sh to stop it.
(Note: by default, the user who runs server_start.sh is the user as which spark-jobserver submits jobs to YARN.)


Error:
raini@biyuzhe:~/spark/spark-jobserver$ bin/server_start.sh 
ls: cannot access '/home/raini/spark/spark-jobserver/bin/*.conf': No such file or directory
No configuration file found


Fix:
6. Copy local.conf from the config directory into INSTALL_DIR (the ~/spark/spark-jobserver install directory), keeping the name local.conf
  (even though the name is the same, this copy is still required, otherwise the file-not-found error persists), and modify its master setting and the two paths shown below.


(1)
  # Not in the template by default; added manually
  jar-store-rootdir = /home/raini/spark/jars


(2)
    filedao {
      rootdir = /home/raini/spark/jars/filedao/data
    }
    datadao {
      rootdir = /home/raini/spark/jars/upload
    }
    sqldao {
      rootdir = /home/raini/spark/jars/sqldao/data
    }


jar-store-rootdir = /var/lib/spark/jars
rootdir = /var/lib/spark/filedao




7. Check whether spark-jobserver is running: ps -ef | grep spark-job-server
raini    10762 10761  6 10:43 pts/20   00:00:22 /home/raini/app/jdk/bin/java ...
raini    10338  7178  0 10:26 pts/20   00:00:00 grep --color=auto spark-job-server




8. View in the browser: http://localhost:8090/




------------------------------------------------------- Usage test
curl -X POST "http://localhost:8090/contexts/test-context2"
curl --data-binary @/home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar http://localhost:8090/jars/test-job2
curl -d "file:/home/raini/nfs3.err" 'localhost:8090/jobs?appName=test-job2&classPath=SparkJob.WordCount&context=test-context2&sync=true'


spark-submit --class SparkJob.WordCount --master local[1] /home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar file:///home/raini/文档/wordcount.scala






When spark-jobserver is running, it keeps a resident SparkSubmit process. Creating a context brings this process up, so jobs do not need to be submitted manually with spark-submit.


(1) Create a context (named test-context)




raini@biyuzhe:~/spark/spark-jobserver$ curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=2&memory-per-node=1G'
{
  "status": "SUCCESS",
  "result": "Context initialized"
}


(Note: the POST request above can also be sent to http://localhost:8090/contexts/test-context, e.g. from Postman.)




(2) Upload the jar to the job server
    
raini@biyuzhe:~/spark/spark-jobserver$ curl --data-binary @/home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar localhost:8090/jars/sparkTest
{
  "status": "SUCCESS",
  "result": "Jar uploaded"
}


(Note: in localhost:8090/jars/appName, the last path segment is the alias under which the jar is stored.) Build the jar with: raini@biyuzhe:~/IdeaProjects/SparkJobServer$ mvn clean package




(3) Invoke the jar
curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=SparkJobServer.WordCount&context=test-context&sync=true'
(Here appName is the name the jar was uploaded under, not the appName set inside the program.)


curl -d "" 'localhost:8090/jobs?appName=WordCount&classPath=SparkJob.WordCount&context=test-context&sync=true'
{
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1
  }
}


spark-submit --class SparkJob.WordCount --master local[1] /home/raini/IdeaProjects/SparkJobServer/target/Test-1.0-SNAPSHOT.jar














----------------------------------------------------------- Testing with the job server's bundled examples:




(1)
To test job execution, we use the job-server-tests package that ships with the job server source.


raini@biyuzhe:~/spark/spark-jobserver$ sbt job-server-tests/package
[info] Packaging /home/raini/app/spark-1.6.3-bin-hadoop2.6/spark-jobserver/job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT.jar ...
[success] Total time: 20 s, completed 2017-3-6 14:22:13


(2)
After the build completes, upload the packaged jar file through the REST interface.
The relevant REST API endpoints are:
GET /jobs — list all jobs
POST /jobs — submit a new job
GET /jobs/<jobId> — query the result and status of a job
GET /jobs/<jobId>/config — query a job's configuration


$ curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT.jar localhost:8090/jars/test
{
  "status": "SUCCESS",
  "result": "Jar uploaded"
}




(3)
# List the uploaded jars
SHELL$ curl localhost:8090/jars/
{
  "test": "2017-03-06T14:26:26.818+08:00",
  "test-job2": "2017-03-06T14:06:40.977+08:00",
  "sparkTest": "2017-03-06T13:10:12.281+08:00"
}


(4)
# Submit a job
The appName is test and the classPath is spark.jobserver.WordCountExample.
SHELL$ curl -d "input.string = hello job server" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true'
{
  "duration": "0.703 secs",
  "classPath": "spark.jobserver.WordCountExample",
  "startTime": "2017-03-06T14:33:34.250+08:00",
  "context": "f02e5922-spark.jobserver.WordCountExample", (Web端查看)
  "result": {
    "job": 1,
    "hello": 1,
    "server": 1
  },
  "status": "FINISHED",
  "jobId": "14be72c5-b47d-4023-9af6-a8f4497bd21a"
}


OK
-------------------------------------------------------- Code changes needed to use the job server:


To create a job-server project, follow the steps below.


1. First add the dependencies: include the job server and Spark Core packages in build.sbt
resolvers += "Job Server Bintray" at "http://dl.bintray.com/spark-jobserver/maven"
libraryDependencies += "spark.jobserver" % "job-server-api" % "0.4.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"


With Maven, the equivalent is (the job-server-api version mirrors the sbt line above, and the Bintray repository has to be declared as well):
  <repositories>
    <repository>
      <id>spark-jobserver</id>
      <url>http://dl.bintray.com/spark-jobserver/maven</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>spark.jobserver</groupId>
      <artifactId>job-server-api</artifactId>
      <version>0.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>
  </dependencies>




2. 通过job server来提交的job,必须实现SparkJob相关的接口,这是jobserver复用context机制的前提,然后重写2个方法:


object SampleJob  extends SparkJob {
    override def runJob(sc:SparkContext, jobConfig: Config): Any = ???
    override def validate(sc:SparkContext, config: Config): SparkJobValidation = ???
}






3. Example




package SparkJob


import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValidation
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobInvalid


object WordCount extends SparkJob {
  // Local entry point for running the job outside the job server;
  // supply the input.string parameter that runJob requires.
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("input.string = a b c a b see")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  // Reject the job early if the input.string parameter is missing
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  // Count the words in input.string and return the most frequent one
  override def runJob(sc: SparkContext, config: Config): Any = {
    val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)
    val rsList = dd.map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).collect
    rsList(0)._2
  }
}
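
The synchronous calls shown earlier returned the whole word-count map. If that is the output you want, runJob can simply return the map itself; the job server serializes a Map result as a JSON object. A minimal variant of the job above (the object name WordCountMap is hypothetical):

package SparkJob

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import scala.util.Try
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}

object WordCountMap extends SparkJob {
  // Same validation as above: require the input.string parameter
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    Try(config.getString("input.string"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))

  // Return the full count map, e.g. {"a": 2, "b": 2, "c": 1, "see": 1}
  override def runJob(sc: SparkContext, config: Config): Any =
    sc.parallelize(config.getString("input.string").split(" ").toSeq)
      .countByValue()
}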












------------------------------------------------------------ The following installation notes were collected from the web
Packaging


mvn clean package -DskipTests -Pproduct


Upload the jar


curl --data-binary @target/recommendation-model-1.0-SNAPSHOT.jar http://bds-test-004:8090/jars/test


Run the job


curl -d "source=spark_test.wangxiang_fact_user_product_rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ALSRecommend'


curl -d "source=spark_test.movielens,save_to=d_0e276318a87ced54171884ed765e9962.t_8dfe9c53a6cae3d5356984f799f0d685,rec_num=6,columns=user;product;rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ColdStart'


Check the execution status


curl http://192.168.177.80:8090/jobs/adc26020-f44b-4552-97f7-687202e1497d


View the web UI


http://192.168.177.80:8090/






Contexts


Reference:


https://github.com/spark-jobserver/spark-jobserver#contexts


By default, each time a Spark job is submitted without a context parameter, the job server creates a new SparkContext configured from yarn.conf.


According to the API, you can also create a custom context and specify it when submitting a job:


GET /contexts - lists all current contexts


POST /contexts/<name> - creates a new context


DELETE /contexts/<name> - stops a context and all jobs running in it


PUT /contexts?reset=reboot - kills all contexts and re-loads only the contexts from config


>curl -X POST "http://bds-test-004:8090/contexts/test1"


Specifying the context when submitting a job:


curl -d "source=spark_test.movielens,save_to=test.test123,rec_num=6,columns=user;product;rating" 'http://bds-test-004:8090/jobs?appName=baseAlgo&classPath=com.jd.jddp.dm.model.ALSRecommend&context=test-context2'






Execution result:


curl -d "source=spark_test.wangxiang_fact_user_product_rating,save_to=dtest.test234,rec_num=6,columns=userid;product_id;rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ColdStart'


{
  "status": "STARTED",
  "result": {
    "jobId": "369670b9-ad45-454e-aeac-52e1265cf889",
    "context": "feeaa22e-com.jd.jddp.dm.model.ColdStart"
  }
}


----------------------------------------------- YARN deployment configuration
Settings that need to be modified


settings.sh


Path settings


INSTALL_DIR=/export/App/job-server


SPARK_HOME=/export/App/spark-1.6.0-bin-hadoop2.6.1








yarn.conf

yarn.conf configures the Spark run mode, the job server, and the default SparkContext settings.






yarn.conf:

spark {
  # spark.master will be passed to each job's JobContext
  # master = "local[4]"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"
  master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  # job-number-cpus = 4

  jobserver {
    port = 8090
    jar-store-rootdir = /tmp/jobserver/jars
    context-per-jvm = true
    jobdao = spark.jobserver.io.JobFileDAO
    filedao {
      rootdir = /tmp/spark-job-server/filedao/data
    }
  }

  context-settings {
    num-cpu-cores = 2       # Number of cores to allocate. Required.
    memory-per-node = 2G    # Executor memory per node, -Xmx style eg 512m, 1G, etc.

    spark.executor.instances = 4
    spark.cassandra.connection.host = 192.168.177.79
    spark.cassandra.auth.username = test
    spark.cassandra.auth.password = test123
    spark.cleaner.ttl = 3600

    # in case spark distribution should be accessed from HDFS (as opposed to being installed on every mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"

    # uris of jars to be loaded into the classpath for this context. Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]

    # If you wish to pass any settings directly to the sparkConf as-is, add them here in passthrough,
    # such as hadoop connection settings that don't use the "spark." prefix
    passthrough {
      #es.nodes = "192.1.1.1"
    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults.

akka {
  remote.netty.tcp {
    # This controls the maximum message size, including job results, that can be sent
    # maximum-frame-size = 10 MiB
  }
}






Changes needed in the Spark job code:


See the test code in the job server source:


// Imports (not shown in the original snippet; they follow the job-server-tests sources):
// the new-style SparkJob lives in spark.jobserver.api, and Good/Bad/One/Every come from org.scalactic.
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.scalactic._
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem, SparkJob => NewSparkJob}

import scala.util.Try

object WordCountExampleNewApi extends NewSparkJob {
  type JobData = Seq[String]
  type JobOutput = collection.Map[String, Long]

  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
    sc.parallelize(data).countByValue

  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
    JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
}
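
To build against this newer API in your own project, the dependency needs to point at a 0.7.x build of job-server-api instead of the 0.4.0 line shown earlier (the exact version below is an assumption; match it to the server you deploy):

libraryDependencies += "spark.jobserver" %% "job-server-api" % "0.7.0" % "provided"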






API

Reference: https://github.com/spark-jobserver/spark-jobserver#api

The API section of the README covers:
Jars
Contexts
Jobs
Data
Context configuration
Other configuration settings
Job Result Serialization










spark-jobserver is built with sbt.


sbt download: http://www.scala-sbt.org/download.html (Ubuntu users can install it directly with apt-get, as shown earlier.)


For sbt usage, see the official docs: http://www.scala-sbt.org/0.13/docs/Hello.html


or:
https://github.com/CSUG/real_world_scala/blob/master/02_sbt.markdown


spark-jobserver is built on the Akka framework. The code that handles REST API requests lives in WebApi.scala; the route that returns job information starts like this:


def jobRoutes: Route =
  pathPrefix("jobs") {
    // GET /jobs/<jobId>
    ...








The class that tracks job status information is JobStatusActor.scala.






The build is defined in Build.scala.


To change the Scala version:


scalaVersion := sys.env.getOrElse("SCALA_VERSION", "2.11.8"),


In addition, the Scala version must be updated accordingly in local.sh and the other shell scripts.


The Spark version is set in Versions.scala.






To build, run:


./server_package.sh local








When Spark is built with Scala 2.11, packaging may fail with an error about the Akka package not being found.






