根据openstack官网介绍,oslo.messaging库就是把rabbitmq的python库做了封装,在openstack中调用RPC通信就要调用oslo.messaging库。
下面介绍oslo.messaging在RPC通信过程中两个重要部分,RPC Client,Server。
翻译官网Server和Client中介绍:
Server是RPC服务器提供多个端点,每个包含一组远程调用客户端的方法,创建一个PRC服务器,提供一个传输队列,目标和端点列表。
Client是一个类调用远程服务器上的方法,RPCClient类是负责发送方法调用通过消息传输到远程服务器。
在/nova/rpc.py有获取客户端的方法
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
获取客户端,通过调用messging.RPCClient()方法,再来查看RPCClient方法,这个方法不在navo的源码当中,通过上面的引用importoslo_messagingasmessaging可以看出是从oslo_messaging类中引用过来的,下面来查看oslo_messaging组件。
在代码/oslo_messaging/rpc/client.py中定义了RPCClient类
class RPCClient(object):
"""
<span style="white-space:pre"> </span>一个远程调用服务器的方法的类
"""
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None):
"""构造一个RPC的客户端
"""
self.conf = transport.conf
self.conf.register_opts(_client_opts)
self.transport = transport
self.target = target
self.timeout = timeout
self.retry = retry
self.version_cap = version_cap
self.serializer = serializer or msg_serializer.NoOpSerializer()
super(RPCClient, self).__init__()
_marker = _CallContext._marker
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
"""Prepare a method invocation context.
"""
return _CallContext._prepare(self,
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap, retry)
def cast(self, ctxt, method, **kwargs):
"""调用一个方法并立即返回
"""
self.prepare().cast(ctxt, method, **kwargs)
def call(self, ctxt, method, **kwargs):
"""调用一个方法,等待回复
"""
return self.prepare().call(ctxt, method, **kwargs)
def can_send_version(self, version=_marker):
"""检查版本是否兼容的版本."""
return self.prepare(version=version).can_send_version()
RPC消息发送函数cast,发送消息立即返回
def cast(self, ctxt, method, **kwargs):
//消息组装
msg = self._make_message(ctxt, method, kwargs)
ctxt = self.serializer.serialize_context(ctxt)
if self.version_cap:
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, ctxt, msg, retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
消息组装:_make_message
def _make_message(self, ctxt, method, args):
msg = dict(method=method)
<span style="white-space:pre"> </span>//消息组装函数 method接收端函数名称 args 接收端函数参数
msg['args'] = dict()
for argname, arg in six.iteritems(args):
msg['args'][argname] = self.serializer.serialize_entity(ctxt, arg)
<span style="white-space:pre"> </span>
if self.target.namespace is not None:
msg['namespace'] = self.target.namespace
if self.target.version is not None:
msg['version'] = self.target.version
return msg
消息的发送调用transport._send()
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None,
envelope=True, notify=False, retry=None):
class Context(object):
def __init__(self, d):
self.d = d
def to_dict(self):
return self.d
context = Context(ctxt)
msg = message
//若是需要回复,则设置msg_id等信息
if wait_for_reply:
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
msg.update({'_reply_q': self._get_reply_q()})
//msg消息获取唯一的unique_id
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
rpc_amqp.pack_context(msg, context)
if envelope:
msg = rpc_common.serialize_msg(msg)
//若需要回复,则监听回复的msg_id
if wait_for_reply:
self._waiter.listen(msg_id)
log_msg = "CALL msg_id: %s " % msg_id
else:
log_msg = "CAST unique_id: %s " % unique_id
//获取一个connection的对象
try:
with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
if notify:
exchange = self._get_exchange(target)
log_msg += "NOTIFY exchange '%(exchange)s'" \
" topic '%(topic)s'" % {
'exchange': exchange,
'topic': target.topic}
LOG.debug(log_msg)
conn.notify_send(exchange, target.topic, msg, retry=retry)
elif target.fanout:
log_msg += "FANOUT topic '%(topic)s'" % {
'topic': target.topic}
LOG.debug(log_msg)
conn.fanout_send(target.topic, msg, retry=retry)
else:
topic = target.topic
exchange = self._get_exchange(target)
if target.server:
topic = '%s.%s' % (target.topic, target.server)
log_msg += "exchange '%(exchange)s'" \
" topic '%(topic)s'" % {
'exchange': exchange,
'topic': target.topic}
LOG.debug(log_msg)
//初始化一个TopicPublisher的对象
conn.topic_send(exchange_name=exchange, topic=topic,
msg=msg, timeout=timeout, retry=retry)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout)
if isinstance(result, Exception):
raise result
return result
finally:
if wait_for_reply:
self._waiter.unlisten(msg_id)
从RabbitMQ连接池中获取一个链接:_get_connectiong()
def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
return rpc_amqp.ConnectionContext(self._connection_pool,
purpose=purpose)
方法ConnectionContext的初始化
def __init__(self, connection_pool, purpose):
"""创建一个链接或者从一个链接池中获取一个链接"""
self.connection = None
self.connection_pool = connection_pool
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
self.connection = connection_pool.create(purpose)
self.pooled = pooled
self.connection.pooled = pooled
现在看下方法topic_send的实现
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""发送一个topic的信息."""
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.amqp_durable_queues,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, retry=retry)
在来看下_public方法的实现
def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""发布消息."""
producer = kombu.messaging.Producer(exchange=exchange,
channel=self.channel,
routing_key=routing_key)
expiration = None
if timeout:
expiration = int(timeout * 1000)
transport_timeout = timeout
heartbeat_timeout = self.heartbeat_timeout_threshold
if (self._heartbeat_supported_and_enabled() and (
transport_timeout is None or
transport_timeout > heartbeat_timeout)):
transport_timeout = heartbeat_timeout
log_info = {'msg': msg,
'who': exchange or 'default',
'key': routing_key}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
with self._transport_socket_timeout(transport_timeout):
producer.publish(msg, expiration=expiration)
方法call的发送过程,和cast类似,区别在于call方法等待consumer结束要拿到返回值。call需要考虑超时和捕获异常。
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
if self.target.fanout:
raise exceptions.InvalidTarget('A call cannot be used with fanout',
self.target)
//封装消息
msg = self._make_message(ctxt, method, kwargs)
msg_ctxt = self.serializer.serialize_context(ctxt)
//设置超时时间
timeout = self.timeout
if self.timeout is None:
timeout = self.conf.rpc_response_timeout
if self.version_cap:
self._check_version_cap(msg.get('version'))
//发送
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
return self.serializer.deserialize_entity(ctxt, result)
重新回到_send方法,因为等待参数生效,所以分析等待回复的队列_get_reply_q()
def _get_reply_q(self):
with self._reply_q_lock:
if self._reply_q is not None:
return self._reply_q
reply_q = 'reply_' + uuid.uuid4().hex
//获取一个连接
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
//等下分析
self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
self._reply_q = reply_q
self._reply_q_conn = conn
return self._reply_q
分析ReplyWaiter这个类
class ReplyWaiter(object):
def __init__(self, reply_q, conn, allowed_remote_exmods):
self.conn = conn
self.allowed_remote_exmods = allowed_remote_exmods
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.waiters = ReplyWaiters()
//声明一个消费者
self.conn.declare_direct_consumer(reply_q, self)
self._thread_exit_event = threading.Event()
self._thread = threading.Thread(target=self.poll)
self._thread.daemon = True
self._thread.start()
刚才传进来的reply_q进入了self.conn.declare_direct_consumer(reply_q,self)方法
def declare_direct_consumer(self, topic, callback):
"""创建一个direct类型的queue
"""
consumer = Consumer(exchange_name=topic,
queue_name=topic,
routing_key=topic,
type='direct',
durable=False,
auto_delete=True,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer)
这里可以看到这个队列是一个Direct类型的消费者,在看下declare_consumer方法的实现
def declare_consumer(self, consumer):
""" 创建一个消费者类,
"""
def _connect_error(exc):
log_info = {'topic': consumer.routing_key, 'err_str': exc}
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
def _declare_consumer():
consumer.declare(self)
self._consumers.append(consumer)
self._new_consumers.append(consumer)
return consumer
with self._connection_lock:
return self.ensure(_declare_consumer,
error_callback=_connect_error)
再次重新回到_send方法,listen方法就是监听返回值
if wait_for_reply:
self._waiter.listen(msg_id)
log_msg = "CALL msg_id: %s " % msg_id
最后面的如果返回有异常则抛出异常,最后取消监听
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout)
if isinstance(result, Exception):
raise result
return result
finally:
if wait_for_reply:
self._waiter.unlisten(msg_id)