
Nimbus

贺英悟
2023-12-01

2021SC@SDUSC

The code analysis is added as comments within the original source.

nimbus-data

The function is defined as follows:

(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus
     :submitted-count (atom 0)                                   ;; number of topologies submitted so far
     :storm-cluster-state (cluster/mk-storm-cluster-state conf)  ;; storm-cluster-state object, used to write data to and read data from ZooKeeper
     :submit-lock (Object.)                                      ;; lock object used to serialize submissions
     :heartbeats-cache (atom {})                                 ;; cache of task heartbeats
     :downloaders (file-cache-map conf)                          ;; download-stream cache
     :uploaders (file-cache-map conf)                            ;; upload-stream cache
     :uptime (uptime-computer)                                   ;; uptime since Nimbus started
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))  ;; validates submitted topologies
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (halt-process! 20 "Error when processing an event")))  ;; timer
     :scheduler (mk-scheduler conf inimbus)                      ;; scheduler
     }))
Note: when a user submits a topology, an upload stream is created and placed in the uploaders cache; when a supervisor downloads the topology's jar from Nimbus, a download stream is created and placed in the downloaders cache. Once either operation completes, the corresponding upload or download stream is closed and its entry is removed from the cache.
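file-cache-map is essentially a time-expiring map whose eviction callback closes the cached stream when an entry expires, so a stream abandoned by a crashed client still gets closed instead of leaking a file handle. A minimal sketch of that idea (the ExpiringStreamCache class below is an illustrative assumption, not Storm's actual implementation):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch only: a map of upload/download streams that closes
    // and removes entries once they have been idle longer than expirationSecs.
    public class ExpiringStreamCache {
        private final Map<String, Closeable> streams = new ConcurrentHashMap<String, Closeable>();
        private final Map<String, Long> lastAccess = new ConcurrentHashMap<String, Long>();
        private final long expirationMs;

        public ExpiringStreamCache(int expirationSecs) {
            this.expirationMs = expirationSecs * 1000L;
            // periodically evict idle streams, closing them on removal
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
                    this::evictExpired, expirationSecs, expirationSecs, TimeUnit.SECONDS);
        }

        public void put(String id, Closeable stream) {
            streams.put(id, stream);
            lastAccess.put(id, System.currentTimeMillis());
        }

        public Closeable get(String id) {
            lastAccess.put(id, System.currentTimeMillis());
            return streams.get(id);
        }

        private void evictExpired() {
            long now = System.currentTimeMillis();
            for (String id : lastAccess.keySet()) {
                if (now - lastAccess.get(id) > expirationMs) {
                    Closeable stream = streams.remove(id);
                    lastAccess.remove(id);
                    try {
                        if (stream != null) stream.close();   // close the stream when it expires
                    } catch (IOException ignored) {
                    }
                }
            }
        }
    }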

TopologyAssign

Let's go straight to the run function:

 public void run() {
        LOG.info("TopologyAssign thread has been started");
        runFlag = true;

        while (runFlag) {
            TopologyAssignEvent event;
            try {
                event = queue.take();   // take the next assignment event from the queue
            } catch (InterruptedException e1) {
                continue;
            }
            if (event == null) {
                continue;
            }

            boolean isSuccess = doTopologyAssignment(event);  // perform the topology assignment

            if (isSuccess == false) {
            } else {
                try {
                    cleanupDisappearedTopology();   // clean up topologies that have disappeared
                } catch (Exception e) {
                    LOG.error("Failed to do cleanup disappear topology ", e);
                    continue;
                }


            }
        }

    }
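The loop above is just the consumer half of a blocking queue: the submit and rebalance code paths act as producers that enqueue a TopologyAssignEvent, and this single thread drains them one at a time. A minimal sketch of the pattern, using a simplified stand-in event class (AssignEvent here is illustrative, not JStorm's TopologyAssignEvent):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative producer/consumer sketch of the pattern used by the run() loop above.
    public class AssignQueueDemo {
        // simplified stand-in for TopologyAssignEvent
        static class AssignEvent {
            final String topologyId;
            AssignEvent(String topologyId) { this.topologyId = topologyId; }
        }

        private final BlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<AssignEvent>();

        // producer side: submit/rebalance code enqueues an event and returns immediately
        public void push(AssignEvent event) {
            queue.offer(event);
        }

        // consumer side: a single thread drains the queue, one assignment at a time
        public void runLoop() throws InterruptedException {
            while (true) {
                AssignEvent event = queue.take();   // blocks until an event arrives
                System.out.println("assigning " + event.topologyId);
            }
        }
    }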

The doTopologyAssignment function

    protected boolean doTopologyAssignment(TopologyAssignEvent event) {
        Assignment assignment;
        try {
            Assignment oldAssignment = null;
            boolean isReassign = event.isScratch();
            if (isReassign) {
                oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
            }
            assignment = mkAssignment(event);  // does the preparation work and computes the new assignment

 
            TaskStartEvent taskEvent = new TaskStartEvent();
            taskEvent.oldAssignment = oldAssignment;
            taskEvent.newAssignment = assignment;
            taskEvent.topologyId = event.getTopologyId();
            taskEvent.clusterName = nimbusData.getClusterName();
            taskEvent.timestamp = System.currentTimeMillis();

            Map<Integer, String> task2Component;
            // get from nimbus cache first
            Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId());
            if (taskInfoMap != null) {
                task2Component = Common.getTaskToComponent(taskInfoMap);
            } else {
                task2Component = Common.getTaskToComponent(Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId()));
            }  // assign tasks: decide which machine and port each worker runs on, and which worker each task runs in
            taskEvent.task2Component = task2Component;
            nimbusData.getMetricRunnable().pushEvent(taskEvent);

            if (!isReassign) {
                setTopologyStatus(event);
            }
        } catch (Throwable e) {
            LOG.error("Failed to assign topology " + event.getTopologyId(), e);
            event.fail(e.getMessage());
            return false;
        }

        if (assignment != null)
                backupAssignment(assignment, event);  // write the assignment result to ZooKeeper
        event.done();
        return true;
    }

The prepareTopologyAssign function

  protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception {
        // the assignment context for this topology; the work below mainly fills in this object
        TopologyAssignContext ret = new TopologyAssignContext();

        String topologyId = event.getTopologyId();
        ret.setTopologyId(topologyId);     // topologyId

        int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
        ret.setTopologyMasterTaskId(topoMasterId);   //masterId
        LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

        // a topology's configuration is the nimbus configuration combined with the topology configuration (the latter is serialized into nimbus/stormdist/topologyId/stormconf.ser on the nimbus host)
        Map<Object, Object> nimbusConf = nimbusData.getConf();
        Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
        // topology code (serialized into nimbus/stormdist/topologyId/stormcode.ser on the nimbus host)
        StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
        ret.setRawTopology(rawTopology);

        Map stormConf = new HashMap();
        stormConf.putAll(nimbusConf);
        stormConf.putAll(topologyConf);
        ret.setStormConf(stormConf);

        StormClusterState stormClusterState = nimbusData.getStormClusterState();

        // get all running supervisor, don't need callback to watch supervisor
        Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
        // init all AvailableWorkerPorts
        // initially, every worker port on a supervisor is considered available
        for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
            SupervisorInfo supervisor = supInfo.getValue();
            if (supervisor != null)
                supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
        }
        // keep only the supervisors that are alive according to their heartbeats
        getAliveSupervsByHb(supInfos, nimbusConf);
        if (supInfos.size() == 0) {
            throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor");
        }
        // taskId --> componentId mapping
        // read from ZooKeeper -- but who writes this information there in the first place?
        Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
        ret.setTaskToComponent(taskToComponent);

        // get taskids /ZK/tasks/topologyId
        // get all task IDs via the map's key set
        Set<Integer> allTaskIds = taskToComponent.keySet();
        if (allTaskIds == null || allTaskIds.size() == 0) {
            String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId;
            LOG.warn(errMsg);
            throw new IOException(errMsg);
        }
        ret.setAllTaskIds(allTaskIds);


        Set<Integer> aliveTasks = new HashSet<Integer>();
        // unstoppedTasks are tasks that are still alive but located on machines whose supervisor is dead
        Set<Integer> unstoppedTasks = new HashSet<Integer>();
        Set<Integer> deadTasks = new HashSet<Integer>();
        Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();

        // read the existing assignment from the ZK /assignments directory
        Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null);
        // if an old assignment already exists
        if (existingAssignment != null) {
            aliveTasks = getAliveTasks(topologyId, allTaskIds);   // based on the task heartbeat status

            /*
             * Check if the topology master task is alive first since all task 
             * heartbeat info is reported by topology master. 
             * If master is dead, do reassignment for topology master first.
             */
            // first check whether the topology master task is dead; if it is, it must be reassigned first, otherwise no heartbeat info can be reported at all
            if (aliveTasks.contains(topoMasterId) == false) {
                ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId);
                deadTasks.addAll(worker.getTasks());

                // 1. remove the dead tasks from the full task set; 2. add the remainder to the alive set; 3. remove the dead tasks from the alive set
                Set<Integer> tempSet = new HashSet<Integer>(allTaskIds);
                tempSet.removeAll(deadTasks);
                aliveTasks.addAll(tempSet);
                aliveTasks.removeAll(deadTasks);
            } else {  // if the master task is alive, simply compute the dead tasks directly
                deadTasks.addAll(allTaskIds);
                deadTasks.removeAll(aliveTasks);
            }
            // collect the alive tasks that sit on supervisors which are no longer alive (the "unstopped" tasks)
            unstoppedTasks = getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
        }

        ret.setDeadTaskIds(deadTasks);
        ret.setUnstoppedTaskIds(unstoppedTasks);

        // Step 2: get all slots resource, free slots/ alive slots/ unstopped
        // slots
        getFreeSlots(supInfos, stormClusterState);
        ret.setCluster(supInfos);

        if (existingAssignment == null) {
            ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_NEW);

            try {
                AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName());
                if (lastAssignment != null) {
                    ret.setOldAssignment(lastAssignment.getAssignment());
                }
            } catch (Exception e) {
                LOG.warn("Fail to get old assignment", e);
            }
        } else {
            ret.setOldAssignment(existingAssignment);
            if (event.isScratch()) {
                ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_REBALANCE);
                ret.setIsReassign(event.isReassign());
                unstoppedWorkers = getUnstoppedWorkers(unstoppedTasks, existingAssignment);
                ret.setUnstoppedWorkers(unstoppedWorkers);
            } else {
                ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_MONITOR);
                unstoppedWorkers = getUnstoppedWorkers(aliveTasks, existingAssignment);
                ret.setUnstoppedWorkers(unstoppedWorkers);
            }
        }
        return ret;
    }
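The alive/dead bookkeeping in the middle of this method is plain set arithmetic over task IDs. A small standalone sketch of the two branches (all values here are made up for illustration):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch of the dead-task bookkeeping performed above.
    public class DeadTaskDemo {
        public static void main(String[] args) {
            Set<Integer> allTaskIds = new HashSet<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6));
            Set<Integer> aliveTasks = new HashSet<Integer>(Arrays.asList(1, 2, 3, 4));
            Set<Integer> masterWorkerTasks = new HashSet<Integer>(Arrays.asList(5, 6)); // tasks on the topology master's worker
            int topoMasterId = 5;

            Set<Integer> deadTasks = new HashSet<Integer>();
            if (!aliveTasks.contains(topoMasterId)) {
                // master is dead: treat its whole worker as dead, everything else as alive
                deadTasks.addAll(masterWorkerTasks);
                Set<Integer> tempSet = new HashSet<Integer>(allTaskIds);
                tempSet.removeAll(deadTasks);
                aliveTasks.addAll(tempSet);
                aliveTasks.removeAll(deadTasks);
            } else {
                // master is alive: dead tasks = all tasks minus alive tasks
                deadTasks.addAll(allTaskIds);
                deadTasks.removeAll(aliveTasks);
            }
            System.out.println("dead: " + deadTasks + ", alive: " + aliveTasks);
        }
    }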

Assigning workers and tasks

By default, scheduling is done by DefaultTopologyScheduler. The concrete implementation is in the assignTasks function, and the core is just three lines:

        List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum);
        TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers);
        Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign());

Worker allocation is fairly simple: workers are spread across all machines as evenly as possible (see the sketch after this list). The steps are:
1. From the total number of workers needed and the number of machines, compute the average number of workers each machine should run.
2. For each machine with fewer than the average, assign workers to it until it reaches the average.
3. Assign any remaining workers one by one to the machines that still have free slots.
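
A minimal sketch of that balancing idea, treating each supervisor simply as a list of free worker ports (the class and method names are illustrative, not JStorm's actual WorkerScheduler):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of spreading workers evenly across supervisors.
    public class BalancedWorkerAllocDemo {
        // freePorts: supervisor id -> free worker ports; needed: total workers to allocate
        static Map<String, List<Integer>> allocate(Map<String, List<Integer>> freePorts, int needed) {
            Map<String, List<Integer>> result = new HashMap<String, List<Integer>>();
            int average = needed / freePorts.size();          // step 1: average workers per machine

            // step 2: bring every machine up to the average
            for (Map.Entry<String, List<Integer>> e : freePorts.entrySet()) {
                List<Integer> ports = e.getValue();
                List<Integer> chosen = new ArrayList<Integer>();
                while (chosen.size() < average && !ports.isEmpty() && needed > 0) {
                    chosen.add(ports.remove(0));
                    needed--;
                }
                result.put(e.getKey(), chosen);
            }

            // step 3: hand out the remainder to machines that still have free ports
            for (Map.Entry<String, List<Integer>> e : freePorts.entrySet()) {
                if (needed <= 0) break;
                List<Integer> ports = e.getValue();
                if (!ports.isEmpty()) {
                    result.get(e.getKey()).add(ports.remove(0));
                    needed--;
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<String, List<Integer>> free = new HashMap<String, List<Integer>>();
            free.put("sup-a", new ArrayList<Integer>(Arrays.asList(6800, 6801, 6802)));
            free.put("sup-b", new ArrayList<Integer>(Arrays.asList(6800, 6801)));
            System.out.println(allocate(free, 4));   // e.g. 2 workers on each supervisor
        }
    }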
 
