@(spark)[streaming|scheduler]
/**
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
* @param submissionTime Clock time of when jobs of this batch were submitted to
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
* @param processingEndTime Clock time of when the last job of this batch finished processing
*/
@DeveloperApi
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
) {
/**
* Class that keeps track of all the received blocks, and allocates them to batches
* when required. All actions taken by this class can be saved to a write ahead log
* (if a checkpoint directory has been provided), so that the state of the tracker
* (received blocks and block-to-batch allocations) can be recovered after driver failure.
*
* Note that when any instance of this class is created with a checkpoint directory,
* it will try reading events from logs in the directory.
*/
private[streaming] class ReceivedBlockTracker(
conf: SparkConf,
hadoopConf: Configuration,
streamIds: Seq[Int],
clock: Clock,
checkpointDirOption: Option[String])
extends Logging {
/**
* This class manages the execution of the receivers of ReceiverInputDStreams. An instance of
* this class must be created after all input streams have been added and StreamingContext.start()
* has been called because it needs the final set of input streams at the time of instantiation.
*
* @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
*/
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
它的重点则是
/** This thread class runs all the receivers on the cluster. */
class ReceiverLauncher { d
/**
* Class representing a Spark computation. It may contain multiple Spark jobs.
*/
private[streaming]
class Job(val time: Time, func: () => _) {
/** Class representing a set of Jobs
* that belong to the same batch.
*/
private[streaming]
case class JobSet(
time: Time,
jobs: Seq[Job],
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
) {