感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
此片博文继续上一篇博文的工作,继续对/nova/server.py中类Service下的方法def start(self)进行解析,来实现对Nova下的AMQP的消息消费者机制的解析工作。
2.3 语句self.conn.consume_in_thread()的解析
这条语句实现了从队列中获取消息,并最终实现了对消息的处理和执行操作。
def consume_in_thread(self):
"""
启动消费者线程;
处理一个绿色线程中的所有的队列/消费者信息;
"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
# 启动一个绿色线程来运行方法_consumer_thread;
# 实现获取下一个消费者;
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
这个方法中启动了一个绿色线程来运行方法_consumer_thread,进而运行方法consume,我们来看方法consume的代码实现:
def consume(self, limit=None):
it = self.iterconsume(limit=limit)
while True:
try:
it.next()
except StopIteration:
return
def iterconsume(self, limit=None, timeout=None):
"""
返回一个迭代器,它实现了处理所有的队列/消费者信息;
"""
info = {'do_consume': True}
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.debug(_('Timed out waiting for RPC response: %s') % str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
def _consume():
if info['do_consume']:
queues_head = self.consumers[:-1]
queues_tail = self.consumers[-1]
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
info['do_consume'] = False
return self.connection.drain_events(timeout=timeout)
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.ensure(_error_callback, _consume)
在这个方法中最重要的语句就是queue.consume(nowait=True)和queues_tail.consume(nowait=False),所以我们进一步来看这里的方法consume的具体实现:
def consume(self, *args, **kwargs):
options = {'consumer_tag': self.tag}
options['nowait'] = kwargs.get('nowait', False)
callback = kwargs.get('callback', self.callback)
if not callback:
raise ValueError("No callback defined")
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
# 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options)
我们可以看到这个方法最后执行了语句self.queue.consume(*args, callback=_callback, **options),可以看到这里传入的参数callback就是上面代码中的方法_callback,可以预见,在后面的方法执行过程中,最终将会调用到这个方法def _callback(raw_message),我们先记住它,稍后合适的地方再进行进一步解析。
来看语句self.queue.consume(*args, callback=_callback, **options)中的方法consume的代码实现:
def consume(self, consumer_tag='', callback=None, no_ack=None,
nowait=False):
"""
处理队列中的消息;
keyword consumer_tag:消费者的唯一标识符;
keyword no_ack:如果接受的消息没有被确认;(???)
keyword nowait:不等待回应;
keyword callback:每个传递消息的回调方法;
"""
if no_ack is None:
no_ack = self.no_ack
return self.channel.basic_consume(queue=self.name,
no_ack=no_ack,
consumer_tag=consumer_tag or '',
callback=callback,
nowait=nowait)
进一步来看方法basic_consume的实现代码:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""
处理队列中的消息;
"""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
if queue in self.auto_delete_queues:
self.auto_delete_queues[queue] += 1
def _callback(raw_message):
message = self.Message(self, raw_message)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
我们可以看到这里在方法def _callback(raw_message)的执行过程的最后,终于调用了前面传进来的参数callback,也就是前面我们记住的那个方法,也就是def consume(self, *args, **kwargs)中的def _callback(raw_message),我们来看一下这个方法的具体实现代码:
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
# 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
我们可以看到,在这个方法中的语句callback(msg),它应该是最终处理所获得消息的实现程序,具体它是哪个方法呢,其实callback也是一层层传进来的一个参数方法,他就是上一篇博文中我提到的/nova/openstack/common/rpc/amqp.py----class ProxyCallback(_ThreadPoolWithWait)下面的方法def __call__(self, message_data),但是是如何定位到这个方法的呢,后面我会加以说明。现在来看这个方法def __call__(self, message_data)的实现代码:
def __call__(self, message_data):
if hasattr(local.store, 'context'):
del local.store.context
# 记录日志,但是一些敏感信息,像创建虚拟机时的密码等信息不会
# 显示在日志中,面是替换为"<SANITIZED>"
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
# 重复消息检测;
# ack返回前,AMQP消费者可能会出现两次读取相同信息的异常,这个方法可以防止这样的情况出现;
self.msg_id_cache.check_duplicate_message(message_data)
# 从message_data中解析出上下文信息;
ctxt = unpack_context(self.conf, message_data)
# 从message_data中获取要执行的方法method;
method = message_data.get('method')
# 从message_data中获取相关参数args;
args = message_data.get('args', {})
# 从message_data中获取版本的相关信息;
version = message_data.get('version', None)
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
# 建立一个新的绿色线程来执行方法method;
self.pool.spawn_n(self._process_data, ctxt, version, method, args)
前面已经对获取的消息进行了反序列化操作,这里分别直接获取了消息中不同的信息。我们可以看到,在方法的最后,语句self.pool.spawn_n(self._process_data, ctxt, version, method, args)就是我们最想要的东西,它建立了一个绿色线程来执行消息中指定要运行的方法method,比如运行实例等等。
我们在回到方法def _callback(raw_message)中,看到最后执行这样一条语句:message.ack(),这条语句实现了确认获取的消息已经完成执行操作,并从队列中删除这条消息。
至此,语句self.conn.consume_in_thread()解析完成。
下面,我来解释一下,为什么前面说到的callback参数方法是如何定位到方法def __call__(self, message_data)的,当然只是简单的说明一下过程。
def consume(self, *args, **kwargs):
options = {'consumer_tag': self.tag}
options['nowait'] = kwargs.get('nowait', False)
callback = kwargs.get('callback',self.callback)
if not callback:
raise ValueError("No callback defined")
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
# 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options)
重点看这里的语句callback(msg),这条语句实际实施了对获取消息的处理过程。这里的重点是callback到底调用的是哪个方法。定义这个方法的语句是:
callback= kwargs.get('callback',self.callback)
这里就需要引出python中字典的get用法:
get(key,default=None)
返回键值key对应的值;如果key没有在字典里,则返回default参数的值,默认为None;
所以这里callback的首选值为kwargs[callback'],如果字典kwargs中没有相应的元素,则赋值callback的值为self.callback;
我们继续来探寻这里的callback的源头:
可以看到方法defconsume(self,*args, **kwargs)所处在的类classConsumerBase(object)的初始化方法__init__中有callback的取值:
def __init__(self, channel, callback, tag, **kwargs):
"""
根据相关参数在一个amqp channel上声明产生一个队列;
"""
self.callback = callback
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
# 重连接到channel;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.reconnect(channel)
我们可以想一下,什么时候对这个类进行了类的对象初始化,对,就是在执行语句:
self.conn.create_consumer(self.topic,rpc_dispatcher, fanout=False)的过程中,有对类classConsumerBase(object)进行了对象初始化过程。下面简单回顾一下这条语句的执行过程:
self.conn.create_consumer(self.topic,rpc_dispatcher, fanout=False)
def create_consumer(self, topic, proxy, fanout=False):
"""
根据参数建立具体的消息消费者;
"""
# Connection:连接到RabbitMQ的实现类;
# get_connection_pool:获取到RabbitMQ的连接池,并返回这个连接池的对象;
# connection_cls:是一个连接类的对象;
# 获取ProxyCallback类的初始化实例对象;
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout:
# 建立一个广播类型的消费者;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.declare_fanout_consumer(topic, proxy_cb)
else:
# declare_topic_consumer:建立一个主题式的信息消费者;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.declare_topic_consumer(topic, proxy_cb)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""
建立一个主题式的信息消费者;
声明队列和交换器,通过routing key绑定队列到交换器;
"""
# 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)
这里可以看到传入方法declare_topic_consumer中的callback的参数值即为proxy_cb,所以这里分两条线,一条线来看看proxy_cb的获取过程,另一条线来看看方法declare_topic_consumer中的callback是不是我们想找到的那个callback。
首先来看第一条线,我们来看语句:
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
class ProxyCallback(_ThreadPoolWithWait):
def __init__(self, conf, proxy, connection_pool):
# 这个类实现了启动一个用于处理传入信息的绿色线程;
super(ProxyCallback, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.proxy = proxy
self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
version = message_data.get('version', None)
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
self.pool.spawn_n(self._process_data, ctxt, version, method, args)
class _ThreadPoolWithWait(object):
"""
这个类实现了启动一个用于处理传入信息的绿色线程;
"""
def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
我们可以看到proxy_cb是类ProxyCallback的实例化对象,我们还注意到类 ProxyCallback中实现了一个方法__call__,这个方法就是我们所要找到的目标方法,我们继续解析定位过程。
再来看另一条线,来追踪方法declare_topic_consumer中的callback:
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""
建立一个主题式的信息消费者;
声明队列和交换器,通过routing key绑定队列到交换器;
"""
# 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)
def declare_consumer(self, consumer_cls, topic, callback):
"""
根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
# 传递进来的类consumer_cls应该是TopicConsumer;
# 声明队列和交换器,通过routing key绑定队列到交换器;
consumer = consumer_cls(self.conf, self.channel, topic, callback,
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
class TopicConsumer(ConsumerBase):
"""
主题式消息消费者类;
"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
exchange_name=None, **kwargs):
"""
Init a 'topic' queue.
"""
# 默认选项的设置;
# rabbit_durable_queues:这个参数定义了在RabbitMQ中是否使用持久性的队列;
# 参数的默认值为False;
options = {'durable': conf.rabbit_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
# 获取交换器的名称;
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
# 根据相关的配置参数声明产生一个交换器;
exchange = kombu.entity.Exchange(name=exchange_name,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
# 重连接到channel;
# 声明队列和交换器,通过routing key绑定队列到交换器;
super(TopicConsumer, self).__init__(channel,
callback,
tag,
name=name or topic,
exchange=exchange,
routing_key=topic,
**options)
class ConsumerBase(object):
"""
Consumer(消费者)基类;
"""
def __init__(self, channel, callback, tag, **kwargs):
"""
根据相关参数在一个amqp channel上声明产生一个队列;
"""
self.callback = callback
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
# 重连接到channel;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.reconnect(channel)
至此我们可以看到,前面提到的参数proxy_cb,即类ProxyCallback的实例化对象就是我们要找的callback的源头。
前面我们看到类ProxyCallback中除了类的初始化方法__init__之外,还有实现了一个方法__call__,这里我们来看看python中所规定的方法__call__的用法。
Python中有一个有趣的语法,只要定义类的时候,实现__call__函数,这个类就成为可调用的。换句话说,我们可以把这个类的对象当作方法来使用,相当于重载了括号运算符。例如:
class Aclass:
def __call__(self):
print 'Hi I am __call__ed';
def __init__(self, *args, **keyargs):
print "Hi I am __init__ed";
x = Aclass() 输出 Hi I am __init__ed
x() 输出 Hi I am __call__ed
现在我们回到最前面的方法defconsume(self, *args, **kwargs),来看语句callback(msg):
def consume(self, *args, **kwargs):
options = {'consumer_tag': self.tag}
options['nowait'] = kwargs.get('nowait', False)
callback = kwargs.get('callback', self.callback)
if not callback:
raise ValueError("No callback defined")
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
# 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
现在我们知道callback的值为self.callback,而self.callback的值为proxy_cb,即类ProxyCallback的实例化对象。而类ProxyCallback中实现了方法__call__,根据前面我们提到的相关的python语法可以知道,这里callback执行的就是方法__call__,具体就是__call__(msg)。