2021SC@SDUSC
上一篇博客展示了nimbus的核心代码和一些基本的分析注释,本文将详细分析解释第一部分代码,并进行补充。
nimbus-data
nimbus-data函数的定义如下所示
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
:inimbus inimbus
:submitted-count (atom 0)
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:submit-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t"Error when processing event")
(halt-process!20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
}))
let的bindings定义的变量forced-scheduler的值是个nil,因为INimbus中实现的getForcedScheduler方法是个空实现。
再看看let 块的执行体,它是一个map。其中:conf 的值是一个storm 配置的map,:inimbus的值是INimbus接口的匿名实现也就是一个适配器。submitted-count 的值是一个atom类型的值,初始化为0。:storm-cluster-state的值是什么,需要看看cluster/mk-storm-cluster-state这个函数内部是做了什么。
cluster/mk-storm-cluster-state这个函数内部匿名实现了StormClusterState协议,而实现时又用到了ClusterState协议。这个函数主要用于获取集群的各种状态,底层zookeeper的操作上层封装在了ClusterState协议的实现,而真正底层访问zookeeper使用apache curator这个开源库。ClusterState协议属于zookeeper的部分,就不展开详细讲述了,我们接着代码继续分析
:submit-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
submit-lock就是一个java的Object对象用于加锁解锁,heartbeats-cache是一个atom引用类型,这个引用所包含的的值是一个空的map。
downloaders是一个TimeCacheMap对象。downloaders的值是(file-cache-mapconf)
其中file-cache-map是一个函数,只需要知道TimeCacheMap是storm自己实现的,对外提供了ExpiredCallback接口类,这个接口类中只有一个方法是expire方法。在file-cache-map函数中匿名实现了这个接口。
TimeCacheMap非常复杂,不过多介绍,只需知道TimeCacheMap对外展示的是一个类似map类,可以在生成这个对象的时候设置超时回调函数(也就是ExpiredCallback这个接口,超时处理,在这里就是关闭流),之后可以像map一样put,get进行操作,在超时时间到了之后,会调用用户设置的回到函数,将超时的数据传递到回调函数中进行处理。需要注意的是,这个类的超时不是很精确。
回到nimbus-data接着看
:uploaders(file-cache-map conf)
:uptime (uptime-computer)
uploaders的值和前面的downloader的值是一样的,也是一个TimeCacheMap对象。
uptime的值是一个函数,是uptime-computer这个函数所返回的一个匿名函数,如下所示:
(defn uptime-computer []
(let [start-time (current-time-secs)]
(fn []
(time-delta start-time)
)))
uptime-computer就是用于计算时间差的,需要注意的是,在添加:uptime (uptime-computer)
这条记录的时候,start-time已经开始取当前时间了,那么在之后真正取:uptime的值开始计算的时候,就会得到时间差。
在service-handler中有条语句nimbus-uptime ((:uptime nimbus)) 这就会计算出,在设置nimbus这个map的时候时间点和当前时间的差。两层圆括号的意思是,先从map中取出值,因为值是一个函数,所以还要再用一对圆括号括起来执行。
回到nimbus-data接着看
:validator(new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error whenprocessing event")
(halt-process!20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
validator的值是一个对象。类名是在default.yaml中所指定的nimbus.topology.validator:"backtype.storm.nimbus.DefaultTopologyValidator",这里的先取到类名,然后根据类名创建对象。
timer的值是一个map ,这个map中包含了定时器,队列,锁等数据。在之前分析supervisor启动过程中也有提到mk-timer,这里再来回顾一下。mk-timer和java中的Timer是很像的。mk-timer的做法是启动一个线程,循环从队列中去peek一个数据,这个数据是一个vector类型的数据,内有会有三个值,分别是时间,函数,和uuid。线程把当前时间和从队列预读的这个数据中的时间值进行比较,如果时间到了,则从队列中弹出这个数据,然后执行这个数据中的第二个值,也就是函数。在执行完之后,会sleep 1秒,这个sleep的实现是在storm自己提供的Time.java中。当sleep结束,会重复这个动作。
mk-timer最后会返回的值如下所示:
{:timer-thread timer-thread
:queue queue
:active active
:lock lock
:cancel-notifier notifier}
也就是定时器线程,队列,锁等数据。
scheduler的值是一个调度器。这个调度器可以有三种,默认的调度器,从INimbus中获取的调度器,以及是用户自定义的调度器。在这里取到的是默认的调度器。也就是定时器线程,队列,锁等数据。scheduler的值是一个调度器。这个调度器可以有三种,默认的调度器,从INimbus中获取的调度器,以及是用户自定义的调度器。在这里取到的是默认的调度器。
接下来,我们从nimbus的整个过程开始看,共分为四个部分:
1.nimbus的启动
2.Topology提交
3.任务调度
4.任务监控
先从nimbus启动开始讲:
Nimbus Daemon进程启动流程如下:
1、根据配置文件初始化Context数据;
2、与Zookeeper数据同步;
3、初始化RPC服务处理类ServiceHandler;
4、启动任务分配策略线程;
5、启动Task的Heartbeat监控线程;
6、启动RPC服务;
7、其他初始化工作。
Nimbus的详细启动逻辑如下:
@SuppressWarnings("rawtypes")
private void launchServer(Map conf) throws Exception {
LOG.info("Begin to start nimbus with conf " + conf);
//1.检查配置文件中是否配置为分布式模式
StormConfig.validate_distributed_mode(conf);
//2.注册主线程退出Hook现场清理(关闭线程+清理数据)
initShutdownHook();
//3.新建NimbusData数据,记录30s超时上传下载通道Channel/BufferFileInputStream
data = createNimbusData(conf);
//4.nimbus本地不存在的stormids数据如果在ZK上存在则删除,其中删除操作包括/zk/{assignments,tasks,storms}相关数据
NimbusUtils.cleanupCorruptTopologies(data);
//5.启动Topology分配策略
initTopologyAssign();
//6.初始化所有topology的状态为startup
initTopologyStatus();
//7.监控所有task的heartbeat,一旦发现taskid失去心跳将其置为needreassign 1次/10s
initMonitor(conf);
//8.启动cleaner线程,默认600s扫描一次,默认删除3600s没有读写过的jar文件
initCleaner(conf);
//9.初始化ServiceHandler
serviceHandler = new ServiceHandler(data);
//10.启动rpc server
initThrift(conf);
}