当前位置: 首页 > 工具软件 > Spark Kernel > 使用案例 >

CC00081.spark——|Hadoop&Spark.V07|——|Spark.v07|Spark 原理 源码|Spark Context|

包承望
2023-12-01
一、Spark Env内部组件
### --- SparkEnv内部组件

~~~     SparkEnv是spark计算层的基石,不管是 Driver 还是 Executor,
~~~     都需要依赖SparkEnv来进行计算,它是Spark的执行环境对象,
~~~     其中包括与众多Executor执行相关的对象。
~~~     Spark 对任务的计算都依托于 Executor 的能力,
~~~     所有的Executor 都有自己的 Spark 的执行环境 SparkEnv。
~~~     有了 SparkEnv,可以将数据存储在存储体系中;利用计算引擎对计算任务进行处理,
~~~     可以在节点间进行通信等。
### --- 源码提取说明

~~~     # 源码提取说明:SparkEnv.scala
~~~     # 57行~71行
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging {
二、Spark Env内部组件:参数说明
### --- spark Env内部组件:参数说明

~~~     # ExecutorId
~~~     每一个Executor在向Driver端注册时,都会生成一个ExecutorId,对Driver来说,
~~~     这个ExecutorId就是Executor的唯一标识,它会一直伴随Executor;
~~~     # RpcEnv
~~~     通过Netty技术来实现对组件之间的通信;

~~~     # Serializer
~~~     Spark使用的序列化器,
~~~     默认使用Java的序列化器org.apache.spark.serializer.JavaSerializer;
~~~     # SerializerManager

~~~     Spark 中很多对象在通用网络传输或者写入存储体系时,都需要序列化。
~~~     SparkEnv 中有两个序列化组件,分别是SerializerManager和ClosureSerializer。
~~~     serializer默认为Java的序列化器org.apache.spark.serializer.JavaSerializer,
~~~     用户可以通过spark.serializer属性配置其他的序列化实现,
~~~     如org.apache.spark.serializer.KryoSerializer。
~~~     而 closureSerializer 的实际类型固定为org.apache.spark.serializer.JavaSerializer,
~~~     用户不能够自己指定。JavaSerializer采用 Java 语言自带的序列化API 实现;
~~~     # MapOutPutTracker

~~~     MapOutputTracker 用于跟踪Map阶段任务的输出状态,
~~~     此状态便于Reduce阶段任务获取地址及中间结果。
~~~     每个Map任务或者Reduce任务都会有其唯一标识,分别为mapId 和 reduceId。
~~~     每个Reduce任务的输入可能是多个Map任务的输出,
~~~     Reduce会到各个Map任务的所在节点上拉取Block。
~~~     每个Shuffle过程都有唯一的表示shuffleId。
~~~     MapOutputTracker 有两个子类:MapOutputTrackerMaster(fordriver) 和 
~~~     MapOutputTrackerWorker(for executors);因为它们使用了不同的HashMap来存储元数据;
~~~     # ShuffleManager

~~~     ShuffleManager负责管理本地及远程的Block数据的shuffle操作。
~~~     ShuffleManager根据默认的 spark.shuffle.manager 属性,
~~~     通过反射方式生成的SortShuffleManager的实例。默认使用的是sort模式的SortShuffleManager;
~~~     # BroadcastManager
~~~     BroadcastManager用于将配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存储。
~~~     如果为了容灾,也会复制到其他节点上;BlockManager。BlockManager负责对Block的管理;

~~~     # SecurityManager
~~~     主要对帐号、权限以及身份认证进行设置和管理;
~~~     # MetricsSystem

~~~     Spark内置的测量系统。
~~~     度量系统根据当前实例是 Driver 还是 Executor 有所区别:
~~~     当前实例为 Driver:创建度量系统,并且指定度量系统的实例名为 driver,
~~~     然后等待 SparkContext 中的任务调度器 TaskScheculer 告诉度量系统后再启动;
~~~     当前实例为 Executor:设置spark.executor.id属性为当前 Executor 的ID,
~~~     然后再创建并启动度量系统;
~~~     # MemoryManager。
~~~     MemoryManager 的主要实现有 StaticMemoryManager 和UnifiedMemoryManager(默认)。

~~~     # OutputCommitCoordinator
~~~     当 Spark 应用程序使用了 Spark SQL (包括 Hive)或者需要将任务的输出保存到 HDFS 时,
~~~     就会用到输出提交协调器 OutputCommitCoordinator,
~~~     OutputCommitCoordinator 将决定任务是否可以提交输出到 HDFS。
~~~     无论是 Driver 还是 Executor,在 SparkEnv 中都包含了子组件OutputCommitCoordinator。
~~~     在 Driver 上注册了 OutputCommitCoordinatorEndpoint,
~~~     在所有 Executor 上的 OutputCommitCoordinator 都是通过 OutputCommitCoordinatorEndpoint 的 
~~~     RpcEndpointRef 来询问Driver 上的 OutputCommitCoordinator,是否能够将输出提交到 HDFS。
 类似资料: