Spark-streaming-scheduler

邵宜年
2023-12-01

Spark-streaming-scheduler

@(spark)[streaming|scheduler]

BatchInfo

/**                                                                                                                                                                     
 * :: DeveloperApi ::                                                                                                                                                   
 * Class having information on completed batches.                                                                                                                       
 * @param batchTime   Time of the batch                                                                                                                                 
 * @param submissionTime  Clock time of when jobs of this batch were submitted to
 *                        the streaming scheduler queue                                                                                                                 
 * @param processingStartTime Clock time of when the first job of this batch started processing                                                                         
 * @param processingEndTime Clock time of when the last job of this batch finished processing                                                                           
 */                                                                                                                                                                     
@DeveloperApi                                                                                                                                                           
case class BatchInfo(                                                                                                                                                   
    batchTime: Time,                                                                                                                                                    
    receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],                                                                                                              
    submissionTime: Long,                                                                                                                                               
    processingStartTime: Option[Long],                                                                                                                                  
    processingEndTime: Option[Long]                                                                                                                                     
  ) {    

ReceivedBlockTracker

/**                                                                                                                                                                     
 * Class that keeps track of all the received blocks, and allocates them to batches
 * when required. All actions taken by this class can be saved to a write ahead log                                                                                     
 * (if a checkpoint directory has been provided), so that the state of the tracker                                                                                      
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.                                                                              
 *                                                                                                                                                                      
 * Note that when any instance of this class is created with a checkpoint directory,                                                                                    
 * it will try reading events from logs in the directory.                                                                                                               
 */                                                                                                                                                                     
private[streaming] class ReceivedBlockTracker(                                                                                                                          
    conf: SparkConf,                                                                                                                                                    
    hadoopConf: Configuration,                                                                                                                                          
    streamIds: Seq[Int],                                                                                                                                                
    clock: Clock,                                                                                                                                                       
    checkpointDirOption: Option[String])                                                                                                                                
  extends Logging {  

ReceiverTracker

/**                                                                                                                                                                     
 * This class manages the execution of the receivers of ReceiverInputDStreams. Instance of                                                                              
 * this class must be created after all input streams have been added and StreamingContext.start()                                                                      
 * has been called because it needs the final set of input streams at the time of instantiation.                                                                        
 *                                                                                                                                                                      
 * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.                                                                                    
 */                                                                                                                                                                     
private[streaming]                                                                                                                                                      
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {   

它的重点则是其内部类 ReceiverLauncher:

  /** This thread class runs all the receivers on the cluster.  */                                                                                                      
  class ReceiverLauncher {

Job

/**                                                                                                                                                                     
 * Class representing a Spark computation. It may contain multiple Spark jobs.                                                                                          
 */                                                                                                                                                                     
private[streaming]                                                                                                                                                      
class Job(val time: Time, func: () => _) {      

JobSet

/** Class representing a set of Jobs
  * belonging to the same batch.
  */                                                                                                                                                                    
private[streaming]                                                                                                                                                      
case class JobSet(                                                                                                                                                      
    time: Time,                                                                                                                                                         
    jobs: Seq[Job],                                                                                                                                                     
    receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty                                                                                                   
  ) {    

#

 类似资料: