2021SC@SDUSC
The code analysis is added as comments inside the original source.
The function is defined as follows:
(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus
     :submitted-count (atom 0) ;; number of topologies submitted so far
     :storm-cluster-state (cluster/mk-storm-cluster-state conf) ;; the storm-cluster-state object, used to write data to and read data from ZooKeeper
     :submit-lock (Object.) ;; a lock object guarding topology submission
     :heartbeats-cache (atom {})
     :downloaders (file-cache-map conf) ;; download-stream cache
     :uploaders (file-cache-map conf) ;; upload-stream cache
     :uptime (uptime-computer) ;; uptime since Nimbus started
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR)) ;; validates submitted topologies
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (halt-process! 20 "Error when processing an event")
                                 )) ;; timer
     :scheduler (mk-scheduler conf inimbus) ;; scheduler
     }))
Note: when a user submits a topology, an upload stream is created and placed in the uploaders cache; when a supervisor downloads the topology jar from Nimbus, a download stream is created and placed in the downloaders cache. Once either operation completes, the corresponding upload or download stream is closed, and its entry is removed from the cache.
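To make the cache behavior concrete, here is a minimal sketch of an expiring stream cache. It is not Storm's actual file-cache-map/TimeCacheMap implementation; the class name, key type, and expiration window are assumptions made for illustration.

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of an expiring stream cache; names and the expiration window are hypothetical.
public class ExpiringStreamCache {
    private static final long EXPIRE_MILLIS = 600 * 1000L; // assumed expiration window

    private static class Entry {
        final Closeable stream;
        final long createdAt;
        Entry(Closeable stream) {
            this.stream = stream;
            this.createdAt = System.currentTimeMillis();
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    // Called when a topology upload/download starts: put the stream into the cache.
    public void put(String key, Closeable stream) {
        cache.put(key, new Entry(stream));
    }

    // Called when the transfer finishes: close the stream and drop the entry.
    public void remove(String key) throws IOException {
        Entry e = cache.remove(key);
        if (e != null) {
            e.stream.close();
        }
    }

    // Periodically invoked (e.g. by a timer thread): expire stale entries and close their streams.
    public void expireStale() throws IOException {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Entry>> it = cache.entrySet().iterator();
        while (it.hasNext()) {
            Entry e = it.next().getValue();
            if (now - e.createdAt > EXPIRE_MILLIS) {
                e.stream.close();
                it.remove();
            }
        }
    }
}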
TopologyAssign
Let's look directly at the run function:
public void run() {
    LOG.info("TopologyAssign thread has been started");
    runFlag = true;
    while (runFlag) {
        TopologyAssignEvent event;
        try {
            event = queue.take(); // take the next assignment event from the queue
        } catch (InterruptedException e1) {
            continue;
        }
        if (event == null) {
            continue;
        }

        boolean isSuccess = doTopologyAssignment(event); // perform the actual assignment

        if (isSuccess == false) {
            // assignment failed; nothing further to do for this event
        } else {
            try {
                cleanupDisappearedTopology(); // clean up topologies that no longer exist
            } catch (Exception e) {
                LOG.error("Failed to do cleanup disappear topology ", e);
                continue;
            }
        }
    }
}
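The run loop is a standard blocking-queue consumer. The following self-contained sketch (class and method names are assumptions, not JStorm's actual submission path) shows how assignment events might be pushed by one thread and drained by the loop:

import java.util.concurrent.LinkedBlockingQueue;

// Minimal producer/consumer sketch of the assignment event loop; all names are assumptions.
public class AssignLoopSketch {
    static class AssignEvent {
        final String topologyId;
        AssignEvent(String topologyId) { this.topologyId = topologyId; }
    }

    private final LinkedBlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<>();
    private volatile boolean runFlag = true;

    // Producer side: e.g. the submission handler pushes an event when a topology is submitted.
    public void push(AssignEvent event) {
        queue.offer(event);
    }

    // Consumer side: mirrors TopologyAssign.run(), block on take(), then handle the event.
    public void runLoop() throws InterruptedException {
        while (runFlag) {
            AssignEvent event = queue.take();   // blocks until an event is available
            handle(event);                      // stands in for doTopologyAssignment(event)
        }
    }

    private void handle(AssignEvent event) {
        System.out.println("assigning " + event.topologyId);
    }
}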
The doTopologyAssignment function:
protected boolean doTopologyAssignment(TopologyAssignEvent event) {
    Assignment assignment;
    try {
        Assignment oldAssignment = null;
        boolean isReassign = event.isScratch();
        if (isReassign) {
            oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
        }

        assignment = mkAssignment(event); // do the preparation and compute the new assignment

        TaskStartEvent taskEvent = new TaskStartEvent();
        taskEvent.oldAssignment = oldAssignment;
        taskEvent.newAssignment = assignment;
        taskEvent.topologyId = event.getTopologyId();
        taskEvent.clusterName = nimbusData.getClusterName();
        taskEvent.timestamp = System.currentTimeMillis();

        Map<Integer, String> task2Component;
        // get from nimbus cache first
        Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId());
        if (taskInfoMap != null) {
            task2Component = Common.getTaskToComponent(taskInfoMap);
        } else {
            task2Component = Common.getTaskToComponent(Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId()));
        }
        // the assignment decides which machine/port each worker runs on, and which worker each task runs in
        taskEvent.task2Component = task2Component;
        nimbusData.getMetricRunnable().pushEvent(taskEvent);

        if (!isReassign) {
            setTopologyStatus(event);
        }
    } catch (Throwable e) {
        LOG.error("Failed to assign topology " + event.getTopologyId(), e);
        event.fail(e.getMessage());
        return false;
    }

    if (assignment != null)
        backupAssignment(assignment, event); // write the assignment result back to ZooKeeper
    event.done();
    return true;
}
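Common.getTaskToComponent here essentially projects each TaskInfo entry down to its component id. A minimal sketch of that mapping, with a hypothetical TaskInfoStub standing in for JStorm's TaskInfo:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of building the taskId -> componentId map; TaskInfoStub is a stand-in type.
public class TaskToComponentSketch {
    static class TaskInfoStub {
        final String componentId;
        TaskInfoStub(String componentId) { this.componentId = componentId; }
        String getComponentId() { return componentId; }
    }

    static Map<Integer, String> getTaskToComponent(Map<Integer, TaskInfoStub> taskInfoMap) {
        Map<Integer, String> task2Component = new TreeMap<>();
        for (Map.Entry<Integer, TaskInfoStub> entry : taskInfoMap.entrySet()) {
            task2Component.put(entry.getKey(), entry.getValue().getComponentId());
        }
        return task2Component;
    }

    public static void main(String[] args) {
        Map<Integer, TaskInfoStub> taskInfoMap = new HashMap<>();
        taskInfoMap.put(1, new TaskInfoStub("spout"));
        taskInfoMap.put(2, new TaskInfoStub("bolt-a"));
        taskInfoMap.put(3, new TaskInfoStub("bolt-a"));
        System.out.println(getTaskToComponent(taskInfoMap)); // {1=spout, 2=bolt-a, 3=bolt-a}
    }
}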
The prepareTopologyAssign function:
protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception {
    // the assignment context for one topology; the work below mainly fills in this object
    TopologyAssignContext ret = new TopologyAssignContext();
    String topologyId = event.getTopologyId();
    ret.setTopologyId(topologyId);

    int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
    ret.setTopologyMasterTaskId(topoMasterId); // topology master task id
    LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

    // a topology's configuration is the nimbus configuration merged with the topology configuration
    // (the topology configuration is serialized into nimbus/stormdist/topologyId/stormconf.ser)
    Map<Object, Object> nimbusConf = nimbusData.getConf();
    Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);

    // topology code (serialized into nimbus/stormdist/topologyId/stormcode.ser)
    StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
    ret.setRawTopology(rawTopology);

    Map stormConf = new HashMap();
    stormConf.putAll(nimbusConf);
    stormConf.putAll(topologyConf);
    ret.setStormConf(stormConf);

    StormClusterState stormClusterState = nimbusData.getStormClusterState();

    // get all running supervisors, don't need callback to watch supervisor
    Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);

    // init all AvailableWorkerPorts
    // initially every worker port counts as available
    for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
        SupervisorInfo supervisor = supInfo.getValue();
        if (supervisor != null)
            supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
    }

    // keep only the supervisors that are alive according to their heartbeats
    getAliveSupervsByHb(supInfos, nimbusConf);
    if (supInfos.size() == 0) {
        throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor");
    }

    // taskId --> componentId mapping, read from ZooKeeper (so who wrote this information there in the first place?)
    Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
    ret.setTaskToComponent(taskToComponent);

    // get task ids from /ZK/tasks/topologyId
    // all task ids are just the key set of the map above
    Set<Integer> allTaskIds = taskToComponent.keySet();
    if (allTaskIds == null || allTaskIds.size() == 0) {
        String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId;
        LOG.warn(errMsg);
        throw new IOException(errMsg);
    }
    ret.setAllTaskIds(allTaskIds);

    Set<Integer> aliveTasks = new HashSet<Integer>();
    // unstoppedTasks are tasks which are alive on no supervisor's(dead) machine
    Set<Integer> unstoppedTasks = new HashSet<Integer>();
    Set<Integer> deadTasks = new HashSet<Integer>();
    Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();

    // read the existing assignment from the zk/assignments directory
    Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null);
    // if an old assignment exists
    if (existingAssignment != null) {
        aliveTasks = getAliveTasks(topologyId, allTaskIds); // based on heartbeat status

        /*
         * Check if the topology master task is alive first since all task
         * heartbeat info is reported by topology master.
         * If master is dead, do reassignment for topology master first.
         */
        // if the topology master task is dead it must be restarted first,
        // otherwise no heartbeat information can be reported at all
        if (aliveTasks.contains(topoMasterId) == false) {
            ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId);
            deadTasks.addAll(worker.getTasks());

            // 1. remove the dead tasks from the set of all tasks;
            // 2. add the remainder to the alive set; 3. remove the dead tasks from the alive set
            Set<Integer> tempSet = new HashSet<Integer>(allTaskIds);
            tempSet.removeAll(deadTasks);
            aliveTasks.addAll(tempSet);
            aliveTasks.removeAll(deadTasks);
        } else { // if the master task is alive, dead tasks are simply all tasks minus the alive ones
            deadTasks.addAll(allTaskIds);
            deadTasks.removeAll(aliveTasks);
        }

        // tasks that are still alive but sit on supervisors that are now dead
        unstoppedTasks = getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
    }
    ret.setDeadTaskIds(deadTasks);
    ret.setUnstoppedTaskIds(unstoppedTasks);

    // Step 2: get all slots resource, free slots/ alive slots/ unstopped slots
    getFreeSlots(supInfos, stormClusterState);
    ret.setCluster(supInfos);

    if (existingAssignment == null) {
        ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_NEW);
        try {
            AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName());
            if (lastAssignment != null) {
                ret.setOldAssignment(lastAssignment.getAssignment());
            }
        } catch (Exception e) {
            LOG.warn("Fail to get old assignment", e);
        }
    } else {
        ret.setOldAssignment(existingAssignment);
        if (event.isScratch()) {
            ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_REBALANCE);
            ret.setIsReassign(event.isReassign());
            unstoppedWorkers = getUnstoppedWorkers(unstoppedTasks, existingAssignment);
            ret.setUnstoppedWorkers(unstoppedWorkers);
        } else {
            ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_MONITOR);
            unstoppedWorkers = getUnstoppedWorkers(aliveTasks, existingAssignment);
            ret.setUnstoppedWorkers(unstoppedWorkers);
        }
    }
    return ret;
}
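The alive/dead bookkeeping above is plain set arithmetic. Below is a small self-contained sketch of the two branches with made-up task ids, just to make the comments at that spot concrete:

import java.util.HashSet;
import java.util.Set;

// Sketch of the alive/dead task bookkeeping from prepareTopologyAssign; the data is made up.
public class DeadTaskSketch {
    public static void main(String[] args) {
        Set<Integer> allTaskIds = new HashSet<>(java.util.Arrays.asList(1, 2, 3, 4, 5, 6));
        Set<Integer> aliveTasks = new HashSet<>(java.util.Arrays.asList(2, 3, 5, 6));
        int topoMasterId = 1;
        Set<Integer> masterWorkerTasks = new HashSet<>(java.util.Arrays.asList(1, 4)); // tasks on the master's worker

        Set<Integer> deadTasks = new HashSet<>();
        if (!aliveTasks.contains(topoMasterId)) {
            // Master is dead: treat every task on the master's worker as dead,
            // and consider all remaining tasks alive (their heartbeats can't be trusted yet).
            deadTasks.addAll(masterWorkerTasks);
            Set<Integer> tempSet = new HashSet<>(allTaskIds);
            tempSet.removeAll(deadTasks);
            aliveTasks.addAll(tempSet);
            aliveTasks.removeAll(deadTasks);
        } else {
            // Master is alive: dead tasks are simply all tasks minus the alive ones.
            deadTasks.addAll(allTaskIds);
            deadTasks.removeAll(aliveTasks);
        }
        System.out.println("dead=" + deadTasks + " alive=" + aliveTasks);
        // With the data above and master id 1: dead=[1, 4], alive=[2, 3, 5, 6]
    }
}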
By default, DefaultTopologyScheduler does the scheduling. The concrete implementation is in its assignTasks function, whose core is just three lines:
List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum);
TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers);
Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign());
Worker allocation is fairly simple: workers are spread as evenly as possible across all machines. The process (see the sketch after this list) is:
1. From the total number of workers required and the number of machines, compute how many workers each machine should run on average.
2. For each machine that has fewer than the average, assign workers to it until it reaches the average.
3. Assign any remaining workers, one at a time, to machines that still have free worker slots.
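A minimal Java sketch of this balancing strategy, assuming each supervisor reports a count of free worker slots; the class and method names are illustrative, not JStorm's actual WorkerScheduler code:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of even worker distribution across supervisors; names and data are illustrative only.
public class EvenWorkerAllocSketch {
    // freeSlots: supervisor -> number of free worker ports; need: total workers to place.
    static Map<String, Integer> allocate(Map<String, Integer> freeSlots, int need) {
        Map<String, Integer> alloc = new LinkedHashMap<>();
        for (String sup : freeSlots.keySet()) {
            alloc.put(sup, 0);
        }
        // Step 1: the per-machine average (rounded up).
        int avg = (need + freeSlots.size() - 1) / freeSlots.size();
        int left = need;
        // Step 2: fill each machine up to the average, bounded by its free slots.
        for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
            int take = Math.min(avg, Math.min(e.getValue(), left));
            alloc.put(e.getKey(), take);
            left -= take;
        }
        // Step 3: hand out any remaining workers one by one to machines with spare slots.
        while (left > 0) {
            boolean placed = false;
            for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
                if (left == 0) break;
                if (alloc.get(e.getKey()) < e.getValue()) {
                    alloc.put(e.getKey(), alloc.get(e.getKey()) + 1);
                    left--;
                    placed = true;
                }
            }
            if (!placed) break; // no free slots left anywhere
        }
        return alloc;
    }

    public static void main(String[] args) {
        Map<String, Integer> freeSlots = new LinkedHashMap<>();
        freeSlots.put("sup-a", 4);
        freeSlots.put("sup-b", 2);
        freeSlots.put("sup-c", 3);
        System.out.println(allocate(freeSlots, 5)); // {sup-a=2, sup-b=2, sup-c=1}
    }
}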