当前位置: 首页 > 工具软件 > py-amqp > 使用案例 >

OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一

罗新
2023-12-01

感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn


    这篇博文我将继续分析OpenStack建立实例完整过程的源码。接下来将会涉及到依据AMQP协议进行消息传递的实现过程,这里我将依据具体的源码,用大约4篇左右博文的篇幅,对依据AMQP协议进行消息传递的发送和接收机制和其具体实现过程进行一个全面细致的分析。这里我们先来分析依据AMQP协议进行消息发送的实现机制。

    nova中的每个组件都会连接消息服务器,一个组件可能是一个消息发送者(如API、Scheduler),也可能是一个消息接收者(如compute、volume、network)。发送消息主要有两种方式:同步调用rpc.call和异步调用rpc.cast。还有其他的消息发送方式,比如notify等。首先来回顾一下AMQP通信架构中的几个基本且重要的概念。

1.AMQP通信架构中的重要概念

1.1 交换器:
    接受消息并且将消息转发给队列。在每个主机的内部,交换器有唯一对应的名字。交换器可以是持久的,临时的或者自动删除的。持久的交换器会一直存在于Server端直到他被显示的删除;临时交换器在服务器关闭时停 止工作;自动删除的交换器在没有应用程序使用它的时候被服务器删除。
    主要有三种类型的交换器:
1.1.1 广播式交换器类型(fanout)
    该类交换器不分析所接收到消息中的Routing Key,默认将消息转发到所有与该交换器绑定的队列中去。广播交换器是最简单的一种类型,就像我们从字面上理解到的一样,它把所有接受到的消息广播到所有它所知道的队列中去,不论消息的关键字是什么,消息都会被路由到和该交换器绑定的队列中去。
1.1.2 直接式交换器类型(direct)
    该类交换器需要精确匹配Routing Key与BindingKey,如消息的Routing Key = Cloud,那么该条消息只能被转发至Binding Key = Cloud的消息队列中去。直接式交换器的转发效率较高,安全性较好,但是缺乏灵活性,系统配置量较大。
1.1.3 主题式交换器(Topic Exchange)
    该类交换器通过消息的Routing Key与Binding Key的模式匹配,将消息转发至所有符合绑定规则的队列中。Binding Key支持通配符,其中“*”匹配一个词组,“#”匹配多个词组(包括零个)。例如,Binding Key=“*.Cloud.#”可转发Routing Key=“OpenStack.Cloud.GD.GZ”、“OpenStack.Cloud.Beijing”以及“OpenStack.Cloud” 的消息,但是对于Routing Key=“Cloud.GZ”的消息是无法匹配的。

1.2 队列:
    “消息队列”,它是一个具名缓冲区,它代表一组消费者应用程序保存消息。这些应用程序在它们的权限范围内可以创建、使用、共享消息队列。类似于交换器,消息队列也可以是持久的,临时的或者自动删除的。临时消息队列在服务器被关闭时停止工作;自动删除队列在没有应用程序使用它的时候被服务器自动删除。消息队列将消息保存在内存、硬盘或两者的组合之中。
1.3 绑定:
    可以理解为交换器和消息队列之间的一种关系,绑定之后交换器会知道应该把消息发给那个队列,绑定的关键字称为binding_key。
    Exchange和Queue的绑定可以是多对多的关系,每个发送给Exchange的消息都会有一个叫做routing_key的关键字,交换器要想把 消息发送给某个特定的队列,那么该队列与交换器的binding_key必须和消息的routing_key相匹配才OK。

2.kombu producer代码示例

kombu中消息发送机制的简单实现如示例所示:

    #!/usr/bin/python  
      
    from kombu.entity import Exchange  
    from kombu.messaging import Producer  
    from kombu.connection import Connection  
      
    connection = Connection('amqp://guest:bupt@172.16.4.1:5672//')  
    channel = connection.channel()  
      
    media_exchange = Exchange('media', 'direct', channel)  
    producer = Producer(channel, exchange=media_exchange, routing_key='video')  
      
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013})  

    思路很简单:

    (1)创建连接;

    (2)获得channel;

    (3)创建exchange;

    (4)创建Producer,指定exchange与routing_key;

    (5)发送消息;

    在nova中,当然不能实现的这么简便,而是进行了一系列的封装操作,但是基本的实现步骤是一致的。

3.源码分析

    接续上篇博文,我们来继续分析方法_create_instance的实现,先来回顾方法_create_instance的代码实现:

    def _create_instance(self, context, instance_type,
               image_href, kernel_id, ramdisk_id,
               min_count, max_count,
               display_name, display_description,
               key_name, key_data, security_group,
               availability_zone, user_data, metadata,
               injected_files, admin_password,
               access_ip_v4, access_ip_v6,
               requested_networks, config_drive,
               block_device_mapping, auto_disk_config,
               reservation_id=None, scheduler_hints=None):
        """
        验证所有的输入实例参数;
        发送要运行实例('run_instance')的请求消息到远程调度器;
        """

        # generate_uid:随机生成一个uid值赋值给reservation_id;
        if reservation_id is None:
            reservation_id = utils.generate_uid('r')
        
        # _validate_and_provision_instance:验证所有的输入参数;
        # 返回要建立实例的各类信息;
        # 这个方法中做了很多事,稍后会好好总结;
        (instances, request_spec, filter_properties) = \
                self._validate_and_provision_instance(context, instance_type,
                        image_href, kernel_id, ramdisk_id, min_count,
                        max_count, display_name, display_description,
                        key_name, key_data, security_group, availability_zone,
                        user_data, metadata, injected_files, access_ip_v4,
                        access_ip_v6, requested_networks, config_drive,
                        block_device_mapping, auto_disk_config,
                        reservation_id, scheduler_hints)

        # 循环获取instances中每个实例action的一些相关信息(包括启动时间等);
        # _record_action_start获取要启动的实例action的一些相关信息(包括启动时间等);
        for instance in instances:
            self._record_action_start(context, instance, instance_actions.CREATE)

        # run_instance:实现了发送要运行实例('run_instance')的请求消息到远程节点;
        # 远程节点会在队列中提取这条消息,然后调用相应资源,实现运行实例的这个请求;
        # 这个方法只是实现了请求信息的发送;
        self.scheduler_rpcapi.run_instance(context,
                request_spec=request_spec,
                admin_password=admin_password, injected_files=injected_files,
                requested_networks=requested_networks, is_first_time=True,
                filter_properties=filter_properties)

        return (instances, reservation_id)

    我们来分析下面这段代码的实现:

self.scheduler_rpcapi.run_instance(context,
        request_spec=request_spec,
        admin_password=admin_password, injected_files=injected_files,
        requested_networks=requested_networks, is_first_time=True,
        filter_properties=filter_properties)

return (instances, reservation_id)

    首先来看scheduler_rpcapi,这里完成的主要是类 RpcProxy的初始化,获取变量topic的值;

self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()

class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
    BASE_RPC_API_VERSION = '2.0'
    def __init__(self):
        # scheduler_topic:这个参数定义了主题模式的节点调度;
        # 参数的默认值为'scheduler';
        super(SchedulerAPI, self).__init__(topic=CONF.scheduler_topic,
                default_version=self.BASE_RPC_API_VERSION)

class RpcProxy(object):
    """
    RPC客户端的辅助类;
    """

    def __init__(self, topic, default_version):
        """
        初始化RpcProxy;
        """
        self.topic = topic
        self.default_version = default_version
        super(RpcProxy, self).__init__()
    再来看方法run_instance的实现过程,这个方法在这里主要实现了发送消息到指定队列。之后会由服务启动时就已经启动的消息接受线程按照一定的匹配规则从队列中获取消息,再进行消息的执行,这一部分在后面我将会进行详细的解析,这里主要关注消息发送到队列的实现机制。

def run_instance(self, ctxt, request_spec, admin_password,
            injected_files, requested_networks, is_first_time,
            filter_properties):
    # 实现了发送要运行实例('run_instance')的请求消息到指定队列;
    # make_msg:封装所要运行实例的这个请求的所有信息到make_msg;
    # 调用这个远程方法cast,但是不返回任何信息;
    # 发送一个主题的消息make_msg,不用等待任何信息的返回;
    #注:远程节点会在队列中提取这条消息,然后调用相应资源,实现运行实例的这个请求,这部分代码只是实现了请求信息的发送;
    return self.cast(ctxt, self.make_msg('run_instance',
            request_spec=request_spec, admin_password=admin_password,
            injected_files=injected_files,
            requested_networks=requested_networks,
            is_first_time=is_first_time,
            filter_properties=filter_properties))

    我们来看这个方法,可见就是调用方法cast实现对打包的消息进行远程发送,前面说过,同步调用rpc.call和异步调用rpc.cast是_get_impl()实现了具体实现的定向,而系统默认的实现是impl_kombu.py,所以这里的方法_get_impl()实现了定向到/nova/openstack/common/rpc/impl_kombu.py,具体过程就不进行分析了,来看看具体代码就可以了。

def _get_impl():
    """
    延迟导入rpc_backend,知道配置被加载为止;
    导入配置参数CONF.rpc_backend指定的模块;
    默认的话就是导入模块nova.openstack.common.rpc.impl_kombu;
    """
          
    # _RPCIMPL全局变量;
    global _RPCIMPL
    if _RPCIMPL is None:
        try:
            # rpc_backend:这个参数定义了所使用的信息模块,默认是kombu;
            # 这个参数默认的值是'%s.impl_kombu' % __package__;
            # 对应着/nova/openstack/common/rpc/impl_kombu.py;
            # import_module:导入模块CONF.rpc_backend;
            # 默认的话就是导入模块nova.openstack.common.rpc.impl_kombu;
            _RPCIMPL = importutils.import_module(CONF.rpc_backend)
        except ImportError:
            # For backwards compatibility with older oslo.config.
            impl = CONF.rpc_backend.replace('nova.rpc',
                                            'nova.openstack.common.rpc')
            _RPCIMPL = importutils.import_module(impl)
    return _RPCIMPL

    继续来看代码def cast(conf, context, topic, msg):

def cast(conf, context, topic, msg):
    """
    发送一个topic主题的消息,不用等待任何信息的返回;
    """
           
    # cast:发送一个topic主题的消息,不用等待任何信息的返回;
    return rpc_amqp.cast(
        conf, context, topic, msg,       
        # get_connection_pool:获取到RabbitMQ的连接池,并返回这个连接池的对象;
        # Connection:连接到RabbitMQ的实现的初始化;
        rpc_amqp.get_connection_pool(conf, Connection))

    首先我们对方法get_connection_pool和类Connection进行解析,其中方法get_connection_pool实现了从到RabbitMQ的连接池中获取一个连接,具体来看代码:

def get_connection_pool(conf, connection_cls):
    """
    获取到RabbitMQ的连接池,并返回这个连接池的一个连接;
    """
    with _pool_create_sem:
        # 首先要确定只有一个线程试图建立这个连接池;
        
        # 如果connection_cls.pool为空;
        # 则根据conf参数信息生成一个连接池;
        # Pool:实现了一个连接池的类;
        if not connection_cls.pool:
            connection_cls.pool = Pool(conf, connection_cls)
    return connection_cls.pool
    方法get_connection_pool中的connection_cls传进来的就是类Connection的实例化对象,而类Connection就是实现了到RabbitMQ的连接。

class Connection(object):
    """
    连接到RabbitMQ的实现类;
    """

    pool = None

    def __init__(self, conf, server_params=None):
        """
        连接到RabbitMQ的实现的初始化;
        """
        
        #看看初始化方法都有做了什么;
        #设置必要的连接属性信息;
        #循环从配置文件获取adr地址,解析出host:port配对形式的字符串,每一对地址对应一个params字典,字典保存了一些连接相关的参数信息;
        #重新连接并重新建立队列;
        self.consumers = []
        self.consumer_thread = None
        self.proxy_callbacks = []
        self.conf = conf
        # 试图连接到RabbitMQ的最大重试次数,默认为0,但是代表着重复无限次;
        self.max_retries = self.conf.rabbit_max_retries
        # Try forever?
        if self.max_retries <= 0:
            self.max_retries = None
        # 连接到RabbitMQ的重试间隔时间;
        self.interval_start = self.conf.rabbit_retry_interval
        # 连接到RabbitMQ的回退时间间隔;
        self.interval_stepping = self.conf.rabbit_retry_backoff
        # max retry-interval = 30 seconds
        self.interval_max = 30 # 连接到RabbitMQ重试的最大时间间隔;
        self.memory_transport = False

        if server_params is None:
            server_params = {}
        # Keys to translate from server_params to kombu params
        server_params_to_kombu_params = {'username': 'userid'}

        # 获取那些应该用于连接的ssl参数(如果有的话);
        ssl_params = self._fetch_ssl_params()
        params_list = []
        
        # rabbit_hosts:这个列表参数定义了RabbitMQ HA集群主机:端口对,默认为['$rabbit_host:$rabbit_port'];
        # 循环读取rabbit_hosts列表参数中的地址元素,进行解析;
        for adr in self.conf.rabbit_hosts:
            # 把adr和default_port解析成host:port配对形式的字符串,分别赋值给hostname和port;
            hostname, port = network_utils.parse_host_port(adr, default_port=self.conf.rabbit_port)

            # 生成相关信息组成的字典;
            # 包括主机名、端口、RabbitMQ的userid、RabbitMQ密码以及RabbitMQ虚拟主机的路径等;
            params = {
                'hostname': hostname,
                'port': port,
                # 参数rabbit_userid定义了RabbitMQ的userid;
                # 参数的默认值为'guest';
                'userid': self.conf.rabbit_userid,
                # 参数rabbit_password定义了RabbitMQ密码;
                # 参数的默认值为'guest';
                'password': self.conf.rabbit_password,
                # 参数rabbit_virtual_host定义了RabbitMQ虚拟主机的路径;
                # 参数的默认值为“/”;
                'virtual_host': self.conf.rabbit_virtual_host,
            }

            for sp_key, value in server_params.iteritems():
                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
                params[p_key] = value

            if self.conf.fake_rabbit:
                params['transport'] = 'memory'
            if self.conf.rabbit_use_ssl:
                params['ssl'] = ssl_params

            # 更新存储链接相关信息的参数列表;
            # params_list是列表,其中元素params是字典,保存了一些连接相关的参数信息;
            # 每一个主机:地址对就对应着一个params字典;
            params_list.append(params)

        self.params_list = params_list # 存储链接相关信息的参数列表;
        #注:每一个主机:地址对就对应着列表中的一个元素,每一个元素是一个params字典;

        self.memory_transport = self.conf.fake_rabbit

        self.connection = None
        
        # 重新连接并重新建立队列;
        # 直到连接成功或者到达重试连接最大次数为止;
        self.reconnect()
    我们再回到方法def cast(conf, context, topic, msg),在完成了获取到RabbitMQ的连接之后,我们来继续看方法rpc_amqp.cast:

def cast(conf, context, topic, msg, connection_pool):
    """
    发送一个topic主题的消息,不用等待任何信息的返回;
    """
    LOG.debug(_('Making asynchronous cast on %s...'), topic)
    
    # 为消息msg添加随机生成的unique_id;
    _add_unique_id(msg)
    # 打包上下文信息context到msg中;
    pack_context(msg, context)
    
    # ConnectionContext(conf, connection_pool):建立一个新连接,或者从连接池中获取一个连接;
    # serialize_msg(msg):消息msg的序列化;
    # topic_send:实现发送一个'topic'主题的消息;
    with ConnectionContext(conf, connection_pool) as conn:
        conn.topic_send(topic, rpc_common.serialize_msg(msg))

    先来看语句ConnectionContext(conf, connection_pool),实现了建立一个新连接,或者从连接池中获取一个连接,其中类ConnectionContext实现的是对连接功能的一个封装。具体来看代码:

class ConnectionContext(rpc_common.Connection):
    """
    这个类实际上返回给create_connection()的调用者;
    这个类是对连接功能的一个封装;
    这个类提供方法建立新的连接,也可以实现从连接池中直接获取一个连接;
    当然,也有方法可以实现对连接的删除,当连接删除之后,如果是从连接池获取的连接会把连接返回连接池;
    """

    def __init__(self, conf, connection_pool, pooled=True, server_params=None):
        """
        建立一个新的连接,或者从连接池中获取一个连接;
        """
        self.connection = None
        self.conf = conf
        self.connection_pool = connection_pool
        
        # 如果已经获取连接池对象,直接从连接池中获取一个连接;
        if pooled:
            self.connection = connection_pool.get()
        else:
            self.connection = connection_pool.connection_cls(conf, server_params=server_params)
        self.pooled = pooled
    这里,我们也实现了前面提到的kombu中消息发送机制的简单实现中的第一个步骤,即获取连接。

    在下一篇博文中,我将继续解析方法def cast(conf, context, topic, msg)中的重要语句topic_send(topic, rpc_common.serialize_msg(msg)),来看看建立连接之后,如果实现消息发送机制中的其他步骤。

 类似资料: