oslo_messaging学习系列之二《OpenStack中使用消息队列》

邵城
2023-12-01

目录

消息驱动

oslo.messaging模块详解

Transport

Target(oslo_messaging.target.Target)

Server

RPC Client

RPC Client参数传递


消息驱动

服务端使用的驱动函数:

函数1:
class kombu.Queue(name=‘’,exchange=None,routing_key=‘’,channel=None,bingdings=None, on_declared=none,**kwargs)
parameters:
name--队列名字
exchange -- 绑定的exchange
routing_key--路由,exchange根据routing_key来路由消息
channel -- 绑定的channel
durable -- Sever重启,queue是否消失
exclusive -- 队列专有,属于当前连接(connection)
auto_delete -- 所有的consumer使用完后,自动删除。如果exclusive被设定,则auto_delete默认被设定为true。

函数2:
queue_declare(self,nowait=False,passive=False),在server上创建队列
nowait	True不等待server的相应信息
passive	True server不创建队列,可以用来检测当前server是否存在该队列

queue_bind(self, nowait=False)	queue与Exchange绑定

declare(self, nowait=False)	创建队列并与Exchange绑定,内部调用queue_declare,queue_bind

purge(self,nowait=False)	删除队列中所有信息

delete(self,if_unused=False, if_empty=Flase,nowait=False)删除队列
	if_unused		队列没被使用时删除
	if_empty		队列空时删除

作为服务端的代码示例:

from kombu.entity import Exchange, Queue
from kombu.messaging mport Consumer
from kombu.connection import Connection

conn = Connection(‘amqp://openstack:***@ip:5672’)//建立一个connection连接
channel = conn.channel()
new_exchange = Exchange(‘openstack’,type=’topic’,durable=False)//创建exchange
new_queue=Queue(“queue_name”,channel=channel,exchange= new_exchange,routing_key=’compute’,durable=False)//设置队列与exchange和channel绑定
new_queue.declare()
queue.consume(callback=self._callback, consumer_tag=str(tag), nowait=self.nowait)

生产端如何发送消息呢?

producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

示例:

from kombu.entity import Exchange, Queue
from kombu.messaging mport Consumer,Producer
from kombu.connection import Connection
from kombu.transport.base import Message

conn = Connection(‘amqp://openstack:***@ip:5672’)//建立一个connection连接
channel = conn.channel()#从这个连接中获取一个channel
new_exchange = Exchange(‘openstack’,type=’topic’,durable=False)//用于描述exchange
new_queue=Queue(“queue_name”,channel=channel,exchange= new_exchange,routing_key=’compute’,durable=False)//设置队列与exchange和channel绑定
#构造message
message=Message(channel=channel,body=”This is test”)
# 构造Producer
producer=Producer(channel,serializer=’json’,exchange=new_exchange)
producer.publish(message.body,routing_key=’compute’)

oslo.messaging模块详解

OpenStack中使用组件oslo.message作为消息传递的中间件,通过MQ完成很多协程任务,实现服务之间的分布式部署。

Oslo.message中的几个重要概念:

server(服务端):为RPC服务器提供给多个端点(Endpoint),每个包含一组远程调用客户端的方法,创建一个RPC服务器,提供一个传输队列,目标和端点列表。

client(客户端):客户端负责调用服务端提供的RPC接口。

exchange(交换):一个包含各个项目主体(topic)的容器。

namespace(命名空间):服务器端可以在一个主体上暴露多组方法,每组方法属于一个命名空间。

method(方法):一个方法由一个名字和一组命名的参数组成。

transport(传输工具):一个传送RPC请求到服务器端并将响应返回给客户端的底层消息系统。目前常用的transport有rabbitmq和qpid。

API version:每个命名空间都有一个版本号,当命名空间的接口变化时,这个版本号也会响应增加。向前兼容的修改只需要更改小版本号,向前不兼容的更改需要更改大版本号。

Transport

Tansport(传输层)主要实现RPC底层的通信(比如socket)以及事件循环、多线程等其他功能。可以通过URL来获得指向不同Transport实现的句柄。URL格式:

Transport://user:password@hotst1:port[,hostN:portN]/virtual_host

    从cfg对象中读取transport_url、rpc_backend、control_exchange信息构造Transport对象。用户可以使用oslo.messaging.get_transport函数来获取transport对象实例的句柄。

    import oslo_messaging

    transport = solo_messaging.get_transport(cfg, url=None,**kwargs)

    对于notify,这里调用的应该是:

    url = "rabbit://test:test@192.168.117.156:5672/"

oslo_messaging.set_transport_defaults("jorh_notify")

transport = oslo_messaging.get_notification_transport(cfg.CONF, url=url)

    jorh_notify是对应的exchange的值。

    get_transport和 get_notification_transport得到的是两个不同的对象,但是继承的是同一个对象,因此用相同的函数也不会有什么影响!

注释:

  1. oslo_messaging.set_transport_defaults函数是设置默认的exchanges值,在nova中设置的是nova,如果不设置默认是openstack;
  2. oslo_messaging.get_notification_transport是根据url中设置的值去找到对应的driver,例如rabbit对应的是oslo_messaging._drivers.impl_rabbit:RabbitDriver,这个是在oslo_messaging. setup.cfg文件中设置的。然后找到对应的driver,再用这个driver初始化transport对象,这个transport就是对各种driver进行的二次封装。
  3. 初始化RabbitDriver,这个driver首先根据conf中设置参数,初始化connection_pool,然后再调用父类进行初始化oslo_messaging._drivers.amqpdriver.AMQPDriverBase。所有的发送和接收方法都是通过调用这个类来实现的。

因此这里的transport,其实就是一个类,其中有一个connection_pool与rabbitmq server保持连接状态,然后发送或者接收就会调用这个类中的方法。

Target(oslo_messaging.target.Target)

Target封装指定某一个消息最终目的地的所有信息。Target封装了所有将要用到的信息,以确定应该将消息发送到何处或服务器正在侦听什么信息。不管作为服务端还是作为客户端,Target对象是必须构造的!

RPC Server target:

    topic和server是必须的,exchange是可选的。

RPC endpoint target:

    namespace和version是可选的

RPC client sending a message:

    topic是必须的,其他的属性是可选的

Notification Server target:

    topic是必须的,exchange是可选的;其他属性忽略。

Notificaton target:

    topic是必须的,excahnge是可选的;其他属性忽略。

注意:在不同场景下构造Target对象需要不同的参数。

  1. 创建RPC服务器时,需要topic和server参数,exchange参数可选(默认为openstack);指定一个endpoint的target时,namespace和version是可选的。
  2. 客户端发送消息时,需要topic参数,其他可选。
  1. endpoints:是可供别人远程调用的方法,也是RPC中的远程函数
  2. executor:服务运行的方式,单线程或者多线程

Server

一个RPC服务器可以暴露多个endpoint,每个endpoint包含一组方法,这组方法可以被客户端通过transport对象远程调用。创建Server对象时,需要指定Transport、Target和一组endpoint。

from oslo_config import cfg
import oslo_messaging
from oslo_log import log as logging
import time

CONF = cfg.CONF
LOG = logging.getLogger(__name__)
logging.register_options(CONF)
logging.setup(CONF,"myservice")
CONF(default_config_files=["app.conf"])
server=None
class ServerControlEndpoint(object):
    def __init__(self,server):
        print("init the ServerControlEndpoint function!")
        self.server = server
    def stop(self,ctx):
        print("call the stop function")
        if self.server:
            print("start to stop the sever")
            self.server.stop()

class TestEndpoint(object):
    def test(self,ctxt,arg):
        print("test")
        print(arg)
        time.sleep(100)
        return arg
url = "rabbit://test:test@192.168.117.156:5672/"
oslo_messaging.set_transport_defaults("jorhson_control") #设置control_exchange
transport = oslo_messaging.get_transport(cfg.CONF, url=url) #得到transport方法
target = oslo_messaging.Target(topic="jorh_topic",server="jorh_server") #target其实就是一个对象
endpoints = [ServerControlEndpoint(server),TestEndpoint()]
server = oslo_messaging.get_rpc_server(transport,target,endpoints,executor=None)
try:
    server.start()
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print("Stopping server")
    server.stop()

注意oslo_messaging.get_rpc_server函数返回的server对象,是对transporttarget对象的封装,其中的start方法,也是调用transport对象的driver(这里是oslo_messaging._drivers.amqpdriver.AMQPDriverBase)中的listen方法,向rabbitmq服务端注册。这里传入的topicserver是在rabbitmq 服务端注册成队列,一共有三种队列:

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic, target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

nova-compute为例,第一个创建的是compute direct,第二个创建的是compute.node1 direct,第三个创建的是compute -***  fault***代表随机字符;由于该随机字符是随机的,每次启动rpc consumer的时候,都会创建一个新的fault类型的队列绑定在compute-***这个exchange上;nova-compute每重启一次,都会创建一个新的compute_fanout_***队列,绑定在compute_fanout这个exchange

topic: computecompute.node1(即所谓的queue)是绑定在jorhson_control上(exchanges

fault类型的队列,绑定在compute.node1_fanout这个 exchanges

openstack中的nova-compute服务创建RPC consumer的过程和上面的代码是类似的。

这里注册的回调函数是:oslo_messaging._drivers.amqpdriver.RpcAMQPListener

也即是针对server端,如果有消息被服务端接收之后,都会调用这个类对象进行处理!

注:python中一切皆对象,函数也是对象,同时也是可调用对象(callable)。自定义的函数、内置函数和类都属于可调用对象,但凡是可以把一对括号()应用到某个对象身上都可以称之为可调用对象,判断对象是否可调用可以用函数callable。一个类实例要变成一个可调用对象,只需要实现一个特殊方法__call__().

RpcAMQPListener类对象会将oslo_messaging._drivers.amqpdriver.AMQPIncomingMessage作为自有变量,在RpcAMQPListener对象中会首先调用message.acknowledge(),对客户端的消息进行确认;之后调用oslo_messaging._drivers.amqpdriver.AMQPListener对象(RpcAMQPListener的父类),在该函数中会进一步调用AMQPIncomingMessage对象,并将该对象放入到incoming队列中。

Nova-compute是如何创建RPC server以及如何收发消息的:

  1. nova.service.Service.start函数中会调用messaging.get_rpc_server函数构造rpc_server对象,如上述例子所示,传入的参数有TRANSPORTtarget,这两个的构造过程和上述例子差不多。Endpoints是通过nova.compute.manager._ComputeV5Proxy来构造的,就是获取manager对象中的所有类方法。
  2. oslo_messaging.rpc.server.get_rpc_server方法:首先将endpoints中的方法封装在oslo_messaging.rpc.dispatcher.RPCDispatcher对象中,后续可以通过调用oslo_messaging.rpc.dispatcher.RPCDispatcher.dispatch函数,对方法进行进一步的处理。将RPCDispatcher对象作为参数传入,并返回oslo_messaging.rpc.server.RPCServer对象。
  3. nova.service.Service.start函数中调用RPCServer.start()方法实现服务的创建。
  4. Nova层面调用的RPCServer.start函数是oslo_messaging.server.MessageHandlingServer.start,在这个函数中首先创建一个listeneroslo_messaging.rpc.server.RPCServer._create_listener--> oslo_messaging._drivers.amqpdriver.AMQPDriverBase.listen,这个函数创建三个队列,并把对象oslo_messaging._drivers.amqpdriver.RpcAMQPListener作为这三个队列的回调函数,完成相应队列的启动;最后返回oslo_messaging._drivers.base.PollStyleListenerAdapter对象;然后再调用新创建listener(PollStyleListenerAdapter)start函数,也就是oslo_messaging._drivers.base.PollStyleListenerAdapter.start;前面创建三个队列的时相当于nova_compute接收消息的服务就已经启动了,这里只是将self.listener.start(self._on_incoming)[oslo_messaging._drivers.base.PollStyleListenerAdapter.start]中传入的oslo_messaging.server.MessageHandlingServer._on_incoming 作为callback函数放到一个线程中进行调用,因为这个函数会进一步调用oslo_messaging.rpc.server.RPCServer._process_incoming_process_incoming函数也会进一步调用RPCDispatcher.dispatch完成最终compute服务中的函数分发。
  5. 步骤4中创建队列的函数是oslo_messaging._drivers.impl_rabbit.Connection.declare_direct_consumer,即先构造oslo_messaging._drivers.impl_rabbit.Consumer对象,在这个对象中,会将回调对象RpcAMQPListener进行进一步的封装;这个对象是比较接近底层驱动的函数,直接调用rabbitmqAPI,向其中注册一个回调函数,这个回调函数会将接收到的消息转换成python能够识别的字符,并封装成oslo_messaging._drivers.impl_rabbit.RabbitMessage对象,然后再调用RpcAMQPListener对象对该消息进行处理。因此在RpcAMQPListener对象中调用的acknowledge函数就是oslo_messaging._drivers.impl_rabbit.RabbitMessage.acknowledge函数。
  6. RpcAMQPListener对象中又会调用oslo_messaging._drivers.amqpdriver.AMQPListener来处理得到的消息,同时又会将对象oslo_messaging._drivers.amqpdriver.AMQPIncomingMessage作为其成员变量,后面会用到该变量来区分cast请求还是call请求,分别进行处理。AMQPListener则会调用AMQPIncomingMessage对象进行消息的处理,并同时把AMQPIncomingMessage对象放入到self.incoming列表中。
  7. 在步骤4中,传入PollStyleListenerAdapter对象中的参数有一个是RpcAMQPListener,在oslo_messaging._drivers.base.PollStyleListenerAdapter.start函数中会创建一个线程,不断的调用oslo_messaging._drivers.amqpdriver.AMQPListener.poll获取得到的消息,这个消息就是AMQPListener对象中添加到self.incoming列表中的AMQPIncomingMessage对象,并交给刚才传入的回调函数_on_incoming来处理,层层调用直到oslo_messaging.rpc.server.RPCServer._process_incoming,完成具体函数的分发。
  8. 消息处理:在_process_incoming函数中会调用self.dispatcher.dispatch(message)进行具体的消息分发,dispatch对象是步骤2中传入oslo_messaging.rpc.server.RPCServer对象的oslo_messaging.rpc.dispatcher.RPCDispatcher对象。从原来的endpoint中找到对应的函数原型,将参数经过deserialize_entity反序列化之后,得到对象实例进行处理,并将返回的消息经过序列化之后再返回给对方(如果有的话)。
self.endpoints=[<nova.compute.manager.ComputeManager>, <nova.baserpc.BaseRPCAPI >]

endpoint=nova.compute.manager.ComputeManager

self.serializer.deserialize_context=<bound method ProfilerRequestContextSerializer.deserialize_context of <nova.rpc.ProfilerRequestContextSerializer >>

最终在oslo_messaging.rpc.dispatcher.RPCDispatcher.dispatch函数中完成类方法的调用和参数的处理

9. 在_process_incoming函数中会调用message.reply(res),实际上调用的函数是oslo_messaging._drivers.amqpdriver.AMQPIncomingMessage.reply函数,会判断是否存在msg_id,这个参数是客户端传过来的,如果不存在这个参数,则直接返回了,存在还要向这个队列中发送消息。

RPC Client

远程调用RPC Server上的方法。远程调用的时候,需要提供一个字典对象指明调用的上下文,调用方法的名字和传递给调用方法的参数。

调用方式:

  1. 通过cast方式,异步远程调用,请求发送之后客户端直接返回。
  2. call方式 同步远程调用,等待服务器响应返回结果。

不管是cast还是call方式,其在业务层的构造方式基本一致:

cctxt = self.router.client(ctxt).prepare(
                server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'rescue_instance', **msg_args)
或者
return cctxt.call(ctxt, 'remove_volume_connection',
                          instance=instance, volume_id=volume_id)

只有调用的方法不一样,cast和call。

需要比较下cast和call的方式在底层实现上有什么不一样!直接比较两个函数中的传参即可看出:

oslo_messaging.rpc.client._BaseCallContext.cast:
self.transport._send(self.target, msg_ctxt, msg, retry=self.retry, transport_options=self.transport_options)

oslo_messaging.rpc.client._BaseCallContext.call:
result = self.transport._send(self.target, msg_ctxt, msg, wait_for_reply=True,
                    timeout=timeout, call_monitor_timeout=cm_timeout,
                    retry=self.retry, transport_options=self.transport_options)

可以通过上述代码看到两者最主要的区别在于在调用transport中的_send函数时,cast没有传time_out参数,而call却传了该参数,并且设置wait_for_reply为true。

跟着函数调用继续:oslo_messaging._drivers.amqpdriver.AMQPDriverBase._send

在这个函数中,会判断wait_for_reply标志位,如果该值为true,则代表是同步消息,会构造msg_id,并通过self._waiter.listen(msg_id)创建一个队列;然后将消息发送给服务端,这一步是cast和call都是一样的流程;后面客户端会继续监听这个msg_id队列,并设置一个超时时间,如果在设置的时间内未监听到服务端发来的消息,则会抛出超时的异常错误;最后会通过self._waiter.unlisten(msg_id)取消队列。

作为服务端,必须注册一组endpoint,这个endpoint就是队列接受消息之后的回调函数。见server基本原理。

在oslo.messaging组件中,oslo_messaging._drivers.impl_rabbit.Connection对象是用来建立连接的,和oslo_messaging._drivers.pool.Pool对象一起构成oslo_messaging._drivers.pool.ConnectionPool对象。

oslo_messaging._drivers.amqpdriver.AMQPDriverBase对象对oslo_messaging._drivers.impl_rabbit.Consumer对象和oslo_messaging._drivers.impl_rabbit.Connection对象进行进一步的封装,其中Connection对象中针对self._producer = kombu.messaging.Producer直接调用驱动的参数,然后再调用self._producer.publish将消息发送到目标route中。AMQPDriverBase对象中包含send,send_notification,listen,listen_for_notifications,虽然底层调用并不会区分是call还是cast请求,作为consumer,也不会区分是否是notification,这些都是在AMQPDriverBase对象中完成的。对于call请求,就是利用ReplyWaiter对象(oslo_messaging._drivers.amqpdriver.ReplyWaiter.wait),重新生成一个consumer进行队列的监听。而在listener端,会注册回调函数(对象) RpcAMQPListener(oslo_messaging._drivers.amqpdriver.RpcAMQPListener),在该对象中,得到消息中的_reply_q,并通过向其发送消息,然后ReplyWaiter对象中的线程会不断的轮询该队列,从而得到返回的消息,从而构成了同步消息流程。

而RPC cast请求不会在消息体中构造reply_q,因此RpcAMQPListener不会publish消息。

RPC Client参数传递

以call方法为例,oslo_messaging.rpc.client._BaseCallContext.call,在该函数中会从nova层传入method(在nova层将被远程调用的函数)和kwargs(在调用函数时需要传入的参数)。在oslo_messaging.rpc.client._BaseCallContext._make_message函数中会将method和对应的参数封装成字典的形式;然后将该参数传入oslo_messaging._drivers.amqpdriver.AMQPDriverBase._send方法中,在该方法中首先会判断是否是call请求,如果是call请求还要在这个消息体重增加_reply_q的字段,最后调用oslo_messaging._drivers.common.serialize_msg函数,将字典形式的msg变为JSON格式的消息再进行传输。

 类似资料: