### --- SparkEnv内部组件
~~~ SparkEnv是spark计算层的基石,不管是 Driver 还是 Executor,
~~~ 都需要依赖SparkEnv来进行计算,它是Spark的执行环境对象,
~~~ 其中包括与众多Executor执行相关的对象。
~~~ Spark 对任务的计算都依托于 Executor 的能力,
~~~ 所有的Executor 都有自己的 Spark 的执行环境 SparkEnv。
~~~ 有了 SparkEnv,可以将数据存储在存储体系中;利用计算引擎对计算任务进行处理,
~~~ 可以在节点间进行通信等。
### --- 源码提取说明
~~~ # 源码提取说明:SparkEnv.scala
~~~ # 57行~71行
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val serializerManager: SerializerManager,
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
val memoryManager: MemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
### --- spark Env内部组件:参数说明
~~~ # ExecutorId
~~~ 每一个Executor在向Driver端注册时,都会生成一个ExecutorId,对Driver来说,
~~~ 这个ExecutorId就是Executor的唯一标识,它会一直伴随Executor;
~~~ # RpcEnv
~~~ 通过Netty技术来实现对组件之间的通信;
~~~ # Serializer
~~~ Spark使用的序列化器,
~~~ 默认使用Java的序列化器org.apache.spark.serializer.JavaSerializer;
~~~ # SerializerManager
~~~ Spark 中很多对象在通用网络传输或者写入存储体系时,都需要序列化。
~~~ SparkEnv 中有两个序列化组件,分别是SerializerManager和ClosureSerializer。
~~~ serializer默认为Java的序列化器org.apache.spark.serializer.JavaSerializer,
~~~ 用户可以通过spark.serializer属性配置其他的序列化实现,
~~~ 如org.apache.spark.serializer.KryoSerializer。
~~~ 而 closureSerializer 的实际类型固定为org.apache.spark.serializer.JavaSerializer,
~~~ 用户不能够自己指定。JavaSerializer采用 Java 语言自带的序列化API 实现;
~~~ # MapOutPutTracker
~~~ MapOutputTracker 用于跟踪Map阶段任务的输出状态,
~~~ 此状态便于Reduce阶段任务获取地址及中间结果。
~~~ 每个Map任务或者Reduce任务都会有其唯一标识,分别为mapId 和 reduceId。
~~~ 每个Reduce任务的输入可能是多个Map任务的输出,
~~~ Reduce会到各个Map任务的所在节点上拉取Block。
~~~ 每个Shuffle过程都有唯一的表示shuffleId。
~~~ MapOutputTracker 有两个子类:MapOutputTrackerMaster(fordriver) 和
~~~ MapOutputTrackerWorker(for executors);因为它们使用了不同的HashMap来存储元数据;
~~~ # ShuffleManager
~~~ ShuffleManager负责管理本地及远程的Block数据的shuffle操作。
~~~ ShuffleManager根据默认的 spark.shuffle.manager 属性,
~~~ 通过反射方式生成的SortShuffleManager的实例。默认使用的是sort模式的SortShuffleManager;
~~~ # BroadcastManager
~~~ BroadcastManager用于将配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存储。
~~~ 如果为了容灾,也会复制到其他节点上;BlockManager。BlockManager负责对Block的管理;
~~~ # SecurityManager
~~~ 主要对帐号、权限以及身份认证进行设置和管理;
~~~ # MetricsSystem
~~~ Spark内置的测量系统。
~~~ 度量系统根据当前实例是 Driver 还是 Executor 有所区别:
~~~ 当前实例为 Driver:创建度量系统,并且指定度量系统的实例名为 driver,
~~~ 然后等待 SparkContext 中的任务调度器 TaskScheculer 告诉度量系统后再启动;
~~~ 当前实例为 Executor:设置spark.executor.id属性为当前 Executor 的ID,
~~~ 然后再创建并启动度量系统;
~~~ # MemoryManager。
~~~ MemoryManager 的主要实现有 StaticMemoryManager 和UnifiedMemoryManager(默认)。
~~~ # OutputCommitCoordinator
~~~ 当 Spark 应用程序使用了 Spark SQL (包括 Hive)或者需要将任务的输出保存到 HDFS 时,
~~~ 就会用到输出提交协调器 OutputCommitCoordinator,
~~~ OutputCommitCoordinator 将决定任务是否可以提交输出到 HDFS。
~~~ 无论是 Driver 还是 Executor,在 SparkEnv 中都包含了子组件OutputCommitCoordinator。
~~~ 在 Driver 上注册了 OutputCommitCoordinatorEndpoint,
~~~ 在所有 Executor 上的 OutputCommitCoordinator 都是通过 OutputCommitCoordinatorEndpoint 的
~~~ RpcEndpointRef 来询问Driver 上的 OutputCommitCoordinator,是否能够将输出提交到 HDFS。