Spark Standalone模式中,资源调度是Spark框架自己实现的,其节点类型分为Master节点和Worker节点,其中Driver运行在Master节点中,并且有常驻内存的Master进程守护,Worker节点上常驻Worker守护进程,负责与Master通信,通过ExecutorRunner来控制运行在当前节点上的CoarseGrainedExecutorBackend。每个Worker 上存在一个或者多个 CoarseGrainedExecutorBackend进程。每个进程包含一个Executor对象,该对象持有一个线程池,每个线程可以执行一个 Task。
SparkStandalone是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群类似,都存在着Master单点故障的问题。目前Spark提供了两种方式来解决此问题,一种是基于文件系统的故障恢复模式,这种方法适合当Master进程挂掉之后,直接重启即可;另一种方式是基于Zookeeper的HA方式,类似于HDFS的NameNode的HA方案,ActiveMaster挂掉之后,Standby Master会立即切换过去继续对外提供服务,同时这种基于Zookeeper的HA方案也被很多分布式框架采用,例如另外一种流式计算框架Storm以及HBase等。
在spark目录下执行,这些脚本只能在集群的master上执行
sbin/start-master.sh 创建一个master实例
sbin/start-slaves.sh 创建一个slave实例(conf/slaves中指定的每台机器上)
sbin/start-all.sh 同时创建一个master和多个slave实例
sbin/stop-all.sh 停止master(由bin/start-master.sh创建)
sbin/stop-slaves.sh 停止所有slave实例(位于conf/slaves上的机器)
sbin/stop-all.sh 同时停止master和slaves
配置好集群后可以再集群上挂载一个应用(application),只需要把master的URL传递给SparkContext或SparkConf接口的构造方法即可。
例如:
val conf = newSparkConf.setMaster(“spark://master:7077”)
val sc = newSparkContext(conf)
如果要把Spark-shell中运行集群上只需要输入:
Spark-shell--master spark://master:7077
当然Spark-shell中还有很多参数,例如--core <numCores>来控制spark-shell在集群上所使用的CPU核数。
脚本spark-submit提供了standalone集群提交编译的spark-application的最直接的方式。对于standalone集群,spark目前仅支持客户端作业进程内的驱动程序(client deploy mode)。
如果spark-application通过脚本spark-submit启动,那么application包就会自动分配到所有的worker节点上,如果application还同时依赖其他包文件,就应该用—jars 的标志来指定它们,不同包文件之间用逗号分隔(例如--jars jar1,jar2 ),更多的spark-submit参数,参考以前写过的 spark任务提交。
Standalone的集群模式目前只支持简单的FIFO作业调度方式。但是,你可以通过控制Spark作业能够获得的最大资源数来实现大量的并发作业。默认的情况下,一个作业会占用集群中的全部资源,但是这在只有一个作业进程的情况下才有意义。你可以在SparkConf中设置spark.cores.max来覆盖系统设置的CPU可用核数的属性,例如:将该作业的核数设为10
val conf = newSparkConf().set(“spark.cores.max”,”10”)
另外,可以在集群的master进程中配置脚本文件spark.deploy.defaultCores来改变作业运行所需要最大内核数的默认值属性,但这需要将下面的配置选项添加到conf/spark-env.sh中:
exportSPARK_MASTER_OPTS=”-Dspark.deploy.defaultCores=<value>”
这在共享集群中非常有用,因为用户不可能单独配置自己作业的最大可用核数属性。
Spark通过Web UI来监控集群,master和每个worker都有自己的webUI用于显示集群和作业的统计资料。默认情形下,用户可以通过访问master的主端口master:8080进入到master的Web UI,端口值可以通过配置文件或命令行选项来改变。
此外,每个作业输出的详细日志可以写入到它的工作目录中(默认为每个Worker节点上的SPARK_HOME/work)。在工作目录中,每个作业对应两个文件stdout和stderr,这两个文件包含了该作业所有控制台输出的信息。
注:还会有master发送到该worker上的程序jar包。
通常情况下,单机模式的集群调度具有一定的容错能力(目前Spark通过移除失败作业到其他的worker上来实现自身的抗故障特性)。然后,用master做调度决策会产生单点故障的问题,如果master崩溃,就无法创建新的作业了。Spark提出了两种解决方案。
将Standalone集群连接到同一个Zookeeper实例并启动多个Master,利用ZooKeeper提供的选举和状态保存功能,选举一个Master,其他的Master处于Standby状态。如果当前的Master发生故障,另一个Master同样通过选举产生,并恢复到发生故障的Master的状态,恢复调度。整个过程可能需要1-2分钟。该延时只会影响新的Application的调度,对于故障切换过程中正在运行的Application没有任何影响。
在spark-env里对SPARK_DAEMON_JAVA_OPTS设置以启用该恢复模式:
系统属性 | 描述 |
spark.deploy.recoveryMode | 设置ZOOKEEPER启用standby Master恢复模式(缺省值:NONE) |
spark.deploy.zookeeper.url | Zooker集群的URL (e.g., 192.168.1.100:2181,192.168.1.101:2181). |
spark.deploy.zookeeper.dir | 设置Zookeeper集群中用于存储恢复状态的文件目录 (缺省值: /spark). |
完成ZooKeeper集群的设置后,启用其高可用性是非常简单的。首先启动这个ZooKeeper集群,然后在不同节点上启动多个Masters,注意这些节点需要具有相同的ZooKeeper配置(ZooKeeper URL 和目录),而且Masters 可以随时被添加和移除。
当调用新的Applications或添加新的Workers时,它们需要知道当前Master的IP地址,这种HA方案处理这种情况很简单,只需要使SparkContext接口指向一个Master列表就可以了,如spark://host1:port1,host2:port2。SparkContext就会尝试着依次对Master列表进行注册并轮询,如果host1出现故障,配置信息对于寻找host2仍然会有效。
注册接入Master和正常运行没有什么重要不同,当集群启动时, application或worker需要能够找到并注册接入到当前的lead Master, 一旦注册成功,它们就会被并入系统(例如储存在ZooKeeper中)。 发生故障切换时,新被选中的Master就会联系所有先前已经注册成功的applications和Workers,并告知它们领导关系的变化。因此从某种意义上讲,这些applications和workers并不需要事先知道新Master的存在性。
正是由于这一属性,新的Masters可以在任何时候被创建。唯一需要注意的是,在这个新的Masters成为leader的情形下,新的applications和workers 能够找到并注册接入到这个Master。 一旦注册成功,这个application或worker就会被启用。
尽管Zookeeper是用于生产模式高可用性的最佳途径,但如果你仅仅想故障后重启你的Master,文件系统(FILESYSTEM)模式就可以做到。Spark提供了目录来保存Application和worker的注册状态信息,一但Master发生故障,就可以通过重新启动Master进程的方式来恢复Application和worker的旧有状态。
在spark-env里对SPARK_DAEMON_JAVA_OPTS设置:
系统属性 | 含义 |
spark.deploy.recoveryMode | 设置文件系统启用单点恢复模式(缺省值:NONE) |
spark.deploy.recoveryDirectory | Spark-Master可进入的文件目录,用于存储作业的恢复状态 |
配置在conf/spark-env.sh(通过conf/spark-env.sh.template模板创建的)中,然后拷贝到所有的worker节点上生效:
环境变量 | 含义 |
SPARK_MASTER_IP | Master节点的IP |
SPARK_MASTER_PORT | Master的端口号 (缺省值: 7077). |
SPARK_MASTER_WEBUI_PORT | master web UI端口号 (缺省值: 8080). |
SPARK_MASTER_OPTS | 设置只适用于Master的配置属性,形式为”-Dx=y”,配置选项参见下面列表 |
SPARK_LOCAL_DIRS | 设置Spark缓存空间目录,包括存储在磁盘上shuffle的map阶段的输出文件和RDD,该文件夹应该设置在你系统中一个高速的本地磁盘上(提高运行效率),也可以是用逗号分割的位于不同磁盘的多个目录 |
SPARK_WORKER_CORES | 设置Worker节点Application可用的CPU总核数(缺省值:所有) |
SPARK_WORKER_MEMORY | 设置Worker节点Application可用的内存总量,eg.1000M,2G (缺省值:内存总量-1G) 注:在单个Application的内存配置使用spark.executor.memory属性, 在SparkConf或SparkContext中设置 |
SPARK_WORKER_PORT | Worker节点端口号 (缺省值: 随机). |
SPARK_WORKER_WEBUI_PORT | Worker节点 web UI端口号 (缺省值: 8081). |
SPARK_WORKER_INSTANCES | 每个Worker节点启动的worker进程数量(缺省值:1)。 如果使worker的实例数大于1。要使用SPARK_WORKER_CORES 来限制每个worker进程所允许使用的内核数, 否则每个worker会尝试使用所有的处理器内核。 |
SPARK_WORKER_DIR | Worker节点的工作路径,包括日志和暂存空间等。 (缺省值:Woker节点上的SPARK_HOME/work) |
SPARK_WORKER_OPTS | 设置只适用于Woker节点的配置属性,形式为”-Dx=y”,配置选项参见下面列表 |
SPARK_DAEMON_MEMORY | 为Spark master和worker守护进程分配内存 (缺省值: 512m). |
SPARK_DAEMON_JAVA_OPTS | 配置Master节点和Worker节点都使用的属性(缺省值: 无) |
SPARK_PUBLIC_DNS | 设置Spark master和workers的公共DNS名称 (缺省值: 无). |
SPARK_MASTER_OPTS支持的参数如下:
属性名称 | 缺省值 | 含义 |
spark.deploy.retainedApplications | 200 | 显示已完成的Application的最大数目,当超过这个数目的时候,将从UI中删除之前的Application以维持这个数目。 |
spark.deploy.retainedDrivers | 200 | 显示已完成的driver的最大数目,当超过这个数目的时候,将从UI中删除之前的Application以维持这个数目。 |
spark.deploy.spreadOut | true | Standalone集群管理器是否自由选择节点(true)还是固定到尽可能少的节点(false),前者会有更好的数据本地性,后者对于密集型工作负载更有效 |
spark.deploy.defaultCores | (infinite) | Standalone集群模式中,在没有设置spark.cores.max的情况下,系统默认分给application的CPU核数。该情况下Application获得全部CPU核数,在共享集群上(例如mapReduce),应该手动设置spark.cores.max的值,以防止默认获得整个集群的资源。 |
spark.worker.timeout | 60 | 设置时限(秒),master超过这个时间收不到worker的心跳,既认定worker已经丢失(故障) |
SPARK_MASTER_OPTS支持的参数如下:
属性名称 | 缺省值 | 含义 |
spark.worker.cleanup.enabled | false | 是否定期清理Worker节点的应用程序工作目录,清理时不管Application是否运行, 该参数只对Standalone生效 |
spark.worker.cleanup.interval | 1800 (30 minutes) | 清理Worker节点本地过期的应用程序工作目录的时间间隔 |
spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 (7 days) | Worker保留应用程序工作目录的有效时间。该时间由磁盘空间,应用程序日志,应用程序的jar包以及应用程序的提交频率来设定,默认7天。 |
在conf/spark-env.sh中配置:
//指定Standalone集群IP
export SPARK_MASTER_IP=master
//设定每个Worker节点可用内存
export SPARK_WORKER_MEMORY=1g
//设置Application能使用的CPU核数为4,默认为使用所有集群中的COU核数。
export SPARK_JAVA_OPTS=”-Dspark.cores.max=4”
//指定Master的HA,依赖于zookeeper集群,url参数和dir参数在上方的HA有说明。
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=host1:port,host2:port
-Dspark.deploy.zookeeper.dir=/spark”
注:在设置Worker进程的CPU个数和内存大小,要注意机器的实际硬件条件,如果配置的超过当前Worker节点的硬件条件,Worker进程会启动失败。