Spark-scheduler

隆礼骞
2023-12-01

Spark-scheduler

@(spark)[scheduler]

Task

/**                                                                                                                                                                     
 * A unit of execution. We have two kinds of Task's in Spark:                                                                                                           
 * - [[org.apache.spark.scheduler.ShuffleMapTask]]                                                                                                                      
 * - [[org.apache.spark.scheduler.ResultTask]]                                                                                                                          
 *                                                                                                                                                                      
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple                                                                        
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task                                                                         
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task                                                                         
 * and divides the task output to multiple buckets (based on the task's partitioner).                                                                                   
 *                                                                                                                                                                      
 * @param stageId id of the stage this task belongs to                                                                                                                  
 * @param partitionId index of the number in the RDD                                                                                                                    
 */                                                                                                                                                                     
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {     

ResultTask

/**                                                                                                                                                                     
 * A task that sends back the output to the driver application.                                                                                                         
 *                                                                                                                                                                      
 * See [[Task]] for more information.                                                                                                                                   
 *                                                                                                                                                                      
 * @param stageId id of the stage this task belongs to                                                                                                                  
 * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each                                                                        
 *                   partition of the given RDD. Once deserialized, the type should be                                                                                  
 *                   (RDD[T], (TaskContext, Iterator[T]) => U).                                                                                                         
 * @param partition partition of the RDD this task is associated with                                                                                                   
 * @param locs preferred task execution locations for locality scheduling                                                                                               
 * @param outputId index of the task in this job (a job can launch tasks on only a subset of the                                                                        
 *                 input RDD's partitions).                                                                                                                             
 */                                                                                                                                                                     
private[spark] class ResultTask[T, U](                                                                                                                                  
    stageId: Int,                                                                                                                                                       
    taskBinary: Broadcast[Array[Byte]],                                                                                                                                 
    partition: Partition,                                                                                                                                               
    @transient locs: Seq[TaskLocation],                                                                                                                                 
    val outputId: Int)                                                                                                                                                  
  extends Task[U](stageId, partition.index) with Serializable {    

Let's take a closer look at its runTask:

  override def runTask(context: TaskContext): U = {                                                                                                                     
    // Deserialize the RDD and the func using the broadcast variables.                                                                                                  
    val ser = SparkEnv.get.closureSerializer.newInstance()                                                                                                              
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](                                                                                       
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)                                                                                    

    metrics = Some(context.taskMetrics)                                                                                                                                 
    func(context, rdd.iterator(partition, context))                                                                                                                     
  }   
  1. Deserialize the RDD and the user function from the broadcast task binary.
  2. Call the RDD's iterator (iterator is a final method on RDD that, depending on the storage level and checkpoint state, serves the partition from the cache or calls computeOrReadCheckpoint); a sketch of this dispatch follows.
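
A minimal, self-contained model of that dispatch (not Spark's actual code; the cached/checkpointed/compute parameters are stand-ins for the cache manager, the checkpoint data, and compute()): if the RDD is persisted, the cache is consulted first; otherwise computeOrReadCheckpoint reads checkpointed data when present and recomputes the partition otherwise.

object IteratorDispatchSketch {
  sealed trait StorageLevel
  case object NONE extends StorageLevel
  case object MEMORY_ONLY extends StorageLevel

  // Stand-ins for cacheManager.getOrCompute and computeOrReadCheckpoint.
  def iterator[T](storageLevel: StorageLevel,
                  cached: Option[Iterator[T]],
                  checkpointed: Option[Iterator[T]],
                  compute: () => Iterator[T]): Iterator[T] =
    if (storageLevel != NONE) cached.getOrElse(compute())   // persisted: try the cache first
    else checkpointed.getOrElse(compute())                  // else: checkpoint data or recompute

  def main(args: Array[String]): Unit =
    println(iterator(NONE, None, None, () => Iterator(1, 2, 3)).toList)  // List(1, 2, 3)
}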

ShuffleMapTask

/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 */
private[spark] class ShuffleMapTask(                                                                                                                                    
    stageId: Int,                                                                                                                                                       
    taskBinary: Broadcast[Array[Byte]],                                                                                                                                 
    partition: Partition,                                                                                                                                               
    @transient private var locs: Seq[TaskLocation])                                                                                                                     
  extends Task[MapStatus](stageId, partition.index) with Logging {      

Its runTask returns a MapStatus:

MapStatus

/**
 * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
 */
private[spark] sealed trait MapStatus {
  /** Location where this task was run. */
  def location: BlockManagerId

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  def getSizeForBlock(reduceId: Int): Long
}
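
If I remember the default implementation correctly, the per-reducer sizes are kept compact by encoding each size into a single byte on a logarithmic (base ~1.1) scale, with 0 reserved for empty blocks, which is what makes the "non-empty implies non-zero" invariant above cheap to keep. A hedged sketch of that style of encoding (written from memory, not copied from Spark):

object MapStatusSizeSketch {
  private val LOG_BASE = 1.1

  // Encode a block size into one byte; 0 is reserved for "empty block".
  def compressSize(size: Long): Byte =
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LOG_BASE)).toInt).toByte

  // Decode back to an (over-)estimate of the original size.
  def decompressSize(compressed: Byte): Long =
    if (compressed == 0) 0 else math.pow(LOG_BASE, compressed & 0xFF).toLong

  def main(args: Array[String]): Unit = {
    val size = 1234567L
    val c = compressSize(size)
    println(s"$size bytes -> byte $c -> ~${decompressSize(c)} bytes")  // lossy but order-preserving
  }
}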

runTask

Its runTask is also fairly simple: it obtains a ShuffleWriter, writes the partition's output through it, and returns a MapStatus.
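
A minimal model of the write/stop part of that shape (ShuffleWriter and MapStatus below are simplified stand-ins, not Spark's API; deserializing the (rdd, shuffleDep) pair from the task binary is omitted):

object ShuffleMapTaskSketch {
  case class MapStatus(location: String, sizesByReducer: Array[Long])

  trait ShuffleWriter[K, V] {
    def write(records: Iterator[(K, V)]): Unit
    def stop(success: Boolean): Option[MapStatus]
  }

  // Toy writer standing in for a real shuffle writer: it only accumulates
  // per-reducer output sizes (here: record counts).
  class CountingWriter[K, V](numReducers: Int)(partitioner: K => Int)
      extends ShuffleWriter[K, V] {
    private val sizes = Array.fill(numReducers)(0L)
    def write(records: Iterator[(K, V)]): Unit =
      records.foreach { case (k, _) => sizes(partitioner(k)) += 1L }
    def stop(success: Boolean): Option[MapStatus] =
      if (success) Some(MapStatus("executor-0", sizes)) else None
  }

  // The shape of runTask: write this partition's records, then report MapStatus.
  def runTask[K, V](records: Iterator[(K, V)], writer: ShuffleWriter[K, V]): MapStatus = {
    writer.write(records)
    writer.stop(success = true).get
  }

  def main(args: Array[String]): Unit = {
    val writer = new CountingWriter[String, Int](2)(k => math.abs(k.hashCode) % 2)
    println(runTask(Iterator("a" -> 1, "b" -> 2, "c" -> 3), writer).sizesByReducer.toList)
  }
}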

TaskResult

// Task result. Also contains updates to accumulator variables.
private[spark] sealed trait TaskResult[T]
It has two concrete forms: DirectTaskResult, which carries the serialized value inline, and IndirectTaskResult, which is only a reference to a result stored in the block manager, used when the result is too large to send back directly.
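
A simplified sketch of that executor-side choice (written from memory of the general flow rather than copied from Spark; the thresholds and the block id are illustrative):

object TaskResultPathSketch {
  sealed trait Sent
  case class Direct(sizeBytes: Int) extends Sent                     // value travels inline
  case class Indirect(blockId: String, sizeBytes: Int) extends Sent  // only a block reference is sent
  case object Dropped extends Sent                                   // exceeds the driver's max result size

  // resultSize models the serialized DirectTaskResult's size on the executor.
  def send(resultSize: Int, maxResultSize: Int, maxFrameSize: Int): Sent =
    if (resultSize > maxResultSize) Dropped
    else if (resultSize > maxFrameSize) Indirect("taskresult_42", resultSize)  // hypothetical block id
    else Direct(resultSize)

  def main(args: Array[String]): Unit = {
    println(send(10,   maxResultSize = 1000, maxFrameSize = 100))  // Direct(10)
    println(send(500,  maxResultSize = 1000, maxFrameSize = 100))  // Indirect(taskresult_42,500)
    println(send(5000, maxResultSize = 1000, maxFrameSize = 100))  // Dropped
  }
}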

TaskInfo

/**                                                                                                                                                                     
 * :: DeveloperApi ::                                                                                                                                                   
 * Information about a running task attempt inside a TaskSet.                                                                                                           
 */                                                                                                                                                                     
@DeveloperApi                                                                                                                                                           
class TaskInfo(                                                                                                                                                         
    val taskId: Long,                                                                                                                                                   
    val index: Int,                                                                                                                                                     
    val attempt: Int,                                                                                                                                                   
    val launchTime: Long,                                                                                                                                               
    val executorId: String,                                                                                                                                             
    val host: String,                                                                                                                                                   
    val taskLocality: TaskLocality.TaskLocality,                                                                                                                        
    val speculative: Boolean) {   

TaskDescription


/**                                                                                                                                                                     
 * Description of a task that gets passed onto executors to be executed, usually created by                                                                             
 * [[TaskSetManager.resourceOffer]].                                                                                                                                    
 */                                                                                                                                                                     
private[spark] class TaskDescription(        

AccumulableInfo

/**                                                                                                                                                                     
 * :: DeveloperApi ::                                                                                                                                                   
 * Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.                                                                               
 */                                                                                                                                                                     
@DeveloperApi                                                                                                                                                           
class AccumulableInfo (                                                                                                                                                 
    val id: Long,                                                                                                                                                       
    val name: String,                                                                                                                                                   
    val update: Option[String], // represents a partial update within a task                                                                                            
    val value: String) {                                                                                                                                                

SplitInfo

// information about a specific split instance : handles both split instances.                                                                                          
// So that we do not need to worry about the differences.                                                                                                               
@DeveloperApi                                                                                                                                                           
class SplitInfo(                                                                                                                                                        
    val inputFormatClazz: Class[_],                                                                                                                                     
    val hostLocation: String,                                                                                                                                           
    val path: String,                                                                                                                                                   
    val length: Long,                                                                                                                                                   
    val underlyingSplit: Any) {       

SparkListener

  1. Defines the set of events (the SparkListenerEvent classes).
  2. Defines the listener interface, trait SparkListener.
  3. Defines class StatsReportListener: a simple SparkListener that logs a few summary statistics when each stage completes (a registration sketch follows the list).
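
Registering a custom listener looks roughly like this (a usage sketch assuming a local Spark 1.x-era setup; the listener body and job are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("listener-demo"))
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
        println(s"stage ${stage.stageInfo.stageId} done, ${stage.stageInfo.numTasks} tasks")
      override def onJobEnd(end: SparkListenerJobEnd): Unit =
        println(s"job ${end.jobId} ended: ${end.jobResult}")
    })
    sc.parallelize(1 to 100, 4).map(_ * 2).count()  // triggers stage/job events
    sc.stop()
  }
}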

JobResult

A result of a job in the DAGScheduler.
There are only two cases: JobSucceeded and JobFailed.

JobWaiter

/**                                                                                                                                                                     
 * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their                                                                            
 * results to the given handler function.                                                                                                                               
 */                                                                                                                                                                     
private[spark] class JobWaiter[T](                                                                                                                                      
    dagScheduler: DAGScheduler,                                                                                                                                         
    val jobId: Int,                                                                                                                                                     
    totalTasks: Int,                                                                                                                                                    
    resultHandler: (Int, T) => Unit)                                                                                                                                    
  extends JobListener {   

JobListener

/**                                                                                                                                                                     
 * Interface used to listen for job completion or failure events after submitting a job to the                                                                          
 * DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole                                                                            
 * job fails (and no further taskSucceeded events will happen).                                                                                                         
 */                                                                                                                                                                     
private[spark] trait JobListener {                                                                                                                                      
  def taskSucceeded(index: Int, result: Any)                                                                                                                            
  def jobFailed(exception: Exception)                                                                                                                                   
}   

JobLogger

/**                                                                                                                                                                     
 * :: DeveloperApi ::                                                                                                                                                   
 * A logger class to record runtime information for jobs in Spark. This class outputs one log file                                                                      
 * for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass                                                                     
 * of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext                                                                     
 * is created. Note that each JobLogger only works for one SparkContext                                                                                                 
 *                                                                                                                                                                      
 * NOTE: The functionality of this class is heavily stripped down to accommodate for a general                                                                          
 * refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced                                                                        
 * to log application information as SparkListenerEvents. To enable this functionality, set                                                                             
 * spark.eventLog.enabled to true.                                                                                                                                      
 */                                                                                                                                                                     
@DeveloperApi                                                                                                                                                           
@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")                                                                                  
class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {      

ApplicationEventListener

/**                                                                                                                                                                     
 * A simple listener for application events.                                                                                                                            
 *                                                                                                                                                                      
 * This listener expects to hear events from a single application only. If events                                                                                       
 * from multiple applications are seen, the behavior is unspecified.                                                                                                    
 */                                                                                                                                                                     
private[spark] class ApplicationEventListener extends SparkListener {      

DAGSchedulerEvent

/**                                                                                                                                                                     
 * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue                                                                        
 * architecture where any thread can post an event (e.g. a task finishing or a new job being                                                                            
 * submitted) but there is a single "logic" thread that reads these events and takes decisions.                                                                         
 * This greatly simplifies synchronization.                                                                                                                             
 */                                                                                                                                                                     
private[scheduler] sealed trait DAGSchedulerEvent   

It covers many events; the important ones include JobSubmitted, StageCancelled, and so on.
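
A tiny self-contained illustration of the event-queue pattern described in the comment above (the case classes are simplified stand-ins named after two DAGSchedulerEvent cases, not the real types): any thread may post, a single "logic" thread consumes, so the handlers need no locking.

import java.util.concurrent.LinkedBlockingQueue

object EventLoopSketch {
  sealed trait Event
  case class JobSubmitted(jobId: Int) extends Event
  case class StageCancelled(stageId: Int) extends Event
  case object Stop extends Event

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Event]()
    // The single "logic" thread: the only place events are handled.
    val logic = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) queue.take() match {
          case JobSubmitted(id)   => println(s"handle JobSubmitted($id)")
          case StageCancelled(id) => println(s"handle StageCancelled($id)")
          case Stop               => running = false
        }
      }
    })
    logic.start()
    // Any thread may post events.
    queue.put(JobSubmitted(0)); queue.put(StageCancelled(1)); queue.put(Stop)
    logic.join()
  }
}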

SparkListenerBus

/**                                                                                                                                                                     
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners                                                                                    
 */                                                                                                                                                                     
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {  

EventLoggingListener

/**                                                                                                                                                                     
 * A SparkListener that logs events to persistent storage.                                                                                                              
 *                                                                                                                                                                      
 * Event logging is specified by the following configurable parameters:                                                                                                 
 *   spark.eventLog.enabled - Whether event logging is enabled.                                                                                                         
 *   spark.eventLog.compress - Whether to compress logged events                                                                                                        
 *   spark.eventLog.overwrite - Whether to overwrite any existing files.                                                                                                
 *   spark.eventLog.dir - Path to the directory in which events are logged.                                                                                             
 *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams                                                                                       
 */                                                                                                                                                                     
private[spark] class EventLoggingListener(                                                                                                                              
    appId: String,                                                                                                                                                      
    logBaseDir: URI,                                                                                                                                                    
    sparkConf: SparkConf,                                                                                                                                               
    hadoopConf: Configuration)                                                                                                                                          
  extends SparkListener with Logging {   
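
For reference, the same parameters can be set from code; the log directory value here is illustrative.

import org.apache.spark.SparkConf

object EventLogConf {
  // Enabling event logging via the parameters listed in the comment above.
  val conf: SparkConf = new SparkConf()
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.compress", "true")
    .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")  // illustrative path
}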

ReplayListenerBus

/**                                                                                                                                                                     
 * A SparkListenerBus that can be used to replay events from serialized event data.                                                                                     
 */                                                                                                                                                                     
private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {  

LiveListenerBus

/**                                                                                                                                                                     
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.                                                                                              
 *                                                                                                                                                                      
 * Until start() is called, all posted events are only buffered. Only after this listener bus                                                                           
 * has started will events be actually propagated to all attached listeners. This listener bus                                                                          
 * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().                                                                             
 */                                                                                                                                                                     
private[spark] class LiveListenerBus                                                                                                                                    
  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")                                                                                
  with SparkListenerBus {       

ExecutorLossReason

/**                                                                                                                                                                     
 * Represents an explanation for a executor or whole slave failing or exiting.                                                                                          
 */                                                                                                                                                                     
private[spark]                                                                                                                                                          
class ExecutorLossReason(val message: String) {                                                                                                                         
  override def toString: String = message                                                                                                                               
}       

SchedulerBackend

/**                                                                                                                                                                     
 * A backend interface for scheduling systems that allows plugging in different ones under                                                                              
 * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as                                                                        
 * machines become available and can launch tasks on them.                                                                                                              
 */                                                                                                                                                                     
private[spark] trait SchedulerBackend {   

SchedulerBackend has four subclasses: MesosSchedulerBackend, CoarseMesosSchedulerBackend, SimrSchedulerBackend, and SparkDeploySchedulerBackend. MesosSchedulerBackend and CoarseMesosSchedulerBackend are used for Mesos deployments, SimrSchedulerBackend for the Spark-in-MapReduce (Hadoop) deployment, and SparkDeploySchedulerBackend for Spark's standalone deployment. In addition, YarnSchedulerBackend covers the YARN mode; the catch is that it is an abstract class, with its implementation in
spark/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala.
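
A simplified dispatch sketch (not SparkContext.createTaskScheduler itself) showing roughly how the master URL maps onto these backend families:

object BackendForMaster {
  def backendFor(master: String): String = master match {
    case m if m.startsWith("local")     => "LocalBackend"
    case m if m.startsWith("spark://")  => "SparkDeploySchedulerBackend (standalone)"
    case m if m.startsWith("mesos://")  => "MesosSchedulerBackend / CoarseMesosSchedulerBackend (spark.mesos.coarse)"
    case m if m.startsWith("simr://")   => "SimrSchedulerBackend"
    case "yarn-client" | "yarn-cluster" => "a YarnSchedulerBackend subclass"
    case other                          => s"unknown master URL: $other"
  }

  def main(args: Array[String]): Unit =
    Seq("local[2]", "spark://host:7077", "mesos://host:5050", "yarn-client")
      .foreach(m => println(f"$m%-20s -> ${backendFor(m)}"))
}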

LocalBackend

/**                                                                                                                                                                     
 * LocalBackend is used when running a local version of Spark where the executor, backend, and                                                                          
 * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks                                                                       
 * on a single Executor (created by the LocalBackend) running locally.                                                                                                  
 */                                                                                                                                                                     
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)    

YarnSchedulerBackend

/**                                                                                                                                                                     
 * Abstract Yarn scheduler backend that contains common logic                                                                                                           
 * between the client and cluster Yarn scheduler backends.                                                                                                              
 */                                                                                                                                                                     
private[spark] abstract class YarnSchedulerBackend(                                                                                                                     
    scheduler: TaskSchedulerImpl,                                                                                                                                       
    sc: SparkContext)                                                                                                                                                   
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { 

CoarseGrainedSchedulerBackend

/**                                                                                                                                                                     
 * A scheduler backend that waits for coarse grained executors to connect to it through Akka.                                                                           
 * This backend holds onto each executor for the duration of the Spark job rather than relinquishing                                                                    
 * executors whenever a task is done and asking the scheduler to launch a new executor for                                                                              
 * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the                                                                           
 * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode                                                                                 
 * (spark.deploy.*).                                                                                                                                                    
 */                                                                                                                                                                     
private[spark]                                                                                                                                                          
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)                                                                         
  extends ExecutorAllocationClient with SchedulerBackend with Logging    

SparkDeploySchedulerBackend

ActiveJob

/**                                                                                                                                                                     
 * Tracks information about an active job in the DAGScheduler.                                                                                                          
 */                                                                                                                                                                     
private[spark] class ActiveJob(                                                                                                                                         
    val jobId: Int,                                                                                                                                                     
    val finalStage: Stage,                                                                                                                                              
    val func: (TaskContext, Iterator[_]) => _,                                                                                                                          
    val partitions: Array[Int],                                                                                                                                         
    val callSite: CallSite,                                                                                                                                             
    val listener: JobListener,                                                                                                                                          
    val properties: Properties) {   

Stage

/**                                                                                                                                                                     
 * A stage is a set of independent tasks all computing the same function that need to run as part                                                                       
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run                                                                        
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the                                                                        
 * DAGScheduler runs these stages in topological order.                                                                                                                 
 *                                                                                                                                                                      
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for                                                                         
 * another stage, or a result stage, in which case its tasks directly compute the action that                                                                           
 * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes                                                                         
 * that each output partition is on.                                                                                                                                    
 *                                                                                                                                                                      
 * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO                                                                          
 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered                                                                           
 * faster on failure.                                                                                                                                                   
 *                                                                                                                                                                      
 * The callSite provides a location in user code which relates to the stage. For a shuffle map                                                                          
 * stage, the callSite gives the user code that created the RDD being shuffled. For a result                                                                            
 * stage, the callSite gives the user code that executes the associated action (e.g. count()).                                                                          
 *                                                                                                                                                                      
 * A single stage can consist of multiple attempts. In that case, the latestInfo field will                                                                             
 * be updated for each attempt.                                                                                                                                         
 *                                                                                                                                                                      
 */                                                                                                                                                                     
private[spark] class Stage(                                                                                                                                             
    val id: Int,                                                                                                                                                        
    val rdd: RDD[_],                                                                                                                                                    
    val numTasks: Int,                                                                                                                                                  
    val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage                                                                      
    val parents: List[Stage],                                                                                                                                           
    val jobId: Int,
    val callSite: CallSite)                                                                                                                                             
  extends Logging {       

StageInfo

/**                                                                                                                                                                     
 * :: DeveloperApi ::                                                                                                                                                   
 * Stores information about a stage to pass from the scheduler to SparkListeners.                                                                                       
 */                                                                                                                                                                     
@DeveloperApi                                                                                                                                                           
class StageInfo(                                                                                                                                                        
    val stageId: Int,                                                                                                                                                   
    val attemptId: Int,                                                                                                                                                 
    val name: String,                                                                                                                                                   
    val numTasks: Int,                                                                                                                                                  
    val rddInfos: Seq[RDDInfo],                                                                                                                                         
    val details: String) {      

TaskResultGetter

/**                                                                                                                                                                     
 * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.                                                                               
 */                                                                                                                                                                     
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)      

TaskLocation

/**                                                                                                                                                                     
 * A location where a task should run. This can either be a host or a (host, executorID) pair.                                                                          
 * In the latter case, we will prefer to launch the task on that executorID, but our next level                                                                         
 * of preference will be executors on the same host if this is not possible.                                                                                            
 */                                                                                                                                                                     
private[spark] sealed trait TaskLocation {                                                                                                                              
  def host: String                                                                                                                                                      
}          

SchedulingMode

/**                                                                                                                                                                     
 *  "FAIR" and "FIFO" determines which policy is used                                                                                                                   
 *    to order tasks amongst a Schedulable's sub-queues                                                                                                                 
 *  "NONE" is used when the a Schedulable has no sub-queues.                                                                                                            
 */                                                                                                                                                                     
object SchedulingMode extends Enumeration {                                                                                                                             

  type SchedulingMode = Value                                                                                                                                           
  val FAIR, FIFO, NONE = Value                                                                                                                                          
}  
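
The mode is chosen via spark.scheduler.mode; under FAIR, a job can additionally be routed to a named pool through a local property. A usage sketch (the pool name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object FairModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]").setAppName("fair-demo")
      .set("spark.scheduler.mode", "FAIR")              // FIFO is the default
    val sc = new SparkContext(conf)
    sc.setLocalProperty("spark.scheduler.pool", "etl")  // illustrative pool name
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}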

TaskSet

/**                                                                                                                                                                     
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing                                                                               
 * missing partitions of a particular stage.                                                                                                                            
 */                                                                                                                                                                     
private[spark] class TaskSet(                                                                                                                                           
    val tasks: Array[Task[_]],                                                                                                                                          
    val stageId: Int,                                                                                                                                                   
    val attempt: Int,                                                                                                                                                   
    val priority: Int,                                                                                                                                                  
    val properties: Properties) {          

InputFormatInfo

Parses and holds information about inputFormat (and files) specified as a parameter.
Interestingly, there is this comment in the companion object:

  /**                                                                                                                                                                   
    Computes the preferred locations based on input(s) and returned a location to block map.                                                                            
    Typical use of this method for allocation would follow some algo like this:                                                                                         

    a) For each host, count number of splits hosted on that host.                                                                                                       
    b) Decrement the currently allocated containers on that host.                                                                                                       
    c) Compute rack info for each host and update rack -> count map based on (b).                                                                                       
    d) Allocate nodes based on (c)                                                                                                                                      
    e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node                                                                          
       (even if data locality on that is very high) : this is to prevent fragility of job if a                                                                          
       single (or small set of) hosts go down.                                                                                                                          

    go to (a) until required nodes are allocated.                                                                                                                       

    If a node 'dies', follow same procedure.                                                                                                                            

    PS: I know the wording here is weird, hopefully it makes some sense !                                                                                               
  */      
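
A toy version of that loop, covering steps (a), (b), (d) and (e) and skipping the rack handling in (c); the host names and per-host cap are illustrative:

object PreferredLocationSketch {
  def allocate(splitsPerHost: Map[String, Int],
               alreadyAllocated: Map[String, Int],
               required: Int,
               maxPerHost: Int = 2): Seq[String] = {
    val scored = splitsPerHost.map { case (host, n) =>
      host -> (n - alreadyAllocated.getOrElse(host, 0))         // steps (a) and (b)
    }
    scored.toSeq.sortBy(-_._2)                                  // hosts with the most local data first
      .flatMap { case (host, _) => Seq.fill(maxPerHost)(host) } // step (e): cap slots per host
      .take(required)                                           // step (d): allocate what is needed
  }

  def main(args: Array[String]): Unit =
    // Allocates two slots on h1 and two on h2, despite h1 holding the most splits.
    println(allocate(Map("h1" -> 5, "h2" -> 3, "h3" -> 1), Map("h1" -> 1), required = 4))
}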

Schedulable

An interface for schedulable entities; there are two kinds: Pools and TaskSetManagers.

Pool

A Schedulable entity that represents a collection of Pools or TaskSetManagers.

TaskSetManager

/**                                                                                                                                                                     
 * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of                                                                      
 * each task, retries tasks if they fail (up to a limited number of times), and                                                                                         
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces                                                                         
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,                                                                          
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).                                                                                
 *                                                                                                                                                                      
 * THREADING: This class is designed to only be called from code with a lock on the                                                                                     
 * TaskScheduler (e.g. its event handlers). It should not be called from other threads.                                                                                 
 *                                                                                                                                                                      
 * @param sched           the TaskSchedulerImpl associated with the TaskSetManager                                                                                      
 * @param taskSet         the TaskSet to manage scheduling for                                                                                                          
 * @param maxTaskFailures if any particular task fails more than this number of times, the entire                                                                       
 *                        task set will be aborted                                                                                                                      
 */                                                                                                                                                                     
private[spark] class TaskSetManager(                                                                                                                                    
    sched: TaskSchedulerImpl,                                                                                                                                           
    val taskSet: TaskSet,                                                                                                                                               
    val maxTaskFailures: Int,                                                                                                                                           
    clock: Clock = new SystemClock())                                                                                                                                   
  extends Schedulable with Logging {   

This file is fairly long, so it deserves a more detailed walkthrough:

TaskLocality

object TaskLocality extends Enumeration {                                                                                                                               
  // Process local is expected to be used ONLY within TaskSetManager for now.                                                                                           
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value                                                                                                       

  type TaskLocality = Value                                                                                                                                                                                                                                                                                                                   

In TaskSetManager's logic, tasks are preferentially scheduled on the nodes closest to their data; the priority order is exactly the enumeration above (PROCESS_LOCAL first, ANY last).
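
A minimal sketch of why the declaration order above doubles as a priority order: Scala Enumeration values are ordered by declaration, so a "closer" level compares as smaller. The isAllowed helper below is written from memory of Spark's TaskLocality and assumes the enumeration above is in scope (e.g. pasted into a REPL); treat it as an illustration rather than the exact source.

import TaskLocality._

// A task whose best available locality is `condition` may be launched under a
// scheduling constraint `constraint` only if it is at least as "close".
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean =
  condition <= constraint

isAllowed(NODE_LOCAL, PROCESS_LOCAL)   // true: process-local fits a node-local constraint
isAllowed(NODE_LOCAL, RACK_LOCAL)      // false: rack-local is farther than the constraint allows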

resourceOffer

  /**                                                                                                                                                                   
   * Respond to an offer of a single executor from the scheduler by finding a task                                                                                      
   *                                                                                                                                                                    
   * NOTE: this function is either called with a maxLocality which                                                                                                      
   * would be adjusted by delay scheduling algorithm or it will be with a special                                                                                       
   * NO_PREF locality which will be not modified                                                                                                                        
   *                                                                                                                                                                    
   * @param execId the executor Id of the offered resource                                                                                                              
   * @param host  the host Id of the offered resource                                                                                                                   
   * @param maxLocality the maximum locality we want to schedule the tasks at                                                                                           
   */                                                                                                                                                                   
  @throws[TaskNotSerializableException]                                                                                                                                 
  def resourceOffer(                                                                                                                                                    
      execId: String,                                                                                                                                                   
      host: String,                                                                                                                                                     
      maxLocality: TaskLocality.TaskLocality)                                                                                                                           
    : Option[TaskDescription] =     

When an executor offers a free slot, this method picks a suitable task for it.
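
To make the selection concrete, here is a self-contained toy (all names, such as ResourceOfferSketch and the pending maps, are made up rather than Spark fields): pending tasks are bucketed by preferred executor, preferred host and "no preference", and an offer is answered by scanning the buckets from closest to farthest without exceeding maxLocality. The real resourceOffer additionally dequeues the chosen task, serializes it and returns a TaskDescription.

// Toy version of the selection inside resourceOffer (illustrative only).
object ResourceOfferSketch {
  object Locality extends Enumeration { val PROCESS_LOCAL, NODE_LOCAL, ANY = Value }
  import Locality._

  // Pending tasks bucketed by preference; a task may appear in several buckets.
  val pendingForExecutor = Map("exec-1" -> List(0))      // task 0 prefers executor exec-1
  val pendingForHost     = Map("host-a" -> List(0, 1))   // tasks 0 and 1 prefer host-a
  val pendingAny         = List(2)                       // task 2 has no preference

  def resourceOffer(execId: String, host: String, maxLocality: Locality.Value): Option[Int] = {
    def firstIn(tasks: List[Int]) = tasks.headOption
    // Try the closest bucket first, but never go beyond maxLocality.
    firstIn(pendingForExecutor.getOrElse(execId, Nil))
      .orElse(if (maxLocality >= NODE_LOCAL) firstIn(pendingForHost.getOrElse(host, Nil)) else None)
      .orElse(if (maxLocality >= ANY) firstIn(pendingAny) else None)
  }

  def main(args: Array[String]): Unit = {
    println(resourceOffer("exec-1", "host-a", PROCESS_LOCAL)) // Some(0)
    println(resourceOffer("exec-9", "host-b", NODE_LOCAL))    // None: not yet allowed to fall back to ANY
    println(resourceOffer("exec-9", "host-b", ANY))           // Some(2)
  }
}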

handleSuccessfulTask

Marks the task as successful and notifies the DAGScheduler that a task has ended.

handleFailedTask

Marks the task as failed, re-adds it to the list of pending tasks, and notifies the DAGScheduler.

Basic logic

  1. Bucket the tasks described by the TaskSet into pending lists according to data locality (preferred executor, host, rack, no preference).
  2. The TaskSchedulerImpl offers an executor that has a free slot (resourceOffer).
  3. Pick the most suitable (closest) pending task and launch it there.
  4. Depending on how the task finishes:
    • on success, mark it as successful (handleSuccessfulTask);
    • on failure, re-queue and retry it until maxTaskFailures is reached (handleFailedTask); see the sketch below.
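
A self-contained toy of the bookkeeping behind step 4 (field names are illustrative, not the real TaskSetManager internals): every task index carries a failure counter; a failure below maxTaskFailures puts the task back into the pending list, while hitting the limit aborts the whole task set, which is what gets reported back to the DAGScheduler.

// Toy failure accounting for one task set (illustrative names only).
class RetrySketch(numTasks: Int, maxTaskFailures: Int) {
  private val failures = Array.fill(numTasks)(0)
  private var pending  = (0 until numTasks).toList
  private var aborted  = false

  def handleSuccessfulTask(task: Int): Unit =
    pending = pending.filterNot(_ == task)       // done for good, never re-queued

  def handleFailedTask(task: Int): Unit = {
    failures(task) += 1
    if (failures(task) >= maxTaskFailures) {
      aborted = true                             // the real code aborts the TaskSet and tells the DAGScheduler
    } else if (!pending.contains(task)) {
      pending = task :: pending                  // re-add so the task is offered again
    }
  }

  def isAborted: Boolean = aborted
}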

SchedulingAlgorithm

/**                                                                                                                                                                     
 * An interface for sort algorithm                                                                                                                                      
 * FIFO: FIFO algorithm between TaskSetManagers                                                                                                                         
 * FS: FS algorithm between Pools, and FIFO or FS within Pools                                                                                                          
 */                                                                                                                                                                     
private[spark] trait SchedulingAlgorithm {                                                                                                                              
  def comparator(s1: Schedulable, s2: Schedulable): Boolean                                                                                                             
}      

Here FS refers to FairSchedulingAlgorithm.
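
For reference, the FIFO comparator is roughly the following (reconstructed from memory, so check the source for the exact code): it orders TaskSetManagers by priority, which is the jobId, and breaks ties with stageId, so earlier jobs and earlier stages are scheduled first.

// Hedged sketch of FIFOSchedulingAlgorithm's comparator.
class FIFOSketch extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    if (s1.priority != s2.priority) s1.priority < s2.priority  // priority is the jobId
    else s1.stageId < s2.stageId                                // then the earlier stage
  }
}

FairSchedulingAlgorithm instead first favours whichever Schedulable is still below its minShare, then compares the ratios runningTasks/minShare and runningTasks/weight, falling back to the name, so starved pools get served first.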

OutputCommitCoordinator

/**                                                                                                                                                                     
 * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"                                                                        
 * policy.                                                                                                                                                              
 *                                                                                                                                                                      
 * OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is                                                                       
 * configured with a reference to the driver's OutputCommitCoordinatorActor, so requests to commit                                                                      
 * output will be forwarded to the driver's OutputCommitCoordinator.                                                                                                    
 *                                                                                                                                                                      
 * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)                                                                      
 * for an extensive design discussion.                                                                                                                                  
 */                                                                                                                                                                     
private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {   

On the driver there is an OutputCommitCoordinatorActor; this actor holds the driver's OutputCommitCoordinator. It grants the commit request of the first task attempt and denies the requests of all remaining attempts for the same output.
The granularity of a request is: AskPermissionToCommitOutput(stage, partition, taskAttempt)
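
A self-contained sketch of the "first committer wins" rule (the map and names are illustrative, not the actual OutputCommitCoordinator fields): the first attempt that asks to commit a given (stage, partition) is authorized, and every later attempt for the same output is denied; only the winner may ask again, e.g. when it retries its commit.

// Toy first-committer-wins authority (illustrative; not Spark's actual code).
class CommitSketch {
  // (stageId, partition) -> task attempt that was authorized to commit
  private val authorized = scala.collection.mutable.Map[(Int, Int), Long]()

  def askPermissionToCommit(stage: Int, partition: Int, taskAttempt: Long): Boolean =
    authorized.get((stage, partition)) match {
      case Some(winner) => winner == taskAttempt     // only the winning attempt may commit
      case None =>
        authorized((stage, partition)) = taskAttempt // first asker wins
        true
    }
}

On an executor the question is not answered locally: it is sent as AskPermissionToCommitOutput to the driver's actor and decided there.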

DAGScheduler

/**                                                                                                                                                                     
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of                                                                      
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a                                                                       
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying                                                                                 
 * TaskScheduler implementation that runs them on the cluster.                                                                                                          
 *                                                                                                                                                                      
 * In addition to coming up with a DAG of stages, this class also determines the preferred                                                                              
 * locations to run each task on, based on the current cache status, and passes these to the                                                                            
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being                                                                          
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are                                                                        
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task                                                                         
 * a small number of times before cancelling the whole stage.                                                                                                           
 *                                                                                                                                                                      
 * Here's a checklist to use when making or reviewing changes to this class:                                                                                            
 *                                                                                                                                                                      
 *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to                                                                         
 *    include the new structure. This will help to catch memory leaks.                                                                                                  
 */                                                                                                                                                                     
private[spark]                                                                                                                                                          
class DAGScheduler(                                                                                                                                                     
    private[scheduler] val sc: SparkContext,                                                                                                                            
    private[scheduler] val taskScheduler: TaskScheduler,                                                                                                                
    listenerBus: LiveListenerBus,                                                                                                                                       
    mapOutputTracker: MapOutputTrackerMaster,                                                                                                                           
    blockManagerMaster: BlockManagerMaster,                                                                                                                             
    env: SparkEnv,                                                                                                                                                      
    clock: Clock = new SystemClock())                                                                                                                                   
  extends Logging {    

DAGSchedulerEventProcessLoop

  /**
   * The main event loop of the DAG scheduler.                                                                                                                          
   */                                                                                                                                                                   
  override def onReceive(event: DAGSchedulerEvent): Unit = event match {                                                                                                
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>                                                                      
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,                                                                               
        listener, properties)                                                                                                                                           

    case StageCancelled(stageId) =>                                                                                                                                     
      dagScheduler.handleStageCancellation(stageId)                                                                                                                     

    case JobCancelled(jobId) =>                                                                                                                                         
      dagScheduler.handleJobCancellation(jobId)                                                                                                                         

    case JobGroupCancelled(groupId) =>                                                                                                                                  
      dagScheduler.handleJobGroupCancelled(groupId)                                                                                                                     

    case AllJobsCancelled =>                                                                                                                                            
      dagScheduler.doCancelAllJobs()                                                                                                                                    

    case ExecutorAdded(execId, host) =>                                                                                                                                 
      dagScheduler.handleExecutorAdded(execId, host)                                                                                                                    

    case ExecutorLost(execId) =>                                                                                                                                        
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)     

    case BeginEvent(task, taskInfo) =>                                                                                                                                  
      dagScheduler.handleBeginEvent(task, taskInfo)                                                                                                                     

    case GettingResultEvent(taskInfo) =>                                                                                                                                
      dagScheduler.handleGetTaskResult(taskInfo)                                                                                                                        

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>                                                                                     
      dagScheduler.handleTaskCompletion(completion)                                                                                                                     

    case TaskSetFailed(taskSet, reason) =>                                                                                                                              
      dagScheduler.handleTaskSetFailed(taskSet, reason)                                                                                                                 

    case ResubmitFailedStages =>                                                                                                                                        
      dagScheduler.resubmitFailedStages()                                                                                                                               
  }                                                                                                                                                                     

Basic logic of stage generation

Please first refer to the following articles:
Stage划分及提交源码分析 or stage

My own understanding:
1. The final RDD always ends a stage, so treat it as the final stage.
2. Traverse backwards from the final RDD; whenever a ShuffleDependency is encountered, its parent RDD ends another stage.
3. Repeat step 2 until all stages have been generated (see the sketch below).
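
A hedged, self-contained sketch of that traversal (the Rdd/Dep types are stand-ins, not Spark's RDD and Dependency, and shuffleParents is not a real DAGScheduler method): walk back from the final RDD, stay inside the current stage across narrow dependencies, and start a new parent stage at every ShuffleDependency.

// Toy DAG walk illustrating the stage-splitting idea (types are stand-ins).
object StageSplitSketch {
  sealed trait Dep { def rdd: Rdd }
  case class NarrowDep(rdd: Rdd)  extends Dep
  case class ShuffleDep(rdd: Rdd) extends Dep
  case class Rdd(id: Int, deps: List[Dep])

  // Returns the RDDs at which the parent stages of the stage ending in `finalRdd` end.
  def shuffleParents(finalRdd: Rdd): List[Rdd] = {
    val visited = scala.collection.mutable.Set[Int]()
    val parents = scala.collection.mutable.ListBuffer[Rdd]()
    def visit(rdd: Rdd): Unit = {
      if (visited.add(rdd.id)) {
        rdd.deps.foreach {
          case ShuffleDep(parent) => parents += parent  // stage boundary: stop walking here
          case NarrowDep(parent)  => visit(parent)      // same stage: keep walking
        }
      }
    }
    visit(finalRdd)
    parents.toList
  }

  def main(args: Array[String]): Unit = {
    // textFile -> map --(shuffle)--> reduceByKey -> map  => two stages
    val a = Rdd(0, Nil)
    val b = Rdd(1, List(NarrowDep(a)))
    val c = Rdd(2, List(ShuffleDep(b)))
    val d = Rdd(3, List(NarrowDep(c)))
    println(shuffleParents(d).map(_.id))  // List(1): the map-side stage ends at RDD 1
  }
}

Applying shuffleParents again to each returned RDD splits the parent stages in the same way, which is the repetition described in step 3.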
