当前位置: 首页 > 工具软件 > spark-wallet > 使用案例 >

spark Standalone

韩靖琪
2023-12-01

Spark Standalone

        Spark Standalone模式中,资源调度是Spark框架自己实现的,其节点类型分为Master节点和Worker节点,其中Driver运行在Master节点中,并且有常驻内存的Master进程守护,Worker节点上常驻Worker守护进程,负责与Master通信,通过ExecutorRunner来控制运行在当前节点上的CoarseGrainedExecutorBackend。每个Worker 上存在一个或者多个 CoarseGrainedExecutorBackend进程。每个进程包含一个Executor对象,该对象持有一个线程池,每个线程可以执行一个 Task。

        SparkStandalone是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群类似,都存在着Master单点故障的问题。目前Spark提供了两种方式来解决此问题,一种是基于文件系统的故障恢复模式,这种方法适合当Master进程挂掉之后,直接重启即可;另一种方式是基于Zookeeper的HA方式,类似于HDFS的NameNode的HA方案,ActiveMaster挂掉之后,Standby Master会立即切换过去继续对外提供服务,同时这种基于Zookeeper的HA方案也被很多分布式框架采用,例如另外一种流式计算框架Storm以及HBase等。

集群启动脚本

在spark目录下执行,这些脚本只能在集群的master上执行


sbin/start-master.sh  创建一个master实例

sbin/start-slaves.sh   创建一个slave实例(conf/slaves中指定的每台机器上)

sbin/start-all.sh      同时创建一个master和多个slave实例

sbin/stop-all.sh      停止master(由bin/start-master.sh创建)

sbin/stop-slaves.sh   停止所有slave实例(位于conf/slaves上的机器)

sbin/stop-all.sh      同时停止master和slaves

 

Application的集群挂载

        配置好集群后可以再集群上挂载一个应用(application),只需要把master的URL传递给SparkContext或SparkConf接口的构造方法即可。

例如:

val conf = newSparkConf.setMaster(“spark://master:7077”)

val sc = newSparkContext(conf)

 

如果要把Spark-shell中运行集群上只需要输入:

Spark-shell--master spark://master:7077

 

当然Spark-shell中还有很多参数,例如--core <numCores>来控制spark-shell在集群上所使用的CPU核数。

 

启动编译的程序(Application)

        脚本spark-submit提供了standalone集群提交编译的spark-application的最直接的方式。对于standalone集群,spark目前仅支持客户端作业进程内的驱动程序(client deploy mode)。

        如果spark-application通过脚本spark-submit启动,那么application包就会自动分配到所有的worker节点上,如果application还同时依赖其他包文件,就应该用—jars 的标志来指定它们,不同包文件之间用逗号分隔(例如--jars jar1,jar2 ),更多的spark-submit参数,参考以前写过的 spark任务提交。

 

资源调度

        Standalone的集群模式目前只支持简单的FIFO作业调度方式。但是,你可以通过控制Spark作业能够获得的最大资源数来实现大量的并发作业。默认的情况下,一个作业会占用集群中的全部资源,但是这在只有一个作业进程的情况下才有意义。你可以在SparkConf中设置spark.cores.max来覆盖系统设置的CPU可用核数的属性,例如:将该作业的核数设为10

        val conf = newSparkConf().set(“spark.cores.max”,”10”)

        另外,可以在集群的master进程中配置脚本文件spark.deploy.defaultCores来改变作业运行所需要最大内核数的默认值属性,但这需要将下面的配置选项添加到conf/spark-env.sh中:

        exportSPARK_MASTER_OPTS=”-Dspark.deploy.defaultCores=<value>”

        这在共享集群中非常有用,因为用户不可能单独配置自己作业的最大可用核数属性。

监控和日志

        Spark通过Web UI来监控集群,master和每个worker都有自己的webUI用于显示集群和作业的统计资料。默认情形下,用户可以通过访问master的主端口master:8080进入到master的Web UI,端口值可以通过配置文件或命令行选项来改变。

        此外,每个作业输出的详细日志可以写入到它的工作目录中(默认为每个Worker节点上的SPARK_HOME/work)。在工作目录中,每个作业对应两个文件stdout和stderr,这两个文件包含了该作业所有控制台输出的信息。

        注:还会有master发送到该worker上的程序jar包。

高可用性(HA)

        通常情况下,单机模式的集群调度具有一定的容错能力(目前Spark通过移除失败作业到其他的worker上来实现自身的抗故障特性)。然后,用master做调度决策会产生单点故障的问题,如果master崩溃,就无法创建新的作业了。Spark提出了两种解决方案。

1. ZooKeeper的StandbyMasters

1.1 描述

        将Standalone集群连接到同一个Zookeeper实例并启动多个Master,利用ZooKeeper提供的选举和状态保存功能,选举一个Master,其他的Master处于Standby状态。如果当前的Master发生故障,另一个Master同样通过选举产生,并恢复到发生故障的Master的状态,恢复调度。整个过程可能需要1-2分钟。该延时只会影响新的Application的调度,对于故障切换过程中正在运行的Application没有任何影响。

1.2 配置

在spark-env里对SPARK_DAEMON_JAVA_OPTS设置以启用该恢复模式:

 

系统属性

描述

spark.deploy.recoveryMode

设置ZOOKEEPER启用standby Master恢复模式(缺省值:NONE

spark.deploy.zookeeper.url

Zooker集群的URL (e.g., 192.168.1.100:2181,192.168.1.101:2181).

spark.deploy.zookeeper.dir

设置Zookeeper集群中用于存储恢复状态的文件目录 (缺省值: /spark).


1.3 补充

        完成ZooKeeper集群的设置后,启用其高可用性是非常简单的。首先启动这个ZooKeeper集群,然后在不同节点上启动多个Masters,注意这些节点需要具有相同的ZooKeeper配置(ZooKeeper URL 和目录),而且Masters 可以随时被添加和移除。

        当调用新的Applications或添加新的Workers时,它们需要知道当前Master的IP地址,这种HA方案处理这种情况很简单,只需要使SparkContext接口指向一个Master列表就可以了,如spark://host1:port1,host2:port2。SparkContext就会尝试着依次对Master列表进行注册并轮询,如果host1出现故障,配置信息对于寻找host2仍然会有效。

        注册接入Master和正常运行没有什么重要不同,当集群启动时, application或worker需要能够找到并注册接入到当前的lead Master, 一旦注册成功,它们就会被并入系统(例如储存在ZooKeeper中)。 发生故障切换时,新被选中的Master就会联系所有先前已经注册成功的applications和Workers,并告知它们领导关系的变化。因此从某种意义上讲,这些applications和workers并不需要事先知道新Master的存在性。

        正是由于这一属性,新的Masters可以在任何时候被创建。唯一需要注意的是,在这个新的Masters成为leader的情形下,新的applications和workers 能够找到并注册接入到这个Master。 一旦注册成功,这个application或worker就会被启用。

2. 基于文件系统的单点恢复

2.1 描述

尽管Zookeeper是用于生产模式高可用性的最佳途径,但如果你仅仅想故障后重启你的Master,文件系统(FILESYSTEM)模式就可以做到。Spark提供了目录来保存Application和worker的注册状态信息,一但Master发生故障,就可以通过重新启动Master进程的方式来恢复Application和worker的旧有状态。

2.2 配置

 在spark-env里对SPARK_DAEMON_JAVA_OPTS设置:

系统属性

含义

spark.deploy.recoveryMode

设置文件系统启用单点恢复模式(缺省值:NONE

spark.deploy.recoveryDirectory

Spark-Master可进入的文件目录,用于存储作业的恢复状态


集群环境变量配置

1. 环境变量

配置在conf/spark-env.sh(通过conf/spark-env.sh.template模板创建的)中,然后拷贝到所有的worker节点上生效:

环境变量

含义

SPARK_MASTER_IP

Master节点的IP

SPARK_MASTER_PORT

Master的端口号 (缺省值: 7077).

SPARK_MASTER_WEBUI_PORT

master web UI端口号 (缺省值: 8080).

SPARK_MASTER_OPTS

设置只适用于Master的配置属性,形式为”-Dx=y”,配置选项参见下面列表

SPARK_LOCAL_DIRS

设置Spark缓存空间目录,包括存储在磁盘上shufflemap阶段的输出文件和RDD,该文件夹应该设置在你系统中一个高速的本地磁盘上(提高运行效率),也可以是用逗号分割的位于不同磁盘的多个目录

SPARK_WORKER_CORES

设置Worker节点Application可用的CPU总核数(缺省值:所有)

SPARK_WORKER_MEMORY

设置Worker节点Application可用的内存总量,eg.1000M,2G

(缺省值:内存总量-1G) 

注:在单个Application的内存配置使用spark.executor.memory属性,

SparkConfSparkContext中设置

SPARK_WORKER_PORT

Worker节点端口号 (缺省值: 随机).

SPARK_WORKER_WEBUI_PORT

Worker节点 web UI端口号 (缺省值: 8081).

SPARK_WORKER_INSTANCES

每个Worker节点启动的worker进程数量(缺省值:1)。

如果使worker的实例数大于1。要使用SPARK_WORKER_CORES

来限制每个worker进程所允许使用的内核数,

否则每个worker会尝试使用所有的处理器内核。

SPARK_WORKER_DIR

Worker节点的工作路径,包括日志和暂存空间等。

(缺省值:Woker节点上的SPARK_HOME/work

SPARK_WORKER_OPTS

设置只适用于Woker节点的配置属性,形式为”-Dx=y”,配置选项参见下面列表

SPARK_DAEMON_MEMORY

Spark masterworker守护进程分配内存 (缺省值: 512m).

SPARK_DAEMON_JAVA_OPTS

配置Master节点和Worker节点都使用的属性(缺省值: 无)

SPARK_PUBLIC_DNS

设置Spark masterworkers的公共DNS名称 (缺省值: ).



2. Master参数

SPARK_MASTER_OPTS支持的参数如下:

属性名称

缺省值

含义

spark.deploy.retainedApplications

200

显示已完成的Application的最大数目,当超过这个数目的时候,将从UI中删除之前的Application以维持这个数目。

spark.deploy.retainedDrivers

200

显示已完成的driver的最大数目,当超过这个数目的时候,将从UI中删除之前的Application以维持这个数目。

spark.deploy.spreadOut

true

Standalone集群管理器是否自由选择节点(true)还是固定到尽可能少的节点(false),前者会有更好的数据本地性,后者对于密集型工作负载更有效

spark.deploy.defaultCores

(infinite)

Standalone集群模式中,在没有设置spark.cores.max的情况下,系统默认分给applicationCPU核数。该情况下Application获得全部CPU核数,在共享集群上(例如mapReduce),应该手动设置spark.cores.max的值,以防止默认获得整个集群的资源。

spark.worker.timeout

60

设置时限(秒),master超过这个时间收不到worker的心跳,既认定worker已经丢失(故障)


3. Worker的参数

SPARK_MASTER_OPTS支持的参数如下:

属性名称

缺省值

含义

spark.worker.cleanup.enabled

false

是否定期清理Worker节点的应用程序工作目录,清理时不管Application是否运行,

该参数只对Standalone生效

spark.worker.cleanup.interval

1800 (30 minutes)

清理Worker节点本地过期的应用程序工作目录的时间间隔

spark.worker.cleanup.appDataTtl

7 * 24 * 3600 (7 days)

Worker保留应用程序工作目录的有效时间。该时间由磁盘空间,应用程序日志,应用程序的jar包以及应用程序的提交频率来设定,默认7天。

4. 一些配置的样例

在conf/spark-env.sh中配置:

//指定Standalone集群IP

export SPARK_MASTER_IP=master  

 

//设定每个Worker节点可用内存

export SPARK_WORKER_MEMORY=1g

 

//设置Application能使用的CPU核数为4,默认为使用所有集群中的COU核数。

export SPARK_JAVA_OPTS=”-Dspark.cores.max=4” 

 

//指定Master的HA,依赖于zookeeper集群,url参数和dir参数在上方的HA有说明。

export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER

-Dspark.deploy.zookeeper.url=host1:port,host2:port

-Dspark.deploy.zookeeper.dir=/spark”

 

注:在设置Worker进程的CPU个数和内存大小,要注意机器的实际硬件条件,如果配置的超过当前Worker节点的硬件条件,Worker进程会启动失败。

 类似资料:

相关阅读

相关文章

相关问答