@(spark)[scheduler]
/**
* A unit of execution. We have two kinds of Tasks in Spark:
* - [[org.apache.spark.scheduler.ShuffleMapTask]]
* - [[org.apache.spark.scheduler.ResultTask]]
*
* A Spark job consists of one or more stages. The very last stage in a job consists of multiple
* ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
* and sends the task output back to the driver application. A ShuffleMapTask executes the task
* and divides the task output to multiple buckets (based on the task's partitioner).
*
* @param stageId id of the stage this task belongs to
* @param partitionId index of the partition this task computes within its RDD
*/
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
/**
* A task that sends back the output to the driver application.
*
* See [[Task]] for more information.
*
* @param stageId id of the stage this task belongs to
* @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
* partition of the given RDD. Once deserialized, the type should be
* (RDD[T], (TaskContext, Iterator[T]) => U).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
*/
private[spark] class ResultTask[T, U](
stageId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient locs: Seq[TaskLocation],
val outputId: Int)
extends Task[U](stageId, partition.index) with Serializable {
The part worth focusing on is its runTask:
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
* specified in the ShuffleDependency).
*
* See [[org.apache.spark.scheduler.Task]] for more information.
*
* @param stageId id of the stage this task belongs to
* @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
*/
private[spark] class ShuffleMapTask(
stageId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation])
extends Task[MapStatus](stageId, partition.index) with Logging {
Its runTask returns a MapStatus:
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task was run. */
def location: BlockManagerId
/**
* Estimated size for the reduce block, in bytes.
*
* If a block is non-empty, then this method MUST return a non-zero size. This invariant is
* necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
*/
def getSizeForBlock(reduceId: Int): Long
}
Its runTask is also fairly simple: it obtains a ShuffleWriter and writes out the results.
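In outline it deserializes the RDD and its ShuffleDependency from the task binary, asks the ShuffleManager for a writer, writes this partition, and returns the MapStatus produced by the writer. A simplified rendering of the Spark 1.x code (error handling elided):
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD and its ShuffleDependency using the broadcast variable.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  // Ask the shuffle manager for a writer, write out this partition, and report a MapStatus.
  val manager = SparkEnv.get.shuffleManager
  val writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
}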
// Task result. Also contains updates to accumulator variables.
private[spark] sealed trait TaskResult[T]
It comes in two flavors: DirectTaskResult and IndirectTaskResult.
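Roughly, in the Spark 1.x sources these look like the following (abridged sketch). A DirectTaskResult carries the serialized value inline, while an IndirectTaskResult only carries a BlockId pointing at a result that was too large to send directly and was stored in the block manager instead:
// Large result: stored in the block manager on the executor; only a reference travels back.
private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
  extends TaskResult[T] with Serializable

// Small result: the serialized value plus accumulator updates and metrics travel back directly.
private[spark] class DirectTaskResult[T](
    var valueBytes: ByteBuffer,
    var accumUpdates: Map[Long, Any],
    var metrics: TaskMetrics)
  extends TaskResult[T] with Externalizable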
/**
* :: DeveloperApi ::
* Information about a running task attempt inside a TaskSet.
*/
@DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
val attempt: Int,
val launchTime: Long,
val executorId: String,
val host: String,
val taskLocality: TaskLocality.TaskLocality,
val speculative: Boolean) {
/**
* Description of a task that gets passed onto executors to be executed, usually created by
* [[TaskSetManager.resourceOffer]].
*/
private[spark] class TaskDescription(
/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
*/
@DeveloperApi
class AccumulableInfo (
val id: Long,
val name: String,
val update: Option[String], // represents a partial update within a task
val value: String) {
// information about a specific split instance : handles both split instances.
// So that we do not need to worry about the differences.
@DeveloperApi
class SplitInfo(
val inputFormatClazz: Class[_],
val hostLocation: String,
val path: String,
val length: Long,
val underlyingSplit: Any) {
A result of a job in the DAGScheduler.
There are only two outcomes: JobSucceeded and JobFailed.
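For reference, the JobResult hierarchy is tiny (as in the Spark 1.x sources, lightly abridged):
@DeveloperApi
sealed trait JobResult

@DeveloperApi
case object JobSucceeded extends JobResult

private[spark] case class JobFailed(exception: Exception) extends JobResult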
/**
* An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
* results to the given handler function.
*/
private[spark] class JobWaiter[T](
dagScheduler: DAGScheduler,
val jobId: Int,
totalTasks: Int,
resultHandler: (Int, T) => Unit)
extends JobListener {
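Its implementation of the JobListener callbacks (described next) is essentially a counter plus a notify. A simplified sketch, with the error checks trimmed (names follow the Spark 1.x source but may differ slightly):
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  resultHandler(index, result.asInstanceOf[T])  // hand each result to the caller's handler
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    _jobFinished = true                         // all partitions done: the job succeeded
    jobResult = JobSucceeded
    this.notifyAll()                            // wake up whoever is blocked in awaitResult()
  }
}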
/**
* Interface used to listen for job completion or failure events after submitting a job to the
* DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
* job fails (and no further taskSucceeded events will happen).
*/
private[spark] trait JobListener {
def taskSucceeded(index: Int, result: Any)
def jobFailed(exception: Exception)
}
/**
* :: DeveloperApi ::
* A logger class to record runtime information for jobs in Spark. This class outputs one log file
* for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass
* of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext
* is created. Note that each JobLogger only works for one SparkContext
*
* NOTE: The functionality of this class is heavily stripped down to accommodate for a general
* refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced
* to log application information as SparkListenerEvents. To enable this functionality, set
* spark.eventLog.enabled to true.
*/
@DeveloperApi
@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")
class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {
/**
* A simple listener for application events.
*
* This listener expects to hear events from a single application only. If events
* from multiple applications are seen, the behavior is unspecified.
*/
private[spark] class ApplicationEventListener extends SparkListener {
/**
* Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
* architecture where any thread can post an event (e.g. a task finishing or a new job being
* submitted) but there is a single "logic" thread that reads these events and takes decisions.
* This greatly simplifies synchronization.
*/
private[scheduler] sealed trait DAGSchedulerEvent
There are quite a few events; the important ones include JobSubmitted, StageCancelled, and so on.
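For example, JobSubmitted bundles everything the DAGScheduler needs to start planning a job; its shape can be read off the onReceive pattern match quoted further below (a sketch based on the Spark 1.x source):
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent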
/**
* A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
*/
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
/**
* A SparkListener that logs events to persistent storage.
*
* Event logging is specified by the following configurable parameters:
* spark.eventLog.enabled - Whether event logging is enabled.
* spark.eventLog.compress - Whether to compress logged events
* spark.eventLog.overwrite - Whether to overwrite any existing files.
* spark.eventLog.dir - Path to the directory in which events are logged.
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
private[spark] class EventLoggingListener(
appId: String,
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {
/**
* A SparkListenerBus that can be used to replay events from serialized event data.
*/
private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
*
* Until start() is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
*/
private[spark] class LiveListenerBus
extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
with SparkListenerBus {
/**
* Represents an explanation for an executor or whole slave failing or exiting.
*/
private[spark]
class ExecutorLossReason(val message: String) {
override def toString: String = message
}
/**
* A backend interface for scheduling systems that allows plugging in different ones under
* TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
SchedulerBackend has four concrete subclasses: MesosSchedulerBackend, CoarseMesosSchedulerBackend, SimrSchedulerBackend, and SparkDeploySchedulerBackend. MesosSchedulerBackend and CoarseMesosSchedulerBackend are used for Mesos deployments, SimrSchedulerBackend for the Spark-in-MapReduce (SIMR) deployment on Hadoop, and SparkDeploySchedulerBackend for Spark's standalone deployment. YarnSchedulerBackend serves the YARN-based modes; note that it is an abstract class, whose concrete implementation lives in
spark/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala.
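The trait itself is small; all of the deployment-specific backends above plug in behind these few methods (roughly as in the Spark 1.x source, lightly abridged):
private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit              // ask the backend to offer resources to the TaskScheduler
  def defaultParallelism(): Int

  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
  def isReady(): Boolean = true
}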
/**
* LocalBackend is used when running a local version of Spark where the executor, backend, and
* master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
* on a single Executor (created by the LocalBackend) running locally.
*/
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
/**
* Abstract Yarn scheduler backend that contains common logic
* between the client and cluster Yarn scheduler backends.
*/
private[spark] abstract class YarnSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
* This backend holds onto each executor for the duration of the Spark job rather than relinquishing
* executors whenever a task is done and asking the scheduler to launch a new executor for
* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
* (spark.deploy.*).
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
extends ExecutorAllocationClient with SchedulerBackend with Logging
/**
* Tracks information about an active job in the DAGScheduler.
*/
private[spark] class ActiveJob(
val jobId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {
/**
* A stage is a set of independent tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* another stage, or a result stage, in which case its tasks directly compute the action that
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
* The callSite provides a location in user code which relates to the stage. For a shuffle map
* stage, the callSite gives the user code that created the RDD being shuffled. For a result
* stage, the callSite gives the user code that executes the associated action (e.g. count()).
*
* A single stage can consist of multiple attempts. In that case, the latestInfo field will
* be updated for each attempt.
*
*/
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val jobId: Int,
val callSite: CallSite)
extends Logging {
/**
* :: DeveloperApi ::
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class StageInfo(
val stageId: Int,
val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
val details: String) {
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
/**
* A location where a task should run. This can either be a host or a (host, executorID) pair.
* In the latter case, we will prefer to launch the task on that executorID, but our next level
* of preference will be executors on the same host if this is not possible.
*/
private[spark] sealed trait TaskLocation {
def host: String
}
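Concretely, the two flavors are represented by two small case classes (roughly as in the Spark 1.x source):
// Preferred location is a particular executor (e.g. because the partition is cached there).
private[spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation

// Preferred location is just a host.
private[spark] case class HostTaskLocation(override val host: String) extends TaskLocation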
/**
* "FAIR" and "FIFO" determines which policy is used
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration {
type SchedulingMode = Value
val FAIR, FIFO, NONE = Value
}
/**
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val attempt: Int,
val priority: Int,
val properties: Properties) {
InputFormatInfo parses and holds information about the inputFormat (and files) specified as a parameter.
Interestingly, its companion object carries this comment:
/**
Computes the preferred locations based on input(s) and returned a location to block map.
Typical use of this method for allocation would follow some algo like this:
a) For each host, count number of splits hosted on that host.
b) Decrement the currently allocated containers on that host.
c) Compute rack info for each host and update rack -> count map based on (b).
d) Allocate nodes based on (c)
e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
(even if data locality on that is very high) : this is to prevent fragility of job if a
single (or small set of) hosts go down.
go to (a) until required nodes are allocated.
If a node 'dies', follow same procedure.
PS: I know the wording here is weird, hopefully it makes some sense !
*/
Pool is a Schedulable entity that represents a collection of Pools or TaskSetManagers.
/**
* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
* each task, retries tasks if they fail (up to a limited number of times), and
* handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
* to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
* and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
*
* THREADING: This class is designed to only be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
*
* @param sched the TaskSchedulerImpl associated with the TaskSetManager
* @param taskSet the TaskSet to manage scheduling for
* @param maxTaskFailures if any particular task fails more than this number of times, the entire
* task set will be aborted
*/
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
clock: Clock = new SystemClock())
extends Schedulable with Logging {
This file is fairly long, so it deserves a more detailed walk-through:
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
type TaskLocality = Value
In TaskSetManager's logic, tasks are preferentially launched on the nodes closest to their data; the order of preference is exactly the enumeration above.
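The mechanism behind this is delay scheduling: the TaskSetManager starts at the best locality level that still has pending tasks and only relaxes to the next level after it has gone spark.locality.wait milliseconds without launching anything at the current level. A hypothetical, self-contained sketch of the idea (not Spark's actual code; the class and field names here are made up — the real logic lives in TaskSetManager.getAllowedLocalityLevel and is more involved):
class DelaySchedulingSketch(levels: Array[String], localityWaitMs: Long) {
  private var currentIndex = 0
  private var lastLaunchTime = System.currentTimeMillis()

  // The loosest locality level we are currently willing to accept.
  def allowedLevel(now: Long): String = {
    while (currentIndex < levels.length - 1 &&
           now - lastLaunchTime >= localityWaitMs) {
      lastLaunchTime += localityWaitMs   // "spend" the wait accumulated at this level
      currentIndex += 1                  // relax to the next, less local level
    }
    levels(currentIndex)
  }

  // Launching a task resets the clock for the current level.
  def recordLaunch(now: Long): Unit = lastLaunchTime = now
}
With the default spark.locality.wait of 3 seconds, levels would be something like Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") and localityWaitMs = 3000.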
/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
*
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
@throws[TaskNotSerializableException]
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
When an executor offers a slot to run on, this method picks out a suitable task for it.
handleSuccessfulTask marks the task as successful and notifies the DAGScheduler that a task has ended.
handleFailedTask marks the task as failed, re-adds it to the list of pending tasks, and notifies the DAGScheduler.
/**
* An interface for sort algorithm
* FIFO: FIFO algorithm between TaskSetManagers
* FS: FS algorithm between Pools, and FIFO or FS within Pools
*/
private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
Here FS refers to FairSchedulingAlgorithm.
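For example, the FIFO variant simply orders by priority (which is the job id) and breaks ties with the stage id; a condensed rendering of the Spark 1.x code:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val res = math.signum(s1.priority - s2.priority) match {
      case 0 => math.signum(s1.stageId - s2.stageId)  // same priority: earlier stage first
      case r => r                                     // lower priority value (earlier job) first
    }
    res < 0
  }
}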
/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
* policy.
*
* OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is
* configured with a reference to the driver's OutputCommitCoordinatorActor, so requests to commit
* output will be forwarded to the driver's OutputCommitCoordinator.
*
* This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
* for an extensive design discussion.
*/
private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
On the driver there is an OutputCommitCoordinatorActor, which fronts the OutputCommitCoordinator; it accepts the first task attempt's commit request and denies every later request for the same output.
The granularity of a request is AskPermissionToCommitOutput(stage, partition, taskAttempt).
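A hypothetical, self-contained sketch of the "first committer wins" bookkeeping (not the actual OutputCommitCoordinator code; the class and map names here are made up):
import scala.collection.mutable

// The first attempt to ask for a given (stage, partition) wins; later attempts are denied.
class FirstCommitterWinsSketch {
  private val authorized = mutable.Map.empty[(Int, Int), Long]  // (stageId, partitionId) -> winning attempt

  def askPermissionToCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(winner) => winner == attempt  // only the already-authorized attempt may commit again
      case None =>
        authorized((stage, partition)) = attempt
        true
    }
  }
}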
/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
* stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
* minimal schedule to run the job. It then submits stages as TaskSets to an underlying
* TaskScheduler implementation that runs them on the cluster.
*
* In addition to coming up with a DAG of stages, this class also determines the preferred
* locations to run each task on, based on the current cache status, and passes these to the
* low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
* lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
* Here's a checklist to use when making or reviewing changes to this class:
*
* - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
* include the new structure. This will help to catch memory leaks.
*/
private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
dagScheduler.handleTaskSetFailed(taskSet, reason)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
Please first refer to the following articles:
"Stage划分及提交源码分析" (an analysis of how stages are split and submitted), or the "stage" post.
My own understanding (see the sketch after this list):
1. The final RDD always terminates a Stage, so treat it as the final Stage.
2. Traverse backwards from the final RDD; whenever a ShuffleDependency is encountered, another Stage boundary is created there.
3. Keep repeating step 2 until all Stages have been generated.
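A self-contained toy sketch of that traversal (the Node and Dep types here are hypothetical, not Spark's classes; the real logic lives in DAGScheduler's getParentStages and related helpers):
import scala.collection.mutable

// Hypothetical toy model: an RDD-like node with narrow or shuffle dependencies.
sealed trait Dep { def parent: Node }
case class NarrowDep(parent: Node) extends Dep
case class ShuffleDep(parent: Node) extends Dep
case class Node(id: Int, deps: Seq[Dep])

// Walk backwards from the final RDD; every ShuffleDep marks the boundary of a new stage.
def stageBoundaries(finalNode: Node): Set[Node] = {
  val boundaries = mutable.Set(finalNode)   // step 1: the final RDD ends the final stage
  val visited = mutable.Set.empty[Node]
  val toVisit = mutable.Stack(finalNode)
  while (toVisit.nonEmpty) {
    val node = toVisit.pop()
    if (visited.add(node)) {
      node.deps.foreach {
        case ShuffleDep(p) => boundaries += p; toVisit.push(p)  // step 2: shuffle => new stage
        case NarrowDep(p)  => toVisit.push(p)                   // narrow dep stays in the same stage
      }
    }
  }
  boundaries.toSet                          // step 3: repeated until every stage is found
}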