目录
Target(oslo_messaging.target.Target)
服务端使用的驱动函数:
函数1:
class kombu.Queue(name=‘’,exchange=None,routing_key=‘’,channel=None,bingdings=None, on_declared=none,**kwargs)
parameters:
name--队列名字
exchange -- 绑定的exchange
routing_key--路由,exchange根据routing_key来路由消息
channel -- 绑定的channel
durable -- Sever重启,queue是否消失
exclusive -- 队列专有,属于当前连接(connection)
auto_delete -- 所有的consumer使用完后,自动删除。如果exclusive被设定,则auto_delete默认被设定为true。
函数2:
queue_declare(self,nowait=False,passive=False),在server上创建队列
nowait True不等待server的相应信息
passive True server不创建队列,可以用来检测当前server是否存在该队列
queue_bind(self, nowait=False) queue与Exchange绑定
declare(self, nowait=False) 创建队列并与Exchange绑定,内部调用queue_declare,queue_bind
purge(self,nowait=False) 删除队列中所有信息
delete(self,if_unused=False, if_empty=Flase,nowait=False)删除队列
if_unused 队列没被使用时删除
if_empty 队列空时删除
作为服务端的代码示例:
from kombu.entity import Exchange, Queue
from kombu.messaging mport Consumer
from kombu.connection import Connection
conn = Connection(‘amqp://openstack:***@ip:5672’)//建立一个connection连接
channel = conn.channel()
new_exchange = Exchange(‘openstack’,type=’topic’,durable=False)//创建exchange
new_queue=Queue(“queue_name”,channel=channel,exchange= new_exchange,routing_key=’compute’,durable=False)//设置队列与exchange和channel绑定
new_queue.declare()
queue.consume(callback=self._callback, consumer_tag=str(tag), nowait=self.nowait)
生产端如何发送消息呢?
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
示例:
from kombu.entity import Exchange, Queue
from kombu.messaging mport Consumer,Producer
from kombu.connection import Connection
from kombu.transport.base import Message
conn = Connection(‘amqp://openstack:***@ip:5672’)//建立一个connection连接
channel = conn.channel()#从这个连接中获取一个channel
new_exchange = Exchange(‘openstack’,type=’topic’,durable=False)//用于描述exchange
new_queue=Queue(“queue_name”,channel=channel,exchange= new_exchange,routing_key=’compute’,durable=False)//设置队列与exchange和channel绑定
#构造message
message=Message(channel=channel,body=”This is test”)
# 构造Producer
producer=Producer(channel,serializer=’json’,exchange=new_exchange)
producer.publish(message.body,routing_key=’compute’)
OpenStack中使用组件oslo.message作为消息传递的中间件,通过MQ完成很多协程任务,实现服务之间的分布式部署。
Oslo.message中的几个重要概念:
server(服务端):为RPC服务器提供给多个端点(Endpoint),每个包含一组远程调用客户端的方法,创建一个RPC服务器,提供一个传输队列,目标和端点列表。
client(客户端):客户端负责调用服务端提供的RPC接口。
exchange(交换):一个包含各个项目主体(topic)的容器。
namespace(命名空间):服务器端可以在一个主体上暴露多组方法,每组方法属于一个命名空间。
method(方法):一个方法由一个名字和一组命名的参数组成。
transport(传输工具):一个传送RPC请求到服务器端并将响应返回给客户端的底层消息系统。目前常用的transport有rabbitmq和qpid。
API version:每个命名空间都有一个版本号,当命名空间的接口变化时,这个版本号也会响应增加。向前兼容的修改只需要更改小版本号,向前不兼容的更改需要更改大版本号。
Tansport(传输层)主要实现RPC底层的通信(比如socket)以及事件循环、多线程等其他功能。可以通过URL来获得指向不同Transport实现的句柄。URL格式:
Transport://user:password@hotst1:port[,hostN:portN]/virtual_host
从cfg对象中读取transport_url、rpc_backend、control_exchange信息构造Transport对象。用户可以使用oslo.messaging.get_transport函数来获取transport对象实例的句柄。
import oslo_messaging
transport = solo_messaging.get_transport(cfg, url=None,**kwargs)
对于notify,这里调用的应该是:
url = "rabbit://test:test@192.168.117.156:5672/"
oslo_messaging.set_transport_defaults("jorh_notify")
transport = oslo_messaging.get_notification_transport(cfg.CONF, url=url)
jorh_notify是对应的exchange的值。
get_transport和 get_notification_transport得到的是两个不同的对象,但是继承的是同一个对象,因此用相同的函数也不会有什么影响!
注释:
因此这里的transport,其实就是一个类,其中有一个connection_pool与rabbitmq server保持连接状态,然后发送或者接收就会调用这个类中的方法。
Target封装指定某一个消息最终目的地的所有信息。Target封装了所有将要用到的信息,以确定应该将消息发送到何处或服务器正在侦听什么信息。不管作为服务端还是作为客户端,Target对象是必须构造的!
RPC Server target:
topic和server是必须的,exchange是可选的。
RPC endpoint target:
namespace和version是可选的
RPC client sending a message:
topic是必须的,其他的属性是可选的
Notification Server target:
topic是必须的,exchange是可选的;其他属性忽略。
Notificaton target:
topic是必须的,excahnge是可选的;其他属性忽略。
注意:在不同场景下构造Target对象需要不同的参数。
一个RPC服务器可以暴露多个endpoint,每个endpoint包含一组方法,这组方法可以被客户端通过transport对象远程调用。创建Server对象时,需要指定Transport、Target和一组endpoint。
from oslo_config import cfg
import oslo_messaging
from oslo_log import log as logging
import time
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
logging.register_options(CONF)
logging.setup(CONF,"myservice")
CONF(default_config_files=["app.conf"])
server=None
class ServerControlEndpoint(object):
def __init__(self,server):
print("init the ServerControlEndpoint function!")
self.server = server
def stop(self,ctx):
print("call the stop function")
if self.server:
print("start to stop the sever")
self.server.stop()
class TestEndpoint(object):
def test(self,ctxt,arg):
print("test")
print(arg)
time.sleep(100)
return arg
url = "rabbit://test:test@192.168.117.156:5672/"
oslo_messaging.set_transport_defaults("jorhson_control") #设置control_exchange
transport = oslo_messaging.get_transport(cfg.CONF, url=url) #得到transport方法
target = oslo_messaging.Target(topic="jorh_topic",server="jorh_server") #target其实就是一个对象
endpoints = [ServerControlEndpoint(server),TestEndpoint()]
server = oslo_messaging.get_rpc_server(transport,target,endpoints,executor=None)
try:
server.start()
while True:
time.sleep(10)
except KeyboardInterrupt:
print("Stopping server")
server.stop()
注意:oslo_messaging.get_rpc_server函数返回的server对象,是对transport和target对象的封装,其中的start方法,也是调用transport对象的driver(这里是oslo_messaging._drivers.amqpdriver.AMQPDriverBase)中的listen方法,向rabbitmq服务端注册。这里传入的topic和server是在rabbitmq 服务端注册成队列,一共有三种队列:
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
callback=listener)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic, target.server),
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)
以nova-compute为例,第一个创建的是compute direct,第二个创建的是compute.node1 direct,第三个创建的是compute -*** fault,***代表随机字符;由于该随机字符是随机的,每次启动rpc consumer的时候,都会创建一个新的fault类型的队列绑定在compute-***这个exchange上;nova-compute每重启一次,都会创建一个新的compute_fanout_***队列,绑定在compute_fanout这个exchange上
topic: compute和compute.node1(即所谓的queue)是绑定在jorhson_control上(exchanges)
fault类型的队列,绑定在compute.node1_fanout这个 exchanges上
openstack中的nova-compute服务创建RPC consumer的过程和上面的代码是类似的。
这里注册的回调函数是:oslo_messaging._drivers.amqpdriver.RpcAMQPListener
也即是针对server端,如果有消息被服务端接收之后,都会调用这个类对象进行处理!
注:python中一切皆对象,函数也是对象,同时也是可调用对象(callable)。自定义的函数、内置函数和类都属于可调用对象,但凡是可以把一对括号()应用到某个对象身上都可以称之为可调用对象,判断对象是否可调用可以用函数callable。一个类实例要变成一个可调用对象,只需要实现一个特殊方法__call__().
RpcAMQPListener类对象会将oslo_messaging._drivers.amqpdriver.AMQPIncomingMessage作为自有变量,在RpcAMQPListener对象中会首先调用message.acknowledge(),对客户端的消息进行确认;之后调用oslo_messaging._drivers.amqpdriver.AMQPListener对象(RpcAMQPListener的父类),在该函数中会进一步调用AMQPIncomingMessage对象,并将该对象放入到incoming队列中。
Nova-compute是如何创建RPC server以及如何收发消息的:
self.endpoints=[<nova.compute.manager.ComputeManager>, <nova.baserpc.BaseRPCAPI >]
endpoint=nova.compute.manager.ComputeManager
self.serializer.deserialize_context=<bound method ProfilerRequestContextSerializer.deserialize_context of <nova.rpc.ProfilerRequestContextSerializer >>
最终在oslo_messaging.rpc.dispatcher.RPCDispatcher.dispatch函数中完成类方法的调用和参数的处理
9. 在_process_incoming函数中会调用message.reply(res),实际上调用的函数是oslo_messaging._drivers.amqpdriver.AMQPIncomingMessage.reply函数,会判断是否存在msg_id,这个参数是客户端传过来的,如果不存在这个参数,则直接返回了,存在还要向这个队列中发送消息。
远程调用RPC Server上的方法。远程调用的时候,需要提供一个字典对象指明调用的上下文,调用方法的名字和传递给调用方法的参数。
调用方式:
不管是cast还是call方式,其在业务层的构造方式基本一致:
cctxt = self.router.client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'rescue_instance', **msg_args)
或者
return cctxt.call(ctxt, 'remove_volume_connection',
instance=instance, volume_id=volume_id)
只有调用的方法不一样,cast和call。
需要比较下cast和call的方式在底层实现上有什么不一样!直接比较两个函数中的传参即可看出:
oslo_messaging.rpc.client._BaseCallContext.cast:
self.transport._send(self.target, msg_ctxt, msg, retry=self.retry, transport_options=self.transport_options)
oslo_messaging.rpc.client._BaseCallContext.call:
result = self.transport._send(self.target, msg_ctxt, msg, wait_for_reply=True,
timeout=timeout, call_monitor_timeout=cm_timeout,
retry=self.retry, transport_options=self.transport_options)
可以通过上述代码看到两者最主要的区别在于在调用transport中的_send函数时,cast没有传time_out参数,而call却传了该参数,并且设置wait_for_reply为true。
跟着函数调用继续:oslo_messaging._drivers.amqpdriver.AMQPDriverBase._send
在这个函数中,会判断wait_for_reply标志位,如果该值为true,则代表是同步消息,会构造msg_id,并通过self._waiter.listen(msg_id)创建一个队列;然后将消息发送给服务端,这一步是cast和call都是一样的流程;后面客户端会继续监听这个msg_id队列,并设置一个超时时间,如果在设置的时间内未监听到服务端发来的消息,则会抛出超时的异常错误;最后会通过self._waiter.unlisten(msg_id)取消队列。
作为服务端,必须注册一组endpoint,这个endpoint就是队列接受消息之后的回调函数。见server基本原理。
在oslo.messaging组件中,oslo_messaging._drivers.impl_rabbit.Connection对象是用来建立连接的,和oslo_messaging._drivers.pool.Pool对象一起构成oslo_messaging._drivers.pool.ConnectionPool对象。
oslo_messaging._drivers.amqpdriver.AMQPDriverBase对象对oslo_messaging._drivers.impl_rabbit.Consumer对象和oslo_messaging._drivers.impl_rabbit.Connection对象进行进一步的封装,其中Connection对象中针对self._producer = kombu.messaging.Producer直接调用驱动的参数,然后再调用self._producer.publish将消息发送到目标route中。AMQPDriverBase对象中包含send,send_notification,listen,listen_for_notifications,虽然底层调用并不会区分是call还是cast请求,作为consumer,也不会区分是否是notification,这些都是在AMQPDriverBase对象中完成的。对于call请求,就是利用ReplyWaiter对象(oslo_messaging._drivers.amqpdriver.ReplyWaiter.wait),重新生成一个consumer进行队列的监听。而在listener端,会注册回调函数(对象) RpcAMQPListener(oslo_messaging._drivers.amqpdriver.RpcAMQPListener),在该对象中,得到消息中的_reply_q,并通过向其发送消息,然后ReplyWaiter对象中的线程会不断的轮询该队列,从而得到返回的消息,从而构成了同步消息流程。
而RPC cast请求不会在消息体中构造reply_q,因此RpcAMQPListener不会publish消息。
以call方法为例,oslo_messaging.rpc.client._BaseCallContext.call,在该函数中会从nova层传入method(在nova层将被远程调用的函数)和kwargs(在调用函数时需要传入的参数)。在oslo_messaging.rpc.client._BaseCallContext._make_message函数中会将method和对应的参数封装成字典的形式;然后将该参数传入oslo_messaging._drivers.amqpdriver.AMQPDriverBase._send方法中,在该方法中首先会判断是否是call请求,如果是call请求还要在这个消息体重增加_reply_q的字段,最后调用oslo_messaging._drivers.common.serialize_msg函数,将字典形式的msg变为JSON格式的消息再进行传输。