感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
这篇博文开始解析NOVA中的AMQP架构下消息的消费者如何从特定的消息队列中读取发送给自己的消息,并进行执行操作。
总体来讲,Nova中的各个服务在启动的时候就会初始化会用到的队列,而且会启动一个绿色线程,不断的循环验证新的消息的到来,一旦有新的消息,将会由合适的consumer进行读取,并进一步进行消息的解析和执行操作。
下面,我将会以compute服务的启动作为实例,重点解析AMQP架构下消息的消费操作。
1.kombu consumer代码示例
kombu中消息接收消费机制的简单实现如示例所示:
#!/usr/bin/python
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer
from kombu.connection import Connection
def process_media(body, message):
print body
message.ack()
connection = Connection('amqp://guest:bupt@172.16.4.1:5672//')
channel = connection.channel()
media_exchange = Exchange('media', 'direct', channel)
video_queue = Queue('video', exchange=media_exchange, routing_key='video', channel=channel)
consumer = Consumer(channel, queues=[video_queue], callbacks=[process_media])
consumer.consume()
while True:
connection.drain_events()
consumer.cancel()
思路也很简单:
(1)创建连接;
(2)获取channel;
(3)创建exchange;
(4)创建队列并与exchange绑定;
(5)创建Consumer;
(6)consume()向server注册,表明可以接受消息了;
(7)drain_enents阻塞程序,等待消息到来;
(8)cancel()通知server不要向该consumer发送任何消息了;
在nova中,当然不能实现的这么简便,而是进行了一系列的封装操作,但是基本的实现思路是一致的。
2.以nova-compute服务启动为例,解析AMQP架构下消息的消费操作
首先来看nova-compute服务启动脚本代码:
"""
Nova Compute的启动脚本;
"""
import eventlet
import os
if os.name == 'nt':
eventlet.monkey_patch(os=False)
else:
eventlet.monkey_patch()
import os
import sys
import traceback
from oslo.config import cfg
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from nova import config
import nova.db.api
from nova import exception
from nova.openstack.common import log as logging
from nova import service
from nova import utils
CONF = cfg.CONF
CONF.import_opt('compute_topic', 'nova.compute.rpcapi')
CONF.import_opt('use_local', 'nova.conductor.api', group='conductor')
LOG = logging.getLogger('nova.compute')
def block_db_access():
class NoDB(object):
def __getattr__(self, attr):
return self
def __call__(self, *args, **kwargs):
stacktrace = "".join(traceback.format_stack())
LOG.error('No db access allowed in nova-compute: %s' % stacktrace)
raise exception.DBNotAllowed('nova-compute')
nova.db.api.IMPL = NoDB()
if __name__ == '__main__':
config.parse_args(sys.argv)
logging.setup('nova')
utils.monkey_patch()
if not CONF.conductor.use_local:
block_db_access()
# 初始化Service这个类,并且获取其实例化对象;
server = service.Service.create(binary='nova-compute',
topic=CONF.compute_topic,
db_allowed=False)
service.serve(server)
service.wait()
简单的说,nova-compute服务启动脚本中将会有如下的代码执行顺序:
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
if workers:
_launcher = ProcessLauncher()
_launcher.launch_server(server, workers=workers)
else:
_launcher = ServiceLauncher()
_launcher.launch_server(server)
def launch_server(self, server):
"""
加载并启动给定的服务;
"""
if self.backdoor_port is not None:
server.backdoor_port = self.backdoor_port
gt = eventlet.spawn(self.run_server, server)
self._services.append(gt)
def run_server(server):
"""
启动并等待一个服务的运行的结束;
"""
server.start()
server.wait()
可见在nova-compute服务启动的过程中将会调用/nova/server.py中类Service下的方法def start(self),这也将会是我们主要进行解析的一个方法,来看实现的代码:
def start(self):
verstr = version.version_string_with_package()
LOG.audit(_('Starting %(topic)s node (version %(version)s)'),
{'topic': self.topic, 'version': verstr})
# 在进程开始之前执行基本配置的检测;
self.basic_config_check()
self.manager.init_host()
self.model_disconnected = False
# 获取上下文信息;
ctxt = context.get_admin_context()
try:
# 查询数据库获取topic、host、binary类型指定的所有的服务;
self.service_ref = self.conductor_api.service_get_by_args(ctxt, self.host, self.binary)
# 获取这些服务的ID值;
self.service_id = self.service_ref['id']
except exception.NotFound:
# 如果没有合适的服务存在,则根据上下文要求建立一个服务环境;
self.service_ref = self._create_service_ref(ctxt)
if self.backdoor_port is not None:
self.manager.backdoor_port = self.backdoor_port
# 建立一个到用于RPC的消息总线的连接;
# 建立获取到RabbitMQ的连接;
# 创建连接,默认是kombu实现;
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
# 更新现有资源;
# 读取系统的总共资源以及可用的资源,更新资源,算出已经使用的资源;
self.manager.pre_start_hook(rpc_connection=self.conn)
# 获取RPC调度器;
# 初始化RPC调度器;
rpc_dispatcher = self.manager.create_rpc_dispatcher()
# 建立不同的消息消费者;
# 创建以服务的topic为路由键的消费者;
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
# 创建以服务的topic和本机名为路由键的消费者(基于topic&host,可用来接收定向消息);
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
# fanout直接投递消息,不进行匹配,速度最快(fanout类型,可用于接收广播消息);
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
# 启动消费者线程;
# consume_in_thread用evelent.spawn创建一个协程一直运行;
# 等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
self.conn.consume_in_thread()
# 在启动服务之后,并能够通过RPC接收消息之后,广播通知每个节点更新capabilities属性值,由控制器获取每个节点上的这个属性值;
self.manager.post_start_hook()
LOG.debug(_("Join ServiceGroup membership for this service %s")
% self.topic)
# 添加服务到服务成员组;
pulse = self.servicegroup_api.join(self.host, self.topic, self)
if pulse:
self.timers.append(pulse)
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = utils.DynamicLoopingCall(self.periodic_tasks)
periodic.start(initial_delay=initial_delay,
periodic_interval_max=self.periodic_interval_max)
self.timers.append(periodic)
这个方法主要实现了获取所有服务、创建到RPC的连接,创建不同类型的消息消费者,启动消费者线程用来执行获取的消息,并在启动服务后添加服务到服务成员组等等操作。下面我们来详细的解析这个方法。
2.1 语句self.conn = rpc.create_connection(new=True)的解析
这条语句实现了建立获取到RabbitMQ的连接,具体来看方法create_connection的代码实现:
def create_connection(new=True):
"""
建立一个到用于RPC的消息总线的连接;
建立获取到RabbitMQ的连接;
"""
return _get_impl().create_connection(CONF, new=new)
def create_connection(conf, new=True):
"""
建立获取到RabbitMQ的连接;
"""
# get_connection_pool:获取到RabbitMQ的连接池,并返回这个连接池的对象;
# Connection:连接到RabbitMQ的实现类;
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
注:方法中有关的方法在前面的博文中已经进行过解析,所以这里不再赘述。
def create_connection(conf, new, connection_pool):
"""
建立连接;
"""
return ConnectionContext(conf, connection_pool, pooled=not new)
class ConnectionContext(rpc_common.Connection):
"""
这个类是对连接功能的一个封装;
这个类提供方法建立新的连接,也可以实现从连接池中直接获取一个连接;
当然,也有方法可以实现对连接的删除,当连接删除之后,如果是从连接池获取的连接会把连接返回连接池;
"""
def __init__(self, conf, connection_pool, pooled=True, server_params=None):
"""
建立一个新的连接,或者从连接池中获取一个连接;
"""
self.connection = None
self.conf = conf
self.connection_pool = connection_pool
# 如果已经获取连接池对象,直接从连接池中获取一个连接;
if pooled:
self.connection = connection_pool.get()
else:
self.connection = connection_pool.connection_cls(conf, server_params=server_params)
self.pooled = pooled
到这里,一个到RabbitMQ的连接已经建立好,也就是实现了前面消息消费者示例所说的第一个步骤,创建连接。
2.2 语句self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)的解析
这里有三条相似的语句,如下:
# 建立不同的消息消费者;
# 创建以服务的topic为路由键的消费者;
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
# 创建以服务的topic和本机名为路由键的消费者(基于topic&host,可用来接收定向消息);
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
# fanout直接投递消息,不进行匹配,速度最快(fanout类型,可用于接收广播消息);
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
这三条语句实现了建立三种不同的消息消费者,分别是以topic为路由键的主题式消费者,以topic.<host>为路由键的主题式消费者和以topic为路由键的广播式消费者。这里主题式和广播式是以消息消费者所应用的交换器的类型来进行区分的;而交换器的不同,主要体现在路由匹配方法的不同;这些后面会涉及到。
这里现以语句self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)作为示例进行解析。
首先来看方法create_consumer,实现了消息消费者的建立,具体来看代码的实现:
def create_consumer(self, topic, proxy, fanout=False):
"""
根据参数建立具体的消息消费者;
"""
# Connection:连接到RabbitMQ的实现类;
# get_connection_pool:获取到RabbitMQ的连接池,并返回这个连接池的对象;
# connection_cls:是一个连接类的对象;
# 获取ProxyCallback类的初始化实例对象;
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout:
# 建立一个广播类型的消费者;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.declare_fanout_consumer(topic, proxy_cb)
else:
# declare_topic_consumer:建立一个主题式的信息消费者;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.declare_topic_consumer(topic, proxy_cb)
2.2.1 首先来看语句proxy_cb = rpc_amqp.ProxyCallback(......):
class ProxyCallback(_ThreadPoolWithWait):
def __init__(self, conf, proxy, connection_pool):
# 这个类实现了启动一个用于处理传入信息的绿色线程;
super(ProxyCallback, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.proxy = proxy
self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
if hasattr(local.store, 'context'):
del local.store.context
# 记录日志,但是一些敏感信息,像创建虚拟机时的密码等信息不会
# 显示在日志中,面是替换为"<SANITIZED>"
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
# 重复消息检测;
# ack返回前,AMQP消费者可能会出现两次读取相同信息的异常,这个方法可以防止这样的情况出现;
self.msg_id_cache.check_duplicate_message(message_data)
# 从message_data中解析出上下文信息;
ctxt = unpack_context(self.conf, message_data)
# 从message_data中获取要执行的方法method;
method = message_data.get('method')
# 从message_data中获取相关参数args;
args = message_data.get('args', {})
# 从message_data中获取版本的相关信息;
version = message_data.get('version', None)
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
# 建立一个新的绿色线程来执行方法method;
self.pool.spawn_n(self._process_data, ctxt, version, method, args)
class _ThreadPoolWithWait(object):
"""
这个类实现了启动一个用于处理传入信息的绿色线程;
"""
def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
这里的_call_方法是非常非常重要的,当然这里我只是拿出来给大家看一下,这里还没有调用这个方法,这个方法就是执行了对获取的消息进行最终处理的过程。其实nova中调用这个方法真的是辗转挪移,七拐八拐,不留意的话很难看到这个方法的调用,后面我会专门进行解析说明这个方法是怎么被调用的。
2.2.2 语句self.declare_topic_consumer(topic, proxy_cb)的解析
这条语句实现了建立一个主题式的消息消费者,具体来看方法declare_topic_consumer的代码:
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""
建立一个主题式的信息消费者;
声明队列和交换器,通过routing key绑定队列到交换器;
"""
# 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)
首先来看方法declare_consumer:
def declare_consumer(self, consumer_cls, topic, callback):
"""
根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
# 传递进来的类consumer_cls应该是TopicConsumer;
# 声明队列和交换器,通过routing key绑定队列到交换器;
consumer = consumer_cls(self.conf, self.channel, topic, callback,
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
在方法_declare_consumer中,传递进来的consumer_cls应该是TopicConsumer类,所以语句consumer = consumer_cls(self.conf, self.channel, topic, callback, self.consumer_num.next())实现的就是对类TopicConsumer进行初始化,并获取其实例化对象,再把建立好的消费者对象加入到consumers列表中。
下面对类TopicConsumer的初始化过程进行比较细致的解析,来看代码的实现:
class TopicConsumer(ConsumerBase):
"""
主题式消息消费者类;
"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
exchange_name=None, **kwargs):
# 默认选项的设置;
# rabbit_durable_queues:这个参数定义了在RabbitMQ中是否使用持久性的队列;
# 参数的默认值为False;
options = {'durable': conf.rabbit_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
# 获取交换器的名称;
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
# 根据相关的配置参数声明产生一个交换器;
exchange = kombu.entity.Exchange(name=exchange_name,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
# 重连接到channel;
# 声明队列和交换器,通过routing key绑定队列到交换器;
super(TopicConsumer, self).__init__(channel,
callback,
tag,
name=name or topic,
exchange=exchange,
routing_key=topic,
**options)
首先来看交换器的声明类Exchange的初始化过程:
class Exchange(MaybeChannelBound):
"""
交换器声明;
"""
# 短暂性消息传递方式;
TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
# 持久性消息传递方式;
PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE
# 交换器名称;
name = ""
# 如果没有定义交换器类型,默认的交换器类型(直接式交换器),但是这里面已经定义为主题式交换器;
type = "direct"
# 默认交换器是持久类型的;
durable = True
auto_delete = False
# 持久性消息传递方式;
delivery_mode = PERSISTENT_DELIVERY_MODE
attrs = (("name", None),
("type", None),
("arguments", None),
("durable", bool),
("auto_delete", bool),
("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m))
def __init__(self, name="", type="", channel=None, **kwargs):
super(Exchange, self).__init__(**kwargs)
self.name = name or self.name
self.type = type or self.type
# maybe_bind:如果没有绑定,则绑定实例到channel;
self.maybe_bind(channel)
def maybe_bind(self, channel):
"""
如果没有绑定,则绑定实例到channel;
"""
if not self.is_bound and channel:
self._channel = channel
self.when_bound()
self._is_bound = True
return self
再来看语句super(TopicConsumer, self).__init__(channel,callback,tag,name=name or topic,exchange=exchange,routing_key=topic,**options),这条语句实现的是对类TopicConsumer的父类ConsumerBase进行进一步的初始化操作。
class ConsumerBase(object):
"""
Consumer(消费者)基类;
"""
def __init__(self, channel, callback, tag, **kwargs):
"""
根据相关参数在一个amqp channel上声明产生一个队列;
"""
self.callback = callback
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
# 重连接到channel;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.reconnect(channel)
ConsumerBase
类是消费者的基类,在这个初始化的过程中,调用了方法reconnect,这个方法中实现了队列的声明和初始化,以及交换器与队列的绑定等重要的过程,这也是我们很关注的,具体来看方法reconnect的代码实现:
def reconnect(self, channel):
"""
重连接到channel;
声明队列和交换器,通过routing key绑定队列到交换器;
"""
self.channel = channel
self.kwargs['channel'] = channel
# 队列的声明类,实现队列初始化,并且实现队列与交换器和channel的绑定;
self.queue = kombu.entity.Queue(**self.kwargs)
# 序列重新的申报;
# 声明队列和交换器,通过routing key绑定队列到交换器;
self.queue.declare()
先来看语句self.queue = kombu.entity.Queue(**self.kwargs),看看队列的定义类
Queue,看看如何实现队列的初始化过程:
class Queue(MaybeChannelBound):
"""
队列的声明类,实现队列初始化,并且实现队列与交换器和channel的绑定;
"""
name = ""
exchange = None
routing_key = ""
durable = True
exclusive = False
auto_delete = False
no_ack = False
attrs = (("name", None),
("exchange", None),
("routing_key", None),
("queue_arguments", None),
("binding_arguments", None),
("durable", bool),
("exclusive", bool),
("auto_delete", bool),
("no_ack", None),
("alias", None))
def __init__(self, name="", exchange=None, routing_key="", channel=None,
**kwargs):
super(Queue, self).__init__(**kwargs)
self.name = name or self.name
self.exchange = exchange or self.exchange
self.routing_key = routing_key or self.routing_key
# exclusive implies auto-delete.
if self.exclusive:
self.auto_delete = True
# 如果没有绑定,则绑定到channel;
self.maybe_bind(channel)
这里获取了队列的实例化对象,但是这个过程无非就是初始化了一些相关的参数信息。
再来看语句self.queue.declare(),这条语句实现了声明队列和交换器,通过routing key绑定队列到交换器,具体的实现来看方法declare的代码:
def declare(self, nowait=False):
"""
声明队列和交换器,通过routing key绑定队列到交换器;
"""
# 队列名称;
name = self.name
if name:
if self.exchange:
self.exchange.declare(nowait)
self.queue_declare(nowait, passive=False)
# 通过routing key绑定队列到交换器;
if name:
self.queue_bind(nowait)
return self.name
在这个方法中主要是线了交换器的声明、队列的声明,并且实现了交换器和队列的绑定等操作。
首先来看语句self.exchange.declare(nowait),这条语句实现了对特定的交换器的声明操作,具体来看方法declare的代码实现:
def declare(self, nowait=False):
"""
声明交换器,并在代理上创建交换器;
"""
# 在绿色线程中执行方法exchange_declare,实现交换器的声明和绑定;
return _SYN(self.channel.exchange_declare, exchange=self.name,
type=self.type,
durable=self.durable,
auto_delete=self.auto_delete,
arguments=self.arguments,
nowait=nowait)
def exchange_declare(self, exchange, type="direct", durable=False,
auto_delete=False, arguments=None, nowait=False):
"""
声明交换器;
"""
try:
prev = self.state.exchanges[exchange]
if not self.typeof(exchange).equivalent(prev, exchange, type,
durable, auto_delete,
arguments):
raise NotEquivalentError(
"Cannot redeclare exchange %r in vhost %r with "
"different type, durable or autodelete value" % (
exchange,
self.connection.client.virtual_host or "/"))
except KeyError:
self.state.exchanges[exchange] = {
"type": type,
"durable": durable,
"auto_delete": auto_delete,
"arguments": arguments or {},
"table": [],
}
再来看语句self.queue_declare(nowait, passive=False),这条语句实现了对特定队列的声明操作,具体来看方法queue_declare的代码实现:
def queue_declare(self, nowait=False, passive=False):
"""
在服务器上声明队列;
"""
ret = _SYN(self.channel.queue_declare, queue=self.name,
passive=passive,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=self.queue_arguments,
nowait=nowait)
if not self.name:
self.name = ret[0]
return ret
def queue_declare(self, queue, passive=False, auto_delete=False, **kwargs):
"""
声明队列;
"""
if auto_delete:
self.auto_delete_queues.setdefault(queue, 0)
if passive and not self._has_queue(queue, **kwargs):
raise StdChannelError("404",
u"NOT_FOUND - no queue %r in vhost %r" % (
queue, self.connection.client.virtual_host or '/'),
(50, 10), "Channel.queue_declare")
else:
self._new_queue(queue, **kwargs)
return queue, self._size(queue), 0
再来看语句self.queue_bind(nowait),这条语句实现了通过routing key绑定队列到交换器,具体来看方法queue_bind的代码实现:
def queue_bind(self, nowait=False):
"""
在服务器上建立队列的绑定;
通过routing key绑定队列到交换器;
"""
return _SYN(self.channel.queue_bind, queue=self.name,
exchange=self.exchange.name,
routing_key=self.routing_key,
arguments=self.binding_arguments,
nowait=nowait)
def queue_bind(self, queue, exchange, routing_key, arguments=None,
**kwargs):
"""
通过routing key绑定队列到交换器;
"""
# 如果队列已经在绑定的列表中,则直接返回;
if queue in self.state.bindings:
return
table = self.state.exchanges[exchange].setdefault("table", [])
# 队列绑定的信息加入到列表中;
self.state.bindings[queue] = exchange, routing_key, arguments
meta = self.typeof(exchange).prepare_bind(queue,
exchange,
routing_key,
arguments)
table.append(meta)
if self.supports_fanout:
self._queue_bind(exchange, *meta)
def prepare_bind(self, queue, exchange, routing_key, arguments):
return routing_key, self.key_to_pattern(routing_key), queue
至此,建立消息消费者所需要的交换器和队列均已创建完成,并实现了交换器到队列的绑定,因此,也完成了一个主题式消息消费者的建立。也就是说,前面消息消费者示例所说的前5个步骤都已完成。
在下一篇博文中,将继续解析/nova/server.py中类Service下的方法def start(self)中的下一条也是很重要的语句:self.conn.consume_in_thread(),这条语句实现了从队列中获取消息,并最终实现了对消息的处理和执行操作。