当前位置: 首页 > 工具软件 > Nimbus > 使用案例 >

Nimbus(补充)

张智
2023-12-01

2021SC@SDUSC

nimbus是storm集群的"控制器",是storm集群的重要组成部分。我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus。bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数:

nimbus函数
 

def  nimbus( klass = "backtype.storm.daemon.nimbus" ):
    """Syntax: [storm nimbus]

   Launches the nimbus daemon. This command should be run under 
   supervision with a tool like daemontools or monit. 

   See Setting up a Storm cluster for more information.
   (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
   """
    cppaths  =  [ STORM_DIR  +  "/log4j" ,  STORM_DIR  +  "/conf" ]
    jvmopts  =  parse_args( confvalue( "nimbus.childopts" ,  cppaths))  +  [
        "-Dlogfile.name=nimbus.log" ,
        "-Dlog4j.configuration=storm.log.properties" ,
    ]
    exec_storm_class(
        klass , 
        jvmtype = "-server" , 
        extrajars = cppaths , 
        jvmopts = jvmopts)

klass参数的默认值为backtype.storm.daemon.nimbus,backtype.storm.daemon.nimbus标识一个java类。STORM_DIR标识storm的安装目录,cppaths集合存放了log4j配置文件路径和storm配置文件storm.yaml路径,jvmopts存放传递给jvm的参数,包括log4j配文件路径、storm.yaml路径、log4j日志名称和log4j配置文件名称。

exec_storm_class函数的逻辑比较简单,具体实现如下:

exec_storm_class函数
 
 

def  exec_storm_class( klass ,  jvmtype = "-server" ,  jvmopts = [],  extrajars = [],  args = [],  fork = False ):  
    global  CONFFILE  
    all_args  =  [  
        "java" ,  jvmtype ,  get_config_opts (),  
        "-Dstorm.home="  +  STORM_DIR ,   
        "-Djava.library.path="  +  confvalue( "java.library.path" ,  extrajars ),  
        "-Dstorm.conf.file="  +  CONFFILE ,  
        "-cp" ,  get_classpath( extrajars ),  
    ]  +  jvmopts  +  [ klass ]  +  list( args)  
    print  "Running: "  +  " " . join( all_args)  
    if  fork :  
        os . spawnvp( os . P_WAIT ,  "java" ,  all_args)  
    else :  
        os . execvp( "java" ,  all_args)  # replaces the current process and never returns

get_config_opts()获取jvm的默认配置信息,confvalue("java.library.path", extrajars)获取storm使用的本地库JZMQ加载路径,get_classpath(extrajars)获取所有依赖jar包的完整路径,然后拼接一个java -cp命令运行klass的main方法。
klass默认值为backtype.storm.daemon.nimbus,所以exec_storm_class函数最终调用backtype.storm.daemon.nimbus类的main方法。

backtype.storm.daemon.nimbus类定义在nimbus.clj文件中,定义如下:

backtype.storm.daemon.nimbus类
 
 

(ns  backtype.storm.daemon.nimbus
 ( :import  [ org.apache.thrift.server  THsHaServer  THsHaServer$Args ])
 ( :import  [ org.apache.thrift.protocol  TBinaryProtocol  TBinaryProtocol$Factory ])
 ( :import  [ org.apache.thrift.exception ])
 ( :import  [ org.apache.thrift.transport  TNonblockingServerTransport  TNonblockingServerSocket ])
 ( :import  [ java.nio  ByteBuffer ])
 ( :import  [ java.io  FileNotFoundException ])
 ( :import  [ java.nio.channels  Channels  WritableByteChannel ])
 ( :use  [ backtype.storm.scheduler.DefaultScheduler ])
 ( :import  [ backtype.storm.scheduler  INimbus  SupervisorDetails  WorkerSlot  TopologyDetails
            Cluster  Topologies  SchedulerAssignment  SchedulerAssignmentImpl  DefaultScheduler  ExecutorDetails ])
 ( :use  [ backtype.storm  bootstrap  util ])
 ( :use  [ backtype.storm.config  :only  [ validate-configs-with-schemas ]])
 ( :use  [ backtype.storm.daemon  common ])
 ( :gen-class
    :methods  [ ^ { :static  true }  [ launch  [ backtype.storm.scheduler.INimbus ]  void ]]))
    ...
    ;; 其他方法
    ...
   ( defn  -main  []
 ( -launch ( standalone-nimbus)))

:gen-class指示Clojure生成Java类backtype.storm.daemon.nimbus,并且声明一个静态方法launch,launch方法接收一个实现backtype.storm.scheduler.INimbus接口的实例作为参数。launch函数的参数是由standalone-nimbus函数生成的。standalone-nimbus函数定义如下:返回一个实现INimbus接口的实例。

standalone-nimbus函数
 

( defn  standalone-nimbus  []
  ;; 实现INimbus接口
 ( reify  INimbus
    ;; prepare函数为空实现
   ( prepare  [ this  conf  local-dir ]
     )
    ;; allSlotsAvailableForScheduling获取所有可用的slot集合
   ( allSlotsAvailableForScheduling  [ this  supervisors  topologies  topologies-missing-assignments ]
      ;; supervisors标识集群所有supervisor的详细信息对象SupervisorDetails的集合
;;也就是说通过遍历Collection<SupervisorDetails> 类型的变量supervisors取id和meta,用来初始化WorkerSolt对象,mapcat将WorkerSolt形成集合,;;之后将这个集合作为参数传递给set,使之返回符合接口要求的Collection<WorkerSlot>
     ( ->>  supervisors
           ;; 遍历supervisors,为supervisor的每个port生成对应的WorkerSlot对象,WorkerSlot包含两个属性节点id和port
          ( mapcat ( fn  [ ^ SupervisorDetails s ]
                    ( for  [p ( .getMeta s )]
                      ( WorkerSlot. ( .getId s) p))))
           set ))
   ( assignSlots  [ this  topology  slots ]
     )
   ( getForcedScheduler  [ this ]
      nil )
getHostName 是取主机名的接口,supervisors 的类型是Map<String,SupervisorDetails> ,node-id的类型是String。其过程是从supervisors中通过node-id去找符合条件的SupervisorDetails对象,如果找到了,就调用SupervisorDetails的getHost方法取得主机名,否者返回的是nil

    ;; 获取supervisor主机名
   ( getHostName  [ this  supervisors  node-id ]
     ( if-let  [ ^ SupervisorDetails  supervisor ( get  supervisors  node-id )]
       ( .getHost  supervisor)))
   ))

launch函数定义如下:

launch函数

( defn  -launch  [ nimbus ]
 ;;
  ;; read-storm-config函数用于读取storm集群的配置信息,参见其定义部分
 ( launch-server! ( read-storm-config)  nimbus))
launch-server! 函数定义如下:  
( defn  launch-server!  [ conf  nimbus ]
  ;; 判断是否是分布式模式,如果是本地模式则抛出IllegalArgumentException
 ( validate-distributed-mode!  conf)
  ;; service-handler函数是由宏defserverfn定义的,返回一个实现了Nimbus类中的Iface接口的实例,Nimbus类是由thrift框架自动生成的,Iface接口封装了service Nimbus的全部接口。
  ;; nimbus thrift server端提供的接口服务都是由这个实例实现的。service-handler函数参见其定义部分,service Nimbus参见storm.thrift
  ;; service-handler绑定实现了Nimbus类中的Iface接口的实例
 ( let  [ service-handler ( service-handler  conf  nimbus)
        options ( -> ( TNonblockingServerSocket. ( int ( conf  NIMBUS-THRIFT-PORT)))
                   ( THsHaServer$Args.)
                   ( .workerThreads  64)
                   ( .protocolFactory ( TBinaryProtocol$Factory.  false  true ( conf  NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                   ( .processor ( Nimbus$Processor.  service-handler))
                   )
       server ( THsHaServer. ( do ( set! ( .  options  maxReadBufferBytes)( conf  NIMBUS-THRIFT-MAX-BUFFER-SIZE))  options ))]
   ( add-shutdown-hook-with-force-kill-in-1-sec ( fn  []
                                                 ( .shutdown  service-handler)
                                                 ( .stop  server)))
   ( log-message  "Starting Nimbus server...")
   ( .serve  server)))

read-storm-config定义如下: 

read-storm-config函数

( defn  read-storm-config  
  []  
  ;; conf绑定storm集群配置信息
 ( let  [ conf ( clojurify-structure ( Utils/readStormConfig ))]  
    ;; validate-configs-with-schemas函数验证配置信息的正确性并删除不正确的配置信息
   ( validate-configs-with-schemas  conf)  
    conf))

read-storm-config函数调用了backtype.storm.utils.Utils类的静态方法readStormConfig,如下:

readStormConfig方法

public  static  Map  readStormConfig()  {  
        // 调用readDefaultConfig从defaults.yaml配置文件读取默认配置信息存入ret
        Map  ret  =  readDefaultConfig(); 
        // 获取用户自定义配置文件路径
        String  confFile  =  System . getProperty( "storm.conf.file");  
        Map  storm;  
        if ( confFile == null ||  confFile . equals( ""))  {  
            storm  =  findAndReadConfigFile( "storm.yaml" ,  false);  
        }  else  { 
            // 读取用户自定义配置信息
            storm  =  findAndReadConfigFile( confFile ,  true);  
        }  
        // 将用户自定义的配置信息覆盖更新到ret中
        ret . putAll( storm); 
        // 将命令行方式提供的配置信息覆盖更新到ret中 
        ret . putAll( readCommandLineOpts());  
        // 返回覆盖更新后的配置信息ret
        return  ret;  
    }

service-handler函数定义如下:

defserverfn是一个宏,(defserverfn service-handler [conf inimbus] ... )返回一个名字为service-handler函数。宏扩展是在编译时进行的

service-handler函数

( defserverfn  service-handler  [ conf  inimbus ]
  ;; 调用inimbus的prepare方法,inimbus是standalone-nimbus函数返回的实现INimbus接口的实例,当前版本prepare方法为空实现
 ( .prepare  inimbus  conf ( master-inimbus-dir  conf))
  ;; 打印日志信息
 ( log-message  "Starting Nimbus with conf "  conf)
  ;; nimbus绑定了一个map,这个map保存了nimbus端所必需的"属性",详见nimbus-data函数定义部分
 ( let  [ nimbus ( nimbus-data  conf  inimbus )]
    ;; 调用nimbus这个map中保存的backtype.storm.nimbus.DefaultTopologyValidator对象的prepare方法,通过查看backtype.storm.nimbus.DefaultTopologyValidator类,我们可以发现prepare默认为空实现
   ( .prepare  ^ backtype.storm.nimbus.ITopologyValidator ( :validator  nimbus)  conf)
    ;; cleanup-corrupt-topologies!函数的主要功能就是将在nimbus服务器{storm.local.dir}/nimbus/stormdist/路径中不存在的topology id从zookeeper的/storms/路径中删除,即删除在nimbus服务器上缺失jar包、topology信息和配置信息的当前正在运行的topology,
    ;; cleanup-corrupt-topologies!函数参见其定义部分
   ( cleanup-corrupt-topologies!  nimbus)
    ;; 更新当前storm集群上topology的状态
   ( doseq  [ storm-id ( .active-storms ( :storm-cluster-state  nimbus ))]
      ;; transition!函数主要功能就是负责topology状态转换,规定了当topology由一种状态转换成另一种新状态时,需要做哪些处理操作,参见其定义部分
     ( transition!  nimbus  storm-id  :startup))
    ;; 通过schedule-recurring函数向storm定时器添加了一个"周期任务"检查心跳,重新分配任务,清理不活跃的topology,mk-assignments函数的主要功能就是检查心跳和重新分配任务。关于storm定时器详细分析请见"storm定时器timer源码分析";关于mk-assignments函数请见"storm任务分配源码分析"
    ;; do-cleanup函数主要功能就是清理不活跃的topology,请参加其定义部分
   ( schedule-recurring ( :timer  nimbus)
                        0
                       ( conf  NIMBUS-MONITOR-FREQ-SECS)
                       ( fn  []
                         ( when ( conf  NIMBUS-REASSIGN)
                           ( locking ( :submit-lock  nimbus)
                             ( mk-assignments  nimbus)))
                         ( do-cleanup  nimbus)
                         ))
    ;; Schedule Nimbus inbox cleaner
    ;; 通过schedule-recurring函数向storm定时器添加一个"周期任务"删除nimbus服务器上的过期jar包
   ( schedule-recurring ( :timer  nimbus)
                        0
                       ( conf  NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                       ( fn  []
                         ( clean-inbox ( inbox  nimbus) ( conf  NIMBUS-INBOX-JAR-EXPIRATION-SECS))
                         ))    
   ( reify  Nimbus$Iface
      ;; submitTopologyWithOpts函数负责topology的提交,有关该函数的详细分析请参见"storm源码分析之topology提交过程"
     ( ^ void  submitTopologyWithOpts
        [ this  ^ String  storm-name  ^ String  uploadedJarLocation  ^ String  serializedConf  ^ StormTopology  topology
         ^ SubmitOptions  submitOptions ]
       ( try
         ( assert ( not-nil?  submitOptions))
         ( validate-topology-name!  storm-name)
         ( check-storm-active!  nimbus  storm-name  false)
         ( let  [ topo-conf ( from-json  serializedConf )]
           ( try
             ( validate-configs-with-schemas  topo-conf)
             ( catch  IllegalArgumentException  ex
               ( throw ( InvalidTopologyException. ( .getMessage  ex)))))
           ( .validate  ^ backtype.storm.nimbus.ITopologyValidator ( :validator  nimbus)
                       storm-name
                       topo-conf
                       topology))
         ( swap! ( :submitted-count  nimbus)  inc)
         ( let  [ storm-id ( str  storm-name  "-"  @( :submitted-count  nimbus)  "-" ( current-time-secs))
                storm-conf ( normalize-conf
                            conf
                           ( ->  serializedConf
                                from-json
                               ( assoc  STORM-ID  storm-id)
                             ( assoc  TOPOLOGY-NAME  storm-name))
                            topology)
                total-storm-conf ( merge  conf  storm-conf)
                topology ( normalize-topology  total-storm-conf  topology)
                storm-cluster-state ( :storm-cluster-state  nimbus )]
           ( system-topology!  total-storm-conf  topology)  ;; this validates the structure of the topology
           ( log-message  "Received topology submission for "  storm-name  " with conf "  storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
           ( locking ( :submit-lock  nimbus)
             ( setup-storm-code  conf  storm-id  uploadedJarLocation  storm-conf  topology)
             ( .setup-heartbeats!  storm-cluster-state  storm-id)
             ( let  [ thrift-status->kw-status  { TopologyInitialStatus/INACTIVE  :inactive
                                              TopologyInitialStatus/ACTIVE  :active }]
               ( start-storm  nimbus  storm-name  storm-id ( thrift-status->kw-status ( .get_initial_status  submitOptions))))
             ( mk-assignments  nimbus)))
         ( catch  Throwable  e
           ( log-warn-error  e  "Topology submission exception. (topology name='"  storm-name  "')")
           ( throw  e))))
      ;; submitTopology函数调用了submitTopologyWithOpts函数
     ( ^ void  submitTopology
        [ this  ^ String  storm-name  ^ String  uploadedJarLocation  ^ String  serializedConf  ^ StormTopology  topology ]
       ( .submitTopologyWithOpts  this  storm-name  uploadedJarLocation  serializedConf  topology
                                ( SubmitOptions.  TopologyInitialStatus/ACTIVE)))
      ;; killTopology函数见名知意,调用了killTopologyWithOpts函数
     ( ^ void  killTopology  [ this  ^ String  name ]
       ( .killTopologyWithOpts  this  name ( KillOptions.)))
      ;; storm-name绑定kill的topology名称,KillOptions是一个thrift数据结构,只有个属性wait_secs,表示延迟多长时间执行kill
     ( ^ void  killTopologyWithOpts  [ this  ^ String  storm-name  ^ KillOptions  options ]
        ;; check-storm-active!检查topology是否是"active",如果不活跃则抛出异常
       ( check-storm-active!  nimbus  storm-name  true)
        ;; 如果设置了延迟时间,wait-amt绑定延迟时间
       ( let  [ wait-amt ( if ( .is_set_wait_secs  options)
                        ( .get_wait_secs  options)                         
                         )]
          ;; transition-name!函数主要功能就是根据storm-name获取topology id,然后调用transition!函数,topology由当前状态转换到:kill状态,:kill状态是一个"临时状态",最终修改topology状态为:killed,:killed状态为"持久状态"
          ;; 通过state-transitions函数我们可以知道无论从哪种状态转换到:kill状态,都将调用kill-transition函数,kill-transition通过调用delay-event向storm定时器添加一个定时任务,这个定时任务的主要功能就是负责topology由:killed状态
          ;; 转换到:remove状态,这时将调用remove-storm!函数清理topology
         ( transition-name!  nimbus  storm-name  [ :kill  wait-amt ]  true)
         ))
      ;; rebalance函数可以重新设置topology的进程数和各个component的并行度,RebalanceOptions是thirft数据结构,有三个属性rebalance的延迟时间、新的进程数,新的并行度
     ( ^ void  rebalance  [ this  ^ String  storm-name  ^ RebalanceOptions  options ]
        ;; check-storm-active!检查topology是否是"active",如果不活跃则抛出异常
       ( check-storm-active!  nimbus  storm-name  true)
        ;; 如果设置了延迟时间,wait-amt绑定延迟时间
       ( let  [ wait-amt ( if ( .is_set_wait_secs  options)
                        ( .get_wait_secs  options))
              ;; 如果设置了新的进程数,num-workers绑定新进程数
              num-workers ( if ( .is_set_num_workers  options)
                           ( .get_num_workers  options))
              ;; 如果设置了新的组件并行度,executor-overrides绑定新组件并行度
              executor-overrides ( if ( .is_set_num_executors  options)
                                  ( .get_num_executors  options)
                                   {})]
         ( doseq  [[ c  num-executors ]  executor-overrides ]
           ( when ( <=  num-executors  0)
             ( throw ( InvalidTopologyException.  "Number of executors must be greater than 0"))
             ))
          ;; transition-name!函数主要功能就是根据storm-name获取topology id,然后调用transition!函数,topology由当前状态转换到:rebalance状态,:rebalance状态是一个"临时状态",最终修改topology状态为:rebalancing,:rebalancing状态为"持久状态"
          ;; 通过state-transitions函数我们可以知道只允许从:active和:inactive状态转换到:rebalance状态,并调用rebalance-transition函数,rebalance-transition通过调用delay-event向storm定时器添加一个定时任务,这个定时任务的主要功能就是负责topology由:rebalancing状态
          ;; 转换到:do-rebalance状态,并调用do-rebalance函数(重新设置topology的进程数和组件并行度,然后调用mk-assignments函数重新进行任务分配),然后将topology状态修改成:rebalancing的前一个状态
         ( transition-name!  nimbus  storm-name  [ :rebalance  wait-amt  num-workers  executor-overrides ]  true)
         ))
        ;; 激活topology,将topology状态修改成:active,处理过程与killTopologyWithOpts、rebalance相似
     ( activate  [ this  storm-name ]
       ( transition-name!  nimbus  storm-name  :activate  true)
       )
      ;; 将topology状态修改成:inactive,deactivate处理过程与activate相似
     ( deactivate  [ this  storm-name ]
       ( transition-name!  nimbus  storm-name  :inactivate  true))
      ;; beginFileUpload()函数获取nimbus存放jar的目录
     ( beginFileUpload  [ this ]
       ( let  [ fileloc ( str ( inbox  nimbus)  "/stormjar-" ( uuid)  ".jar" )]
         ( .put ( :uploaders  nimbus)
                fileloc
               ( Channels/newChannel ( FileOutputStream.  fileloc)))
         ( log-message  "Uploading file from client to "  fileloc)
          fileloc
         ))
      ;; 上传jar包文件
     ( ^ void  uploadChunk  [ this  ^ String  location  ^ ByteBuffer  chunk ]
       ( let  [ uploaders ( :uploaders  nimbus)
              ^ WritableByteChannel  channel ( .get  uploaders  location )]
         ( when-not  channel
           ( throw ( RuntimeException.
                    "File for that location does not exist (or timed out)")))
         ( .write  channel  chunk)
         ( .put  uploaders  location  channel)
         ))
      ;; 上传jar包完成,关闭Channel
     ( ^ void  finishFileUpload  [ this  ^ String  location ]
       ( let  [ uploaders ( :uploaders  nimbus)
              ^ WritableByteChannel  channel ( .get  uploaders  location )]
         ( when-not  channel
           ( throw ( RuntimeException.
                    "File for that location does not exist (or timed out)")))
         ( .close  channel)
         ( log-message  "Finished uploading file from client: "  location)
         ( .remove  uploaders  location)
         ))
      ;; 获取文件输入流
     ( ^ String  beginFileDownload  [ this  ^ String  file ]
       ( let  [ is ( BufferFileInputStream.  file)
              id ( uuid )]
         ( .put ( :downloaders  nimbus)  id  is)
          id
         ))
      ;; 读取文件
     ( ^ ByteBuffer  downloadChunk  [ this  ^ String  id ]
       ( let  [ downloaders ( :downloaders  nimbus)
              ^ BufferFileInputStream  is ( .get  downloaders  id )]
         ( when-not  is
           ( throw ( RuntimeException.
                    "Could not find input stream for that id")))
         ( let  [ ret ( .read  is )]
           ( .put  downloaders  id  is)
           ( when ( empty?  ret)
             ( .remove  downloaders  id))
           ( ByteBuffer/wrap  ret)
           )))
      ;; 获取storm集群配置信息
     ( ^ String  getNimbusConf  [ this ]
       ( to-json ( :conf  nimbus)))
      ;; 获取topology配置信息
     ( ^ String  getTopologyConf  [ this  ^ String  id ]
       ( to-json ( try-read-storm-conf  conf  id)))
      ;; 获取StormTopology
     ( ^ StormTopology  getTopology  [ this  ^ String  id ]
       ( system-topology! ( try-read-storm-conf  conf  id) ( try-read-storm-topology  conf  id)))

     ( ^ StormTopology  getUserTopology  [ this  ^ String  id ]
       ( try-read-storm-topology  conf  id))
            ;; 获取当前集群的汇总信息包括supervisor汇总信息,nimbus启动时间,所有活跃topology汇总信息
     ( ^ ClusterSummary  getClusterInfo  [ this ]
       ( let  [ storm-cluster-state ( :storm-cluster-state  nimbus)
                    ;; supervisor-infos绑定supervisor id->SupervisorInfo对象键值对的map
              ;; SupervisorInfo定义:(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
              supervisor-infos ( all-supervisor-info  storm-cluster-state)
              ;; TODO: need to get the port info about supervisors...
              ;; in standalone just look at metadata, otherwise just say N/A?
              ;; 根据SupervisorInfo数据创建SupervisorSummary数据
              supervisor-summaries ( dofor  [[ id  info ]  supervisor-infos ]
                                         ( let  [ ports ( set ( :meta  info))  ;;TODO: this is only true for standalone
                                                ]
                                           ( SupervisorSummary. ( :hostname  info)
                                                               ( :uptime-secs  info)
                                                               ( count  ports)
                                                               ( count ( :used-ports  info))
                                                                id )
                                           ))
              ;; nimbus-uptime绑定nimbus启动时间                              
              nimbus-uptime (( :uptime  nimbus))
              ;; bases绑定集群上所有活跃topology的StormBase数据集合
              bases ( topology-bases  storm-cluster-state)
              ;; topology-summaries绑定活跃topology的TopologySummary数据
              topology-summaries ( dofor  [[ id  base ]  bases ]
                                       ( let  [ assignment ( .assignment-info  storm-cluster-state  id  nil )]
                                         ( TopologySummary.  id
                                                           ( :storm-name  base)
                                                           ( ->> ( :executor->node+port  assignment)
                                                                 keys
                                                                ( mapcat  executor-id->tasks)
                                                                 count) 
                                                           ( ->> ( :executor->node+port  assignment)
                                                                 keys
                                                                 count)                                                            
                                                           ( ->> ( :executor->node+port  assignment)
                                                                 vals
                                                                 set
                                                                 count)
                                                           ( time-delta ( :launch-time-secs  base))
                                                           ( extract-status-str  base))
                                          ))]
          ;; 创建ClusterSummary数据
         ( ClusterSummary.  supervisor-summaries
                           nimbus-uptime
                           topology-summaries)
         ))
      ;; 获取指定storm-id的topology的TopologyInfo数据
     ( ^ TopologyInfo  getTopologyInfo  [ this  ^ String  storm-id ]
        ;; storm-cluster-state绑定StormClusterState对象
       ( let  [ storm-cluster-state ( :storm-cluster-state  nimbus)
              ;; task->component绑定任务id->组件名称键值对的map,形如:{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
              task->component ( storm-task-info ( try-read-storm-topology  conf  storm-id) ( try-read-storm-conf  conf  storm-id))
              ;; bases绑storm-id的StormBase
              base ( .storm-base  storm-cluster-state  storm-id  nil)
              ;; assignment绑定该topology的AssignmentInfo信息,(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
              assignment ( .assignment-info  storm-cluster-state  storm-id  nil)
              ;; beats绑定该topology所有executor-id->心跳信息的map
              beats ( .executor-beats  storm-cluster-state  storm-id ( :executor->node+port  assignment))
              ;; all-components绑定该topology所有component-id集合
              all-components ( ->  task->component  reverse-map  keys)
              ;; errors绑定component-id->组件错误信息的map
              errors ( ->>  all-components
                         ( map ( fn  [ c ]  [ c ( get-errors  storm-cluster-state  storm-id  c )]))
                         ( into  {}))
              ;; executor-summaries绑定ExecutorSummary集合
              executor-summaries ( dofor  [[ executor  [ node  port ]] ( :executor->node+port  assignment )]
                                       ( let  [ host ( ->  assignment  :node->host ( get  node))
                                              heartbeat ( get  beats  executor)
                                              stats ( :stats  heartbeat)
                                              stats ( if  stats
                                                     ( stats/thriftify-executor-stats  stats ))]
                                         ( doto
                                             ( ExecutorSummary. ( thriftify-executor-id  executor)
                                                               ( ->  executor  first  task->component)
                                                                host
                                                                port
                                                               ( nil-to-zero ( :uptime  heartbeat)))
                                           ( .set_stats  stats))
                                         ))
              ]
          ;; 创建TopologyInfo对象
         ( TopologyInfo.  storm-id
                        ( :storm-name  base)
                        ( time-delta ( :launch-time-secs  base))
                         executor-summaries
                        ( extract-status-str  base)
                         errors
                        )
         ))
     
      Shutdownable
     ( shutdown  [ this ]
       ( log-message  "Shutting down master")
       ( cancel-timer ( :timer  nimbus))
       ( .disconnect ( :storm-cluster-state  nimbus))
       ( .cleanup ( :downloaders  nimbus))
       ( .cleanup ( :uploaders  nimbus))
       ( log-message  "Shut down master")
       )
      DaemonCommon
     ( waiting?  [ this ]
       ( timer-waiting? ( :timer  nimbus))))))


 
 

 类似资料: