当前位置: 首页 > 工具软件 > stack-log > 使用案例 >

通过heat创建stack的代码流程分析heat stack-create

谭献
2023-12-01

Heat-api发送RPC请求
Heat/api/openstack/v1/stacks.py
    @util.policy_enforce
    def create(self, req, body):
        """Create a new stack.

        Wraps the request body in ``InstantiationData``, forwards the stack
        name, template, environment and files to the engine through the RPC
        client, and formats the value returned by the RPC call for the
        response.

        :param req: incoming API request; ``req.context`` is passed on as
                    the RPC context.
        :param body: request body handed to ``InstantiationData``.
        :returns: dict with a single ``'stack'`` key holding the formatted
                  stack.
        """
        request_data = InstantiationData(body)
        engine_args = self.prepare_args(request_data)

        # Positional arguments are collected in the order expected by the
        # RPC client's create_stack().
        rpc_create = self.rpc_client.create_stack
        positional = (
            req.context,
            request_data.stack_name(),
            request_data.template(),
            request_data.environment(),
            request_data.files(),
            engine_args,
        )
        stack_identity = rpc_create(
            *positional,
            environment_files=request_data.environment_files())

        return {
            'stack': stacks_view.format_stack(
                req, {rpc_api.STACK_ID: stack_identity}),
        }

Heat/rpc/client.py
    def create_stack(self, ctxt, stack_name, template, params, files,
                     args, environment_files=None):
        """Create a new stack from the supplied template.

        Public entry point: it simply delegates to ``_create_stack`` without
        any of the engine-internal options. When a template-url is used, the
        template has already been fetched by the heat-api process by the
        time this is called.

        :param ctxt: RPC context.
        :param stack_name: Name of the stack you want to create.
        :param template: Template of stack you want to create.
        :param params: Stack Input Params/Environment
        :param files: files referenced from the environment.
        :param args: Request parameters/args passed from API
        :param environment_files: optional ordered list of environment file
               names included in the files dict
        :type  environment_files: list or None
        :returns: whatever ``_create_stack`` returns.
        """
        forwarded = (ctxt, stack_name, template, params, files, args)
        return self._create_stack(*forwarded,
                                  environment_files=environment_files)

    def _create_stack(self, ctxt, stack_name, template, params, files,
                      args, environment_files=None,
                      owner_id=None, nested_depth=0, user_creds_id=None,
                      stack_user_project_id=None, parent_resource_name=None):
        """Internal interface for engine-to-engine communication via RPC.

        Builds a ``create_stack`` message from all arguments and sends it
        with ``self.call`` at RPC version ``1.23``. On top of the public
        ``create_stack`` arguments it accepts options that should not be
        exposed to users via the API:

        :param owner_id: parent stack ID for nested stacks
        :param nested_depth: nested depth for nested stacks
        :param user_creds_id: user_creds record for nested stack
        :param stack_user_project_id: stack user project for nested stack
        :param parent_resource_name: the parent resource name
        :returns: the result of ``self.call``.
        """
        payload = {
            'stack_name': stack_name,
            'template': template,
            'params': params,
            'files': files,
            'environment_files': environment_files,
            'args': args,
            'owner_id': owner_id,
            'nested_depth': nested_depth,
            'user_creds_id': user_creds_id,
            'stack_user_project_id': stack_user_project_id,
            'parent_resource_name': parent_resource_name,
        }
        message = self.make_msg('create_stack', **payload)
        return self.call(ctxt, message, version='1.23')

最终RPC请求由heat-engine接收,真正用来创建stack的操作是由stack.create()来完成的。
Heat/engine/service.py
    def create_stack(self, cnxt, stack_name, template, params, files,
                     args, environment_files=None,
                     owner_id=None, nested_depth=0, user_creds_id=None,
                     stack_user_project_id=None, parent_resource_name=None):
        """Create a new stack using the template provided.

        The template is parsed and validated, the resulting stack is stored,
        a stack-user project is set up if the stack has none, and the actual
        creation is then started either through the convergence engine or
        via the thread group manager.

        Note that at this stage the template has already been fetched from the
        heat-api process if using a template-url.

        :param cnxt: RPC context.
        :param stack_name: Name of the stack you want to create.
        :param template: Template of stack you want to create.
        :param params: Stack Input Params
        :param files: Files referenced from the template
        :param args: Request parameters/args passed from API
        :param environment_files: optional ordered list of environment file
               names included in the files dict
        :type  environment_files: list or None
        :param owner_id: parent stack ID for nested stacks, only expected when
                         called from another heat-engine (not a user option)
        :param nested_depth: the nested depth for nested stacks, only expected
                         when called from another heat-engine
        :param user_creds_id: the parent user_creds record for nested stacks
        :param stack_user_project_id: the parent stack_user_project_id for
                         nested stacks
        :param parent_resource_name: the parent resource name
        :returns: the stack identifier as a plain dict.
        """
        LOG.info(_LI('Creating stack %s'), stack_name)

        def _create_stack_user(new_stack):
            # Nothing to do when the stack already has a stack-user project.
            if new_stack.stack_user_project_id:
                return
            # NOTE(review): per the original walkthrough,
            # create_stack_user_project_id() uses the keystone client to
            # create a dedicated project (rather than reusing the caller's
            # project), presumably to restrict access to the related
            # software_config resources - confirm in heat/engine/stack.py.
            try:
                new_stack.create_stack_user_project_id()
            except exception.AuthorizationFailure as auth_err:
                new_stack.state_set(new_stack.action, new_stack.FAILED,
                                    six.text_type(auth_err))

        def _stack_create(new_stack):
            # Create/Adopt a stack, and create the periodic task if successful
            if new_stack.adopt_stack_data:
                new_stack.adopt()
            elif new_stack.status != new_stack.FAILED:
                # The real work happens here: this calls create() defined in
                # heat/engine/stack.py.
                new_stack.create()

            finished_ok = (
                new_stack.action in (new_stack.CREATE, new_stack.ADOPT)
                and new_stack.status == new_stack.COMPLETE)
            if not finished_ok:
                LOG.info(_LI("Stack create failed, status %s"),
                         new_stack.status)
                return
            if self.stack_watch:
                # Schedule a periodic watcher task for this stack
                self.stack_watch.start_watch_task(new_stack.id, cnxt)

        use_convergence = cfg.CONF.convergence_engine

        # Parse the heat template and validate the resulting stack.
        stack = self._parse_template_and_validate_stack(
            cnxt, stack_name, template, params, files, environment_files,
            args, owner_id, nested_depth, user_creds_id,
            stack_user_project_id, use_convergence, parent_resource_name)

        self.resource_enforcer.enforce_stack(stack)

        # Persist the stack; store() hands back the stack id
        # (presumably written to the database - confirm).
        stack_id = stack.store()
        if cfg.CONF.reauthentication_auth_method == 'trusts':
            stack = parser.Stack.load(
                cnxt, stack_id=stack_id, use_stored_context=True)

        # Make sure the stack has its stack-user project.
        _create_stack_user(stack)

        if not use_convergence:
            self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
                                                  _stack_create, stack)
        else:
            action = stack.ADOPT if stack.adopt_stack_data else stack.CREATE
            stack.thread_group_mgr = self.thread_group_mgr
            stack.converge_stack(template=stack.t, action=action)

        return dict(stack.identifier())

接下来看一下stack.create() heat/engine/stack.py
注意其中的TaskRunner这个类及其回调函数self.stack_task。
在heat中,TaskRunner被广泛使用,用来执行诸如create、update、suspend等操作。
TaskRunner的构造参数指定了这个Task将要执行的函数(此处为self.stack_task),以及对stack所涉及的资源执行的action(此处为CREATE,即创建)。
最后调度这个Task运行,运行的过程就是执行self.stack_task函数。
    @profiler.trace('Stack.create', hide_args=False)
    @reset_state_on_error
    def create(self):
        """Create the stack and all of the resources.

        Stores the resources, wraps ``self.stack_task`` (with the CREATE
        action) in a ``scheduler.TaskRunner`` and runs it with the stack
        timeout. A rollback hook is passed as ``post_func``; it deletes the
        stack with the ROLLBACK action when rollback is enabled and the
        stack state is (CREATE, FAILED).
        """
        def rollback():
            if self.disable_rollback:
                return
            if self.state == (self.CREATE, self.FAILED):
                self.delete(action=self.ROLLBACK)

        self._store_resources()

        # stack-create is driven by self.stack_task.
        # NOTE(review): per the original walkthrough, stack-update uses
        # self.update_task instead, while suspend, resume, snapshot, etc.
        # also go through self.stack_task - see heat/engine/stack.py.
        task_options = dict(
            action=self.CREATE,
            reverse=False,
            post_func=rollback,
            error_wait_time=cfg.CONF.error_wait_time)
        runner = scheduler.TaskRunner(self.stack_task, **task_options)

        # Calling the runner goes through TaskRunner.__call__(), which is
        # where self.stack_task ends up being executed.
        runner(timeout=self.timeout_secs())

TaskRunner这个类定义在了heat/engine/scheduler.py中。
Heat engine中比较重要的几个文件是scheduler.py, service.py, stack.py, resource.py, stack_lock.py, update.py
	Scheduler.py中的两个类比较重要:
		TaskRunner: 对stack的任何操作, 最开始都是以TaskRunner来封装的。
		DependencyTaskGroup: 一个Task中可能会有许多步骤(step)来完成,实现task的循环执行主要依靠这个类
			与wrappertask这个装饰器,并配合yield来共同完成。
	Service.py中主要定义了三个类:
		EngineService:
		EngineListenerService:
		ThreadGroupManager: 实现把对stack的操作放到子线程中执行。
		
这个stack_task有三个重要的点:
stack_task被scheduler.wrappertask装饰,意味着这个task需要处理subtask
构造scheduler.DependencyTaskGroup对象,即依赖关系的拓扑图:图上的每个节点都是一个资源,对应在该资源上执行的task,也就是stack_task的子任务
yield action_task(),这是yield关键字的使用:函数体中有yield之后,该函数就成为生成器函数,直接调用stack_task不会执行函数体,而是返回一个生成器(迭代器)对象

下面来看stack_task(), 同时注意所用的装饰器。
   @scheduler.wrappertask
    def stack_task(self, action, reverse=False, post_func=None,
                   error_wait_time=None,
                   aggregate_exceptions=False, pre_completion_func=None):
        """A task to perform an action on the stack.

        All of the resources are traversed in forward or reverse dependency
        order.

        :param action: action that should be executed with stack resources
        :param reverse: define if action on the resources need to be executed
         in reverse order (resources - first and then res dependencies )
        :param post_func: function that need to be executed after
        action complete on the stack
        :param error_wait_time: time to wait before cancelling all execution
        threads when an error occurred
        :param aggregate_exceptions: define if exceptions should be aggregated
        :param pre_completion_func: function that need to be executed right
        before action completion. Uses stack ,action, status and reason as
        input parameters
        """
        LOG.debug("Jeffrey: stack.stack_task")
        try:
            lifecycle_plugin_utils.do_pre_ops(self.context, self,
                                              None, action)
        except Exception as e:
            self.state_set(action, self.FAILED, e.args[0] if e.args else
                           'Failed stack pre-ops: %s' % six.text_type(e))
            if callable(post_func):
                post_func()
            return
        self.state_set(action, self.IN_PROGRESS,
                       'Stack %s started' % action)

        stack_status = self.COMPLETE
        reason = 'Stack %s completed successfully' % action

        action_method = action.lower()
        # If a local _$action_kwargs function exists, call it to get the
        # action specific argument list, otherwise an empty arg list
        handle_kwargs = getattr(self,
                                '_%s_kwargs' % action_method,
                                lambda x: {})

        @functools.wraps(getattr(resource.Resource, action_method))
        def resource_action(r):
            # Find e.g resource.create and call it
            handle = getattr(r, action_method)
            LOG.debug("Jeffrey: stack.stack_task.resource_action: handle=%s" % handle)
		     #此处实际上就是调用了资源对应的handler函数,比如handle_create()
		    # 这些handle_Xxaction()函数在定义资源的时候已经实现,具体实例可以参看:
		    # heat/engine/resources/openstack/nova/server.py
            return handle(**handle_kwargs(r))

        action_task = scheduler.DependencyTaskGroup(
            self.dependencies,
            resource_action,
            reverse,
            error_wait_time=error_wait_time,
            aggregate_exceptions=aggregate_exceptions)

        try:
            LOG.debug("Jeffrey: stack.stack_task.resource_action: action_task 1")
                           # 调用了DependcyTaskGroup的__call__(),在__call__中又调用了回调函数
	                # resource_action
            yield action_task()
            LOG.debug("Jeffrey: stack.stack_task.resource_action: action_task 1")

继续看scheduler.py中的__call__()
    def __call__(self):
        """Return a co-routine which runs the task group."""
        LOG.debug("Jeffrey: DependencyTaskGroup.__call__ : self._ready()=%s" % self._ready())
        raised_exceptions = [] 
        while any(six.itervalues(self._runners)):
            try:
                for k, r in self._ready():      
                    LOG.debug("Jeffrey: DependencyTaskGroup.__call__.r=%s" % r)
                    r.start()  

上面的r.start调用的是scheduler.py中的start():
这个函数主要是执行传递进来的Task。result = self._task(*self._args, **self._kwargs)不一定会真正执行这个函数的函数体:
如果函数中存在yield关键字,调用它只会返回一个Generator对象,所以下一步有个类型判断,对于GeneratorType意味着需要一步步执行,
即调用step函数;若没有yield关键字,则函数像普通函数一样直接执行完毕。step函数调用与yield配合的next函数,最后在run_to_completion函数中,
循环地调用step函数(进而调用next),直至生成器抛出StopIteration异常。注意,此处的self._task实际上是在heat/engine/stack.py中定义的stack_task()

    def start(self, timeout=None):
        """Initialise the task and run its first step.

        The wrapped task callable is invoked with the stored positional and
        keyword arguments. If it returns a generator, that generator becomes
        the runner and is advanced once through ``step()``; otherwise the
        task is marked as done straight away.

        If a timeout is specified, any attempt to step the task after that
        number of seconds has elapsed will result in a Timeout being
        raised inside the task.

        :param timeout: optional timeout in seconds for the whole task.
        """
        assert self._runner is None, "Task already started"
        assert not self._done, "Task already cancelled"
        LOG.debug("Jeffrey: scheduler.TaskRunner.start")

        LOG.debug('%s starting' % six.text_type(self))

        if timeout is not None:
            self._timeout = Timeout(self, timeout)

        outcome = self._task(*self._args, **self._kwargs)
        if not isinstance(outcome, types.GeneratorType):
            # Plain callable: it has already run to the end, so there is
            # nothing left to step through.
            self._runner = False
            self._done = True
            LOG.debug('%s done (not resumable)' % six.text_type(self))
            return

        # Generator-based task: keep it and execute its first step.
        self._runner = outcome
        self.step()

    def step(self):
        """Run another step of the task.

        If the task is not yet done, either triggers the timeout (when one
        is set and has expired) or advances the underlying generator by
        exactly one ``next()`` call. Exhaustion of the generator
        (``StopIteration``) marks the task as done.

        :returns: True if the task is complete; False otherwise.
        """
        LOG.debug("Jeffrey: scheduler.TaskRunner.step: self.done()=%s" % self.done())
        if not self.done():
            assert self._runner is not None, "Task not started"

            if self._timeout is not None and self._timeout.expired():
                LOG.info(_LI('%s timed out'), six.text_type(self))
                self._done = True

                self._timeout.trigger(self._runner)
            else:
                LOG.debug('%s running' % six.text_type(self))

                try:
                    # Advance the generator once and keep the yielded value
                    # for the trace log. Evaluating next() inside the log
                    # call *and* calling it again afterwards would consume
                    # two steps per call and silently skip every other
                    # yield of the task.
                    yielded = next(self._runner)
                except StopIteration:
                    self._done = True
                    LOG.debug('%s complete' % six.text_type(self))
                else:
                    # Lazy logging args: safe even if the yielded value is
                    # a tuple, which would break "%s" % value formatting.
                    LOG.debug("Jeffrey: scheduler.TaskRunner.step: "
                              "next(self._runner)=%s", yielded)

        return self._done

    def run_to_completion(self, wait_time=1):
        """Run the task to completion.

        Repeatedly calls ``step()`` until it reports completion, calling
        ``self._sleep(wait_time)`` after every step that did not finish the
        task.

        The task will sleep for `wait_time` seconds between steps. To avoid
        sleeping, pass `None` for `wait_time`.
        """
        while True:
            if self.step():
                break
            self._sleep(wait_time)




 类似资料: