当前位置: 首页 > 工具软件 > py-amqp > 使用案例 >

OpenStack建立实例完整过程源码详细分析(15)----依据AMQP通信架构实现消息接收机制解析之二

吕自明
2023-12-01

感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn


此片博文继续上一篇博文的工作,继续对/nova/server.py中类Service下的方法def start(self)进行解析,来实现对Nova下的AMQP的消息消费者机制的解析工作。

2.3 语句self.conn.consume_in_thread()的解析

这条语句实现了从队列中获取消息,并最终实现了对消息的处理和执行操作。

def consume_in_thread(self):
    """
    启动消费者线程;
    处理一个绿色线程中的所有的队列/消费者信息;
    """
    def _consumer_thread():
        try:
            self.consume()
        except greenlet.GreenletExit:
            return
    # 启动一个绿色线程来运行方法_consumer_thread;
    # 实现获取下一个消费者;
    if self.consumer_thread is None:
        self.consumer_thread = eventlet.spawn(_consumer_thread)
    return self.consumer_thread
这个方法中启动了一个绿色线程来运行方法_consumer_thread,进而运行方法consume,我们来看方法consume的代码实现:

def consume(self, limit=None):
    it = self.iterconsume(limit=limit)
    while True:
        try:
            it.next()
        except StopIteration:
            return
def iterconsume(self, limit=None, timeout=None):
    """
    返回一个迭代器,它实现了处理所有的队列/消费者信息;
    """

    info = {'do_consume': True}

    def _error_callback(exc):
        if isinstance(exc, socket.timeout):
            LOG.debug(_('Timed out waiting for RPC response: %s') % str(exc))
            raise rpc_common.Timeout()
        else:
            LOG.exception(_('Failed to consume message from queue: %s') %
                              str(exc))
            info['do_consume'] = True

    def _consume():
        if info['do_consume']:
            queues_head = self.consumers[:-1]
            queues_tail = self.consumers[-1]
            for queue in queues_head:
               queue.consume(nowait=True)
            queues_tail.consume(nowait=False)
            info['do_consume'] = False
        return self.connection.drain_events(timeout=timeout)

    for iteration in itertools.count(0):
        if limit and iteration >= limit:
            raise StopIteration
        yield self.ensure(_error_callback, _consume)
在这个方法中最重要的语句就是queue.consume(nowait=True)和queues_tail.consume(nowait=False),所以我们进一步来看这里的方法consume的具体实现:

def consume(self, *args, **kwargs):
    options = {'consumer_tag': self.tag}
    options['nowait'] = kwargs.get('nowait', False)
    callback = kwargs.get('callback', self.callback)
    if not callback:
        raise ValueError("No callback defined")

    def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        try:
            # 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
            msg = rpc_common.deserialize_msg(message.payload)
            callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            message.ack()

    self.queue.consume(*args, callback=_callback, **options)
我们可以看到这个方法最后执行了语句self.queue.consume(*args, callback=_callback, **options),可以看到这里传入的参数callback就是上面代码中的方法_callback,可以预见,在后面的方法执行过程中,最终将会调用到这个方法def _callback(raw_message),我们先记住它,稍后合适的地方再进行进一步解析。

来看语句self.queue.consume(*args, callback=_callback, **options)中的方法consume的代码实现:

def consume(self, consumer_tag='', callback=None, no_ack=None,
            nowait=False):
        """
        处理队列中的消息;
        keyword consumer_tag:消费者的唯一标识符;
        keyword no_ack:如果接受的消息没有被确认;(???)
        keyword nowait:不等待回应;
        keyword callback:每个传递消息的回调方法;
        """
        if no_ack is None:
            no_ack = self.no_ack
        return self.channel.basic_consume(queue=self.name,
                                          no_ack=no_ack,
                                          consumer_tag=consumer_tag or '',
                                          callback=callback,
                                          nowait=nowait)
进一步来看方法basic_consume的实现代码:

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """
        处理队列中的消息;
        """
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
        if queue in self.auto_delete_queues:
            self.auto_delete_queues[queue] += 1

        def _callback(raw_message):
            message = self.Message(self, raw_message)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)

        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)

        self._reset_cycle()
我们可以看到这里在方法def _callback(raw_message)的执行过程的最后,终于调用了前面传进来的参数callback,也就是前面我们记住的那个方法,也就是def consume(self, *args, **kwargs)中的def _callback(raw_message),我们来看一下这个方法的具体实现代码:

def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        try:
            # 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
            msg = rpc_common.deserialize_msg(message.payload)
            callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            message.ack()
我们可以看到,在这个方法中的语句callback(msg),它应该是最终处理所获得消息的实现程序,具体它是哪个方法呢,其实callback也是一层层传进来的一个参数方法,他就是上一篇博文中我提到的/nova/openstack/common/rpc/amqp.py----class ProxyCallback(_ThreadPoolWithWait)下面的方法def __call__(self, message_data),但是是如何定位到这个方法的呢,后面我会加以说明。现在来看这个方法def __call__(self, message_data)的实现代码:

def __call__(self, message_data):
    if hasattr(local.store, 'context'):
        del local.store.context
    # 记录日志,但是一些敏感信息,像创建虚拟机时的密码等信息不会
    # 显示在日志中,面是替换为"<SANITIZED>"
    rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
    # 重复消息检测;
    # ack返回前,AMQP消费者可能会出现两次读取相同信息的异常,这个方法可以防止这样的情况出现;
    self.msg_id_cache.check_duplicate_message(message_data)
    # 从message_data中解析出上下文信息;
    ctxt = unpack_context(self.conf, message_data)
    # 从message_data中获取要执行的方法method;
    method = message_data.get('method')
    # 从message_data中获取相关参数args;
    args = message_data.get('args', {})
    # 从message_data中获取版本的相关信息;
    version = message_data.get('version', None)
    if not method:
        LOG.warn(_('no method for message: %s') % message_data)
        ctxt.reply(_('No method for message: %s') % message_data,
                   connection_pool=self.connection_pool)
        return
    # 建立一个新的绿色线程来执行方法method;
    self.pool.spawn_n(self._process_data, ctxt, version, method, args)
前面已经对获取的消息进行了反序列化操作,这里分别直接获取了消息中不同的信息。我们可以看到,在方法的最后,语句self.pool.spawn_n(self._process_data, ctxt, version, method, args)就是我们最想要的东西,它建立了一个绿色线程来执行消息中指定要运行的方法method,比如运行实例等等。

我们在回到方法def _callback(raw_message)中,看到最后执行这样一条语句:message.ack(),这条语句实现了确认获取的消息已经完成执行操作,并从队列中删除这条消息。

至此,语句self.conn.consume_in_thread()解析完成。


下面,我来解释一下,为什么前面说到的callback参数方法是如何定位到方法def __call__(self, message_data)的,当然只是简单的说明一下过程。
def consume(self, *args, **kwargs):
    options = {'consumer_tag': self.tag}
    options['nowait'] = kwargs.get('nowait', False)
    callback = kwargs.get('callback',self.callback)
    if not callback:
        raise ValueError("No callback defined")

    def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        try:
            # 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
            msg = rpc_common.deserialize_msg(message.payload)
            callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            message.ack()

    self.queue.consume(*args, callback=_callback, **options)
重点看这里的语句callback(msg),这条语句实际实施了对获取消息的处理过程。这里的重点是callback到底调用的是哪个方法。定义这个方法的语句是:
callback= kwargs.get('callback',self.callback)
这里就需要引出python中字典的get用法:
get(key,default=None)
返回键值key对应的值;如果key没有在字典里,则返回default参数的值,默认为None;
所以这里callback的首选值为kwargs[callback'],如果字典kwargs中没有相应的元素,则赋值callback的值为self.callback;

我们继续来探寻这里的callback的源头:
可以看到方法defconsume(self,*args, **kwargs)所处在的类classConsumerBase(object)的初始化方法__init__中有callback的取值:
def __init__(self, channel, callback, tag, **kwargs):
    """
    根据相关参数在一个amqp channel上声明产生一个队列;
    """
    self.callback = callback
    self.tag = str(tag)
    self.kwargs = kwargs
    self.queue = None
    # 重连接到channel;
    # 声明队列和交换器,通过routing key绑定队列到交换器;
    self.reconnect(channel)
我们可以想一下,什么时候对这个类进行了类的对象初始化,对,就是在执行语句:

self.conn.create_consumer(self.topic,rpc_dispatcher, fanout=False)的过程中,有对类classConsumerBase(object)进行了对象初始化过程。下面简单回顾一下这条语句的执行过程:

self.conn.create_consumer(self.topic,rpc_dispatcher, fanout=False)

def create_consumer(self, topic, proxy, fanout=False):
    """
    根据参数建立具体的消息消费者;
    """
    # Connection:连接到RabbitMQ的实现类;
    # get_connection_pool:获取到RabbitMQ的连接池,并返回这个连接池的对象;
    # connection_cls:是一个连接类的对象;
    # 获取ProxyCallback类的初始化实例对象;
    proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))

    self.proxy_callbacks.append(proxy_cb)

    if fanout:
        # 建立一个广播类型的消费者;
        # 声明队列和交换器,通过routing key绑定队列到交换器;
        self.declare_fanout_consumer(topic, proxy_cb)
    else:
        # declare_topic_consumer:建立一个主题式的信息消费者;
        # 声明队列和交换器,通过routing key绑定队列到交换器;
        self.declare_topic_consumer(topic, proxy_cb)

def declare_topic_consumer(self, topic, callback=None, queue_name=None,
                               exchange_name=None):
        """
        建立一个主题式的信息消费者;
        声明队列和交换器,通过routing key绑定队列到交换器;
        """
        
        # 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
        self.declare_consumer(functools.partial(TopicConsumer,
                                                name=queue_name,
                                                exchange_name=exchange_name,
                                                ),
                              topic, callback)
这里可以看到传入方法declare_topic_consumer中的callback的参数值即为proxy_cb,所以这里分两条线,一条线来看看proxy_cb的获取过程,另一条线来看看方法declare_topic_consumer中的callback是不是我们想找到的那个callback。

首先来看第一条线,我们来看语句:
proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))

class ProxyCallback(_ThreadPoolWithWait):
    def __init__(self, conf, proxy, connection_pool):
        # 这个类实现了启动一个用于处理传入信息的绿色线程;
        super(ProxyCallback, self).__init__(
            conf=conf,
            connection_pool=connection_pool,
        )
        self.proxy = proxy
        self.msg_id_cache = _MsgIdCache()

    def __call__(self, message_data):
        if hasattr(local.store, 'context'):
            del local.store.context
        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
        self.msg_id_cache.check_duplicate_message(message_data)
        ctxt = unpack_context(self.conf, message_data)
        method = message_data.get('method')
        args = message_data.get('args', {})
        version = message_data.get('version', None)
        if not method:
            LOG.warn(_('no method for message: %s') % message_data)
            ctxt.reply(_('No method for message: %s') % message_data,
                       connection_pool=self.connection_pool)
            return
        self.pool.spawn_n(self._process_data, ctxt, version, method, args)

class _ThreadPoolWithWait(object):
    """
    这个类实现了启动一个用于处理传入信息的绿色线程;
    """
    def __init__(self, conf, connection_pool):
        self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
        self.connection_pool = connection_pool
        self.conf = conf
我们可以看到proxy_cb是类ProxyCallback的实例化对象,我们还注意到类 ProxyCallback中实现了一个方法__call__,这个方法就是我们所要找到的目标方法,我们继续解析定位过程。

再来看另一条线,来追踪方法declare_topic_consumer中的callback:
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
                               exchange_name=None):
    """
    建立一个主题式的信息消费者;
    声明队列和交换器,通过routing key绑定队列到交换器;
    """
    # 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
    self.declare_consumer(functools.partial(TopicConsumer,
                                            name=queue_name,
                                            exchange_name=exchange_name,
                                            ),
                          topic, callback)

def declare_consumer(self, consumer_cls, topic, callback):
    """
    根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
    """

    def _connect_error(exc):
        log_info = {'topic': topic, 'err_str': str(exc)}
        LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                      "%(err_str)s") % log_info)

    def _declare_consumer():
        # 传递进来的类consumer_cls应该是TopicConsumer;
        # 声明队列和交换器,通过routing key绑定队列到交换器;
        consumer = consumer_cls(self.conf, self.channel, topic, callback,
                                self.consumer_num.next())
        self.consumers.append(consumer)
        return consumer
    return self.ensure(_connect_error, _declare_consumer)

class TopicConsumer(ConsumerBase):
    """
    主题式消息消费者类;
    """

    def __init__(self, conf, channel, topic, callback, tag, name=None,
                 exchange_name=None, **kwargs):
        """
        Init a 'topic' queue.
        """
        # 默认选项的设置;
        
        # rabbit_durable_queues:这个参数定义了在RabbitMQ中是否使用持久性的队列;
        # 参数的默认值为False;
        options = {'durable': conf.rabbit_durable_queues,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': False,
                   'exclusive': False}
        options.update(kwargs)
        
        # 获取交换器的名称;
        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
        # 根据相关的配置参数声明产生一个交换器;
        exchange = kombu.entity.Exchange(name=exchange_name,
                                         type='topic',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        # 重连接到channel;
        # 声明队列和交换器,通过routing key绑定队列到交换器;
        super(TopicConsumer, self).__init__(channel,
                                            callback,
                                            tag,
                                            name=name or topic,
                                            exchange=exchange,
                                            routing_key=topic,
                                            **options)

class ConsumerBase(object):
    """
    Consumer(消费者)基类;
    """

    def __init__(self, channel, callback, tag, **kwargs):
        """
        根据相关参数在一个amqp channel上声明产生一个队列;
        """
        self.callback = callback
        self.tag = str(tag)
        self.kwargs = kwargs
        self.queue = None
        # 重连接到channel;
        # 声明队列和交换器,通过routing key绑定队列到交换器;

        self.reconnect(channel)
至此我们可以看到,前面提到的参数proxy_cb,即类ProxyCallback的实例化对象就是我们要找的callback的源头。
前面我们看到类ProxyCallback中除了类的初始化方法__init__之外,还有实现了一个方法__call__,这里我们来看看python中所规定的方法__call__的用法。
Python中有一个有趣的语法,只要定义类的时候,实现__call__函数,这个类就成为可调用的。换句话说,我们可以把这个类的对象当作方法来使用,相当于重载了括号运算符。例如:
class Aclass:
    def __call__(self):
        print 'Hi I am __call__ed';
    def __init__(self, *args, **keyargs):
        print "Hi I am __init__ed";

x = Aclass()             输出 Hi I am __init__ed
x()                      输出 Hi I am __call__ed

现在我们回到最前面的方法defconsume(self, *args, **kwargs),来看语句callback(msg):
def consume(self, *args, **kwargs):
    options = {'consumer_tag': self.tag}
    options['nowait'] = kwargs.get('nowait', False)
    callback = kwargs.get('callback', self.callback)
    if not callback:
        raise ValueError("No callback defined")

    def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        try:
            # 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
            msg = rpc_common.deserialize_msg(message.payload)
            callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            message.ack()
现在我们知道callback的值为self.callback,而self.callback的值为proxy_cb,即类ProxyCallback的实例化对象。而类ProxyCallback中实现了方法__call__,根据前面我们提到的相关的python语法可以知道,这里callback执行的就是方法__call__,具体就是__call__(msg)。

 类似资料: