当前位置: 首页 > 工具软件 > JobServer > 使用案例 >

spark job server使用方法

司马自明
2023-12-01

入门

  • clone代码
    从github上的spark-jobserver工程clone代码到本地
  • 编译
    • 需要将工程根目录下的config文件删除
    • 将文件夹job-server/config拷贝到工程根目录下
    • 将local.conf.template/local.sh.template重命名为locao.conf/local.sh
    • 配置环境
    • export JAVA_HOME=/d/java/jdk1.7.0_75
    • `</li>
      <li>运行编译命令
      bin/server_package.sh local`(需要sbt环境)
    • 打好的包位于:/tmp/job-server/job-server.tar.gz(打包位置,可以在bin/server_package.sh中设置WORK_DIR,来改变)
    • 将job-server.tar.gz上传到linux系统中
  • 配置
    • local.conf
    • master: 指定spark的master
    • jobserver: server的配置
    • settings.sh
    • port配置
    • server路径
    • log路径
    • spark的home/conf路径等配置
    • server_start.sh
    • 此脚本,使用spark-submit提交作业
    • spark的相关配置信息,可以在此脚本里配置
  • 运行/停止server
    • ./server_start.sh
    • ./server_stop.sh
  • 日志
    • setting.sh中配置的LOG_DIR,是日志所在位置
    • server_start.log
    • 本地server日志
    • spark-job-server.log
    • spark的driver日志
  • 使用
    1. 初始化spark的context
      • curl -d "" 'ip:port/contexts/contextName?context-factory=spark.jobserver.context.SQLContextFactory'
      • ip:port:为spark job server启动的机器和端口
      • contextName:context的名字,后面执行操作需要用到,还可以在jobTracker页面,通过此名字搜索运行的application
      • context-factory:初始化context
        • spark.jobserver.context.SQLContextFactory用来初始化SQLContext
        • spark.jobserver.context.HiveContextFactory用来初始化HiveContext
        • spark.jobserver.context.DefaultSparkContextFactory用来初始化SparkContext
        • spark.jobserver.context.StreamingContextFactory用来初始化StreamingContext
    2. 上传jar包,初始化appName
      • curl --data-binary @/xx/xx/job-server-extras_2.10-0.7.0-SNAPSHOT.jar ip:port/jars/appName
      • @指定本地资源路径
      • jars/appName指定appName,后面执行操作需要用到
    3. 执行操作(sql等)
      • curl -d "sql=\"show databases\"" 'ip:port/jobs?appName=xxx&classPath=spark.jobserver.SqlTestJob&context=contextName&sync=true'
      • post方式将数据”sql=xxx”传递给server
      • appName:步骤2中初始化的值
      • classPath:用户可以自定义,实现接口spark.jobserver.api.SparkJobBase的方法runJob即可
        • spark.jobserver.SqlTestJob实现逻辑
        • 拿到传入的sql
        • SQLContext.sql(sql).collect()执行
      • context:指定提交到的contextName,步骤1中初始化的值
      • sync:是否为同步模式
        • true:会等待一段时间,如果超过这个时间,没有返回,则返回json格式,有报错信息
        • 例: {"status": "ERROR", "result": {"message": "Ask timed out on [Actor[akka://JobServer/user/context-supervisor/hive-context-test#91063205]] after[10000 ms]", "errorClass": "akka.pattern.AskTimeoutException", "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)", ..."]}
        • false:直接返回json,包括信息如下:
        • duration
          • 作业完成时,则为作业总共占用时间,xx secs
          • 作业未完成,则为”Job not done yet”
        • classPath:指定的classPath
        • startTime:作业提交时间
        • context:提交到的context的name
        • status
          • ”STARTED”:开始执行
        • jobId:spark运行的jobId
    4. 获取返回结果
      • curl -v 'ip:port/jobs/jobId'
      • 查询jobId的结果
        • duration:作业运行时间
        • result:步骤3中自定义的classPath的返回结果
        • status
        • FINISHED
        • ERROR
        • RUNNING
        • 例:{ "duration": "24.463 secs", "classPath": "spark.jobserver.HiveTestJob", "startTime":"2016-11-17T11:01:09.249+08:00", "context": "hive-context-test", "result": ["[2,www]",...],"status": "FINISHED", "jobId": "5bc87741-c289-4f13-8f5c-de044256fcc7"}
  • 其他使用
    • context操作
    • 获取:curl http://host:port/contexts
    • 删除:curl -X DELETE http://host:port/contexts/name
    • kill所有的context,并reload配置中的:curl -X PUT http://host:port/contexts?reset=reboot
    • job操作
    • 获取:curl http://host:port/jobs
    • kill:curl -X DELETE http://host:port/jobs/jobId

参考文献

spark job server git

 类似资料: