
Kombu Source Code Analysis (2): Producer Message Publishing

赵君植
2023-12-01

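Environment for this article: Python 3.5.2, kombu 4.6.8 series.
Following the official kombu examples, this series walks through the kombu source code piece by piece to understand its main structures and how they are implemented.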

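The previous article analyzed the source of Connection initialization based on the official example. This article continues with the same example and covers Producer initialization and message publishing. As mentioned last time, initializing a Connection does not actually connect to the broker; the connection is only established when it is first used. When the Producer publishes a message, it connects to the broker through the Transport class, so the connection code is also covered in detail here.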

Let's look at the official example again.

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel
# (this fragment assumes `connection` is an existing Connection):
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()

In the example, once the Connection has been created, conn.Producer is called and a message is published. Let's look at the Producer method and the publishing process.

    def Producer(self, channel=None, *args, **kwargs):
        """Create new :class:`kombu.Producer` instance."""
        from .messaging import Producer
        # Import the Producer class and return an instance. channel is None here,
        # so the first argument is self (the Connection); **kwargs carries
        # serializer='json'.
        return Producer(channel or self, *args, **kwargs)

Next, let's see what Producer defines during initialization.

@python_2_unicode_compatible
class Producer(object):
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
        serializer (str): Default serializer. Default is `"json"`.
        compression (str): Default compression method.
            Default is no compression.
        auto_declare (bool): Automatically declare the default exchange
            at instantiation. Default is :const:`True`.
        on_return (Callable): Callback to call for undeliverable messages,
            when the `mandatory` or `immediate` arguments to
            :meth:`publish` is used. This callback needs the following
            signature: `(exception, exchange, routing_key, message)`.
            Note that the producer needs to drain events to use this feature.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method.  Disabled by default.
    compression = None

    #: By default, if a default exchange is set,
    #: that exchange will be declared when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        self._channel = channel  # channel is the self passed in above, i.e. the Connection instance
        self.exchange = exchange
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        self._channel_promise = None
        if self.exchange is None:
            self.exchange = Exchange('')  # self.exchange is None, so fall back to the default Exchange (covered in the Exchange article)
        if auto_declare is not None:
            self.auto_declare = auto_declare

        # Important step: self._channel is the Connection instance here, and
        # revive() sets up the channel used to connect (explained below).
        if self._channel:
            self.revive(self._channel)
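
The `value or self.value` lines in __init__ rely on class attributes acting as defaults. A minimal sketch of that pattern follows, using a hypothetical Settings class that is not part of kombu; it only illustrates why auto_declare gets an explicit None check while the other arguments use `or`.

```python
class Settings:
    """Hypothetical class mirroring the default handling in Producer.__init__."""

    # Class-level defaults, used by every instance until overridden.
    routing_key = ''
    serializer = None
    auto_declare = True

    def __init__(self, routing_key=None, serializer=None, auto_declare=None):
        # "value or default": any falsy argument falls back to the class default.
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        # A boolean needs an explicit None check, otherwise False could never be set.
        if auto_declare is not None:
            self.auto_declare = auto_declare


defaults = Settings()
custom = Settings(routing_key='video', serializer='json', auto_declare=False)
print(defaults.routing_key, defaults.serializer, defaults.auto_declare)
print(custom.routing_key, custom.serializer, custom.auto_declare)
```

Setting the attribute on the instance shadows the class attribute, so the class-level default itself is never modified.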

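That is the Producer initialization. The two important points are that it defines and instantiates an Exchange (covered in the Exchange article) and that it calls self.revive(). Let's see what revive actually does.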

    def revive(self, channel):
        """Revive the producer after connection loss."""
        if is_connection(channel):  # True here: channel is a Connection instance
            connection = channel
            self.__connection__ = connection
            # A channel is a lightweight connection on top of the Connection.
            # The promise defers connection.default_channel until it is first
            # needed; evaluating it is what triggers the actual connection.
            channel = ChannelPromise(lambda: connection.default_channel)
        if isinstance(channel, ChannelPromise):
            self._channel = channel
            self.exchange = self.exchange(channel)  # bind the Exchange to the channel
        else:
            # Channel already concrete
            self._channel = channel
            if self.on_return:
                self._channel.events['basic_return'].add(self.on_return)
            self.exchange = self.exchange(channel)      
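
The lambda wrapped in ChannelPromise is what keeps this step lazy. As a rough illustration of the idea (a hypothetical LazyPromise class, not kombu's actual ChannelPromise implementation), a promise can be sketched as a callable that runs its factory once and caches the result:

```python
class LazyPromise:
    """Hypothetical stand-in for the idea behind a promise: evaluate once, on demand."""

    _UNSET = object()

    def __init__(self, factory):
        self._factory = factory
        self._value = self._UNSET

    def __call__(self):
        # Run the factory only on the first call and cache the result.
        if self._value is self._UNSET:
            self._value = self._factory()
        return self._value


calls = []


def open_channel():
    # Pretend this is the expensive step that connects to the broker.
    calls.append('connect')
    return 'channel-1'


promise = LazyPromise(open_channel)
created_without_connecting = (calls == [])  # nothing has run yet
first = promise()    # the first call triggers the factory
second = promise()   # later calls reuse the cached value
print(created_without_connecting, first, second, calls)
```

Under this reading, creating the producer is cheap, and the cost of connecting is paid the first time the channel is really used.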

The key call in revive is connection.default_channel, so let's analyze the default_channel property next.

    @property
    def default_channel(self):
        """Default channel.

        Created upon access and closed when the connection is closed.

        Note:
            Can be used for automatic channel handling when you only need one
            channel, and also it is the channel implicitly used if
            a connection is passed instead of a channel, to functions that
            require a channel.
        """
        conn_opts = {}
        transport_opts = self.transport_options  # empty in this example: no transport_options were passed to Connection
        if transport_opts:  # optional retry settings for connecting: max retries, retry intervals, etc.
            if 'max_retries' in transport_opts:
                conn_opts['max_retries'] = transport_opts['max_retries']
            if 'interval_start' in transport_opts:
                conn_opts['interval_start'] = transport_opts['interval_start']
            if 'interval_step' in transport_opts:
                conn_opts['interval_step'] = transport_opts['interval_step']
            if 'interval_max' in transport_opts:
                conn_opts['interval_max'] = transport_opts['interval_max']

        # make sure we're still connected, and if not refresh.
        self.ensure_connection(**conn_opts)  # about to establish the connection; conn_opts is {} in this example
        if self._default_channel is None:
            self._default_channel = self.channel()
        return self._default_channel
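
The four `if ... in transport_opts` checks in default_channel simply copy the retry-related keys into conn_opts. A condensed sketch of the same filtering, written as a hypothetical helper (pick_retry_options is not a kombu function, and the third key in the sample dict is only there as an unrelated option that gets dropped):

```python
RETRY_KEYS = ('max_retries', 'interval_start', 'interval_step', 'interval_max')


def pick_retry_options(transport_options):
    """Return only the retry-related entries of a transport options mapping."""
    transport_options = transport_options or {}
    return {key: transport_options[key]
            for key in RETRY_KEYS if key in transport_options}


example = {'max_retries': 3, 'interval_max': 10, 'visibility_timeout': 3600}
conn_opts = pick_retry_options(example)
print(conn_opts)
```

Keys that are absent are left out entirely, so ensure_connection falls back to its own keyword defaults for them.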

default_channel mainly collects the retry options used for connecting. Next, let's look at self.ensure_connection(**conn_opts).

    def ensure_connection(self, errback=None, max_retries=None,
                          interval_start=2, interval_step=2, interval_max=30,
                          callback=None, reraise_as_library_errors=True,
                          timeout=None):
        """Ensure we have a connection to the server.

        If not retry establishing the connection with the settings
        specified.

        Arguments:
            errback (Callable): Optional callback called each time the
                connection can't be established.  Arguments provided are
                the exception raised and the interval that will be
                slept ``(exc, interval)``.

            max_retries (int): Maximum number of times to retry.
                If this limit is exceeded the connection error
                will be re-raised.

            interval_start (float): The number of seconds we start
                sleeping for.
            interval_step (float): How many seconds added to the interval
                for each retry.
            interval_max (float): Maximum number of seconds to sleep between
                each retry.
            callback (Callable): Optional callback that is called for every
                internal iteration (1 s).
            timeout (int): Maximum amount of time in seconds to spend
                waiting for connection
        """
        def on_error(exc, intervals, retries, interval=0):
            round = self.completes_cycle(retries)
            if round:
                interval = next(intervals)
            if errback:
                errback(exc, interval)
            self.maybe_switch_next()  # select next host

            return interval if round else 0

        # A context manager (built with contextlib.contextmanager, worth a look
        # if you are interested) that re-raises connection errors as library errors.
        ctx = self._reraise_as_library_errors
        if not reraise_as_library_errors:
            ctx = self._dummy_context
        with ctx():
            # Retry connecting according to the options; self.connect is what
            # actually starts the connection.
            retry_over_time(self.connect, self.recoverable_connection_errors,
                            (), {}, on_error, max_retries,
                            interval_start, interval_step, interval_max,
                            callback, timeout=timeout)
        return self

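ensure_connection first wraps the errors that connecting may raise in a context manager, and then calls self.connect through retry_over_time. Let's look at retry_over_time.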

def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                    max_retries=None, interval_start=2, interval_step=2,
                    interval_max=30, callback=None, timeout=None):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep a for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    Arguments:
        fun (Callable): The function to try
        catch (Tuple[BaseException]): Exceptions to catch, can be either
            tuple or a single exception class.

    Keyword Arguments:
        args (Tuple): Positional arguments passed on to the function.
        kwargs (Dict): Keyword arguments passed on to the function.
        errback (Callable): Callback for when an exception in ``catch``
            is raised.  The callback must take three arguments:
            ``exc``, ``interval_range`` and ``retries``, where ``exc``
            is the exception instance, ``interval_range`` is an iterator
            which return the time in seconds to sleep next, and ``retries``
            is the number of previous retries.
        max_retries (int): Maximum number of retries before we give up.
            If neither of this and timeout is set, we will retry forever.
            If one of this and timeout is reached, stop.
        interval_start (float): How long (in seconds) we start sleeping
            between retries.
        interval_step (float): By how much the interval is increased for
            each retry.
        interval_max (float): Maximum number of seconds to sleep
            between retries.
        timeout (int): Maximum seconds waiting before we give up.
    """
    kwargs = {} if not kwargs else kwargs
    args = [] if not args else args
    interval_range = fxrange(interval_start,
                             interval_max + interval_start,
                             interval_step, repeatlast=True)
    end = time() + timeout if timeout else None
    for retries in count():
        try:
            return fun(*args, **kwargs)  # the key line: fun is the self.connect that was passed in
        except catch as exc:
            if max_retries is not None and retries >= max_retries:
                raise
            if end and time() > end:
                raise
            if callback:
                callback()
            tts = float(errback(exc, interval_range, retries) if errback
                        else next(interval_range))
            if tts:
                for _ in range(int(tts)):
                    if callback:
                        callback()
                    sleep(1.0)
                # sleep remainder after int truncation above.
                sleep(abs(int(tts) - tts))
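
To make the retry loop easier to follow, here is a simplified, self-contained sketch of the same idea: call a function, and on a caught exception sleep for a growing interval before trying again. The names backoff_intervals, retry and flaky_connect are hypothetical, the interval generator is a simplification rather than kombu's fxrange, and the sleep function is injected so the demo records the waits instead of actually sleeping.

```python
import itertools
import time


def backoff_intervals(start=2.0, step=2.0, maximum=30.0):
    """Yield start, start + step, ... capped at maximum, then repeat the cap."""
    current = start
    while True:
        yield min(current, maximum)
        current += step


def retry(fun, catch, max_retries=None, sleep=time.sleep, **interval_opts):
    """Call fun() until it succeeds or max_retries is exceeded."""
    intervals = backoff_intervals(**interval_opts)
    for retries in itertools.count():
        try:
            return fun()
        except catch:
            if max_retries is not None and retries >= max_retries:
                raise
            sleep(next(intervals))


attempts = []
slept = []


def flaky_connect():
    # Fails twice, then succeeds, to imitate a broker that is slow to come up.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError('broker not ready')
    return 'connected'


# Record the sleeps instead of really sleeping so the demo runs instantly.
result = retry(flaky_connect, ConnectionError, max_retries=5, sleep=slept.append)
print(result, len(attempts), slept)
```

As in retry_over_time, leaving max_retries as None means the loop keeps retrying until the call succeeds.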

Here fun is self.connect, so let's look at the connect method.

    def connect(self):
        """Establish connection to server immediately."""
        self._closed = False     # _closed is None right after initialization
        return self.connection   # accessing the property triggers the connection

    @property
    def connection(self):
        """The underlying connection object.

        Warning:
            This instance is transport specific, so do not
            depend on the interface of this object.
        """
        if not self._closed:
            if not self.connected:
                self.declared_entities.clear()
                self._default_channel = None
                self._connection = self._establish_connection()
                self._closed = False
            return self._connection

    def _establish_connection(self):
        self._debug('establishing connection...')
        # self.transport was already set up when the Connection was initialized;
        # its establish_connection() is the method that really connects.
        conn = self.transport.establish_connection()
        self._debug('connection established: %r', self)
        return conn  
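
The connect() / connection pair is a small lazy-initialization pattern: the property creates the underlying connection only when it is missing. A stripped-down sketch with a hypothetical LazyConnection class (the connect_func argument stands in for transport.establish_connection and is an assumption of this example):

```python
class LazyConnection:
    """Hypothetical class imitating the connect()/connection pair shown above."""

    def __init__(self, connect_func):
        self._connect_func = connect_func  # stands in for transport.establish_connection
        self._connection = None
        self._closed = None

    @property
    def connected(self):
        return self._connection is not None and not self._closed

    @property
    def connection(self):
        # Establish the underlying connection only when it is missing.
        if not self._closed:
            if not self.connected:
                self._connection = self._connect_func()
                self._closed = False
            return self._connection

    def connect(self):
        self._closed = False
        return self.connection


opened = []
lazy = LazyConnection(lambda: opened.append('open') or 'raw-connection')
opened_before_connect = list(opened)  # creating the object does not connect
first = lazy.connect()
second = lazy.connect()               # reuses the existing connection
print(opened_before_connect, first, second, opened)
```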

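These three pieces of code call each other in turn and end at self.transport.establish_connection(). This is the transport's own connect method, the one that finally establishes the connection. Let's take a look.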

    def establish_connection(self):
        """Establish connection to the AMQP broker."""
        conninfo = self.client
        for name, default_value in items(self.default_connection_params):
            if not getattr(conninfo, name, None):
                setattr(conninfo, name, default_value)
        if conninfo.hostname == 'localhost':
            conninfo.hostname = '127.0.0.1'
        opts = dict({  # the parameters needed to connect
            'host': conninfo.host,
            'userid': conninfo.userid,
            'password': conninfo.password,
            'login_method': conninfo.login_method,
            'virtual_host': conninfo.virtual_host,
            'insist': conninfo.insist,
            'ssl': conninfo.ssl,
            'connect_timeout': conninfo.connect_timeout,
            'heartbeat': conninfo.heartbeat,
        }, **conninfo.transport_options or {})
        conn = self.Connection(**opts)  # the Connection class from the amqp package
        conn.client = self.client
        conn.connect()  # establish the connection; this is code from the amqp package and is not covered here
        return conn                  
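
The first half of establish_connection is a "fill in defaults, then merge options" step. The sketch below reproduces only that logic with a hypothetical build_connect_options helper; the DEFAULTS values and the sample host name are made up for the example and are not taken from the kombu source.

```python
from types import SimpleNamespace

# Hypothetical defaults, used only for this illustration.
DEFAULTS = {'userid': 'guest', 'password': 'guest', 'port': 5672, 'hostname': 'localhost'}


def build_connect_options(conninfo, transport_options=None):
    """Fill unset attributes from DEFAULTS, then build the keyword arguments."""
    for name, default_value in DEFAULTS.items():
        # Only fill attributes that are missing or falsy (None, '', 0).
        if not getattr(conninfo, name, None):
            setattr(conninfo, name, default_value)
    opts = {
        'host': '%s:%s' % (conninfo.hostname, conninfo.port),
        'userid': conninfo.userid,
        'password': conninfo.password,
    }
    # Transport options are merged last, so they can add or override keys.
    opts.update(transport_options or {})
    return opts


info = SimpleNamespace(hostname='broker.example', userid=None, password=None)
opts = build_connect_options(info, {'connect_timeout': 5})
print(opts)
```

Because the check is `not getattr(...)`, an explicitly passed empty string or 0 is treated the same as a missing value and replaced by the default.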

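That covers Producer initialization and, in detail, how the connection is established. Note from revive that after initialization the Producer only holds a ChannelPromise, so the connect path above runs when that promise is first evaluated, that is, the first time the producer's channel is needed.
Next, let's see how a message is actually published.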

    def publish(self, body, routing_key=None, delivery_mode=None,
                mandatory=False, immediate=False, priority=0,
                content_type=None, content_encoding=None, serializer=None,
                headers=None, compression=None, exchange=None, retry=False,
                retry_policy=None, declare=None, expiration=None,
                **properties):
        _publish = self._publish

        declare = [] if declare is None else declare
        headers = {} if headers is None else headers
        retry_policy = {} if retry_policy is None else retry_policy
        routing_key = self.routing_key if routing_key is None else routing_key
        compression = self.compression if compression is None else compression

        exchange_name, properties['delivery_mode'] = self._delivery_details(
            exchange or self.exchange, delivery_mode,
        )

        if expiration is not None:
            properties['expiration'] = str(int(expiration * 1000))

        body, content_type, content_encoding = self._prepare(
            body, serializer, content_type, content_encoding,
            compression, headers)  # prepare the data: serialize the body and compress it if requested

        if self.auto_declare and self.exchange.name:
            if self.exchange not in declare:
                # XXX declare should be a Set.
                declare.append(self.exchange)

        if retry:
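            # With retry enabled, wrap _publish so that it reconnects and retries
            # when the connection is lost.
            _publish = self.connection.ensure(self, _publish, **retry_policy)
        return _publish(  # the method that actually publishes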
            body, priority, content_type, content_encoding,
            headers, properties, routing_key, mandatory, immediate,
            exchange_name, declare,
        )  
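
The preparation work in publish comes down to three things: serialize the body, optionally compress it, and convert expiration from seconds to a millisecond string. A minimal sketch using only the standard library; the prepare function and the 'zlib' header value are assumptions made for this example and do not reproduce kombu's _prepare.

```python
import json
import zlib


def prepare(body, compress=False, headers=None, expiration=None):
    """Serialize body to JSON, optionally compress it, and build properties."""
    headers = {} if headers is None else headers
    properties = {}
    payload = json.dumps(body).encode('utf-8')
    content_type, content_encoding = 'application/json', 'utf-8'
    if compress:
        payload = zlib.compress(payload)
        # Record the compression so a consumer knows it must decompress first.
        headers['compression'] = 'zlib'
    if expiration is not None:
        # Seconds are converted to a string of whole milliseconds.
        properties['expiration'] = str(int(expiration * 1000))
    return payload, content_type, content_encoding, headers, properties


message = {'name': '/tmp/lolcat1.avi', 'size': 1301013}
payload, ctype, cenc, headers, props = prepare(message, compress=True, expiration=1.5)
restored = json.loads(zlib.decompress(payload).decode(cenc))
print(ctype, headers, props, restored == message)
```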

publish itself only prepares the data; the real work happens in the _publish call at the end, so let's look at that.

    def _publish(self, body, priority, content_type, content_encoding,
                 headers, properties, routing_key, mandatory,
                 immediate, exchange, declare):
        channel = self.channel
        message = channel.prepare_message(
            body, priority, content_type,
            content_encoding, headers, properties,
        )  # wrap the payload into a message
        if declare:
            maybe_declare = self.maybe_declare
            [maybe_declare(entity) for entity in declare]  # declare the entities (here the video queue) on the broker

        # handle autogenerated queue names for reply_to
        reply_to = properties.get('reply_to')
        if isinstance(reply_to, Queue):
            properties['reply_to'] = reply_to.name
        return channel.basic_publish(  # publish the message through the channel
            message,
            exchange=exchange, routing_key=routing_key,
            mandatory=mandatory, immediate=immediate,
        )   

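In the end the message is published through the channel, so let's look at basic_publish. The code below is the implementation used by kombu's virtual transports, with _put taken from the Redis transport (note the lpush call); with the amqp:// URL from the example, basic_publish is provided by the amqp package's channel instead.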

    def basic_publish(self, message, exchange, routing_key, **kwargs):
        """Publish message."""
        self._inplace_augment_message(message, exchange, routing_key)
        if exchange:
            return self.typeof(exchange).deliver(  # publish through the exchange type object
                message, exchange, routing_key, **kwargs
            )
        # anon exchange: routing_key is the destination queue
        return self._put(routing_key, message, **kwargs)

    def deliver(self, message, exchange, routing_key, **kwargs):
        _lookup = self.channel._lookup
        _put = self.channel._put
        deadletter = self.channel.deadletter_queue
        for queue in [q for q in _lookup(exchange, routing_key)  # look up the bound queues
                      if q and q != deadletter]:
            _put(queue, message, **kwargs)  # deliver the message with the channel's _put




    def _put(self, queue, message, **kwargs):
        """Deliver message."""
        pri = self._get_message_priority(message, reverse=False)

        with self.conn_or_acquire() as client:  # the client pushes the message onto the queue
            client.lpush(self._q_for_pri(queue, pri), dumps(message))
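
The basic_publish, deliver and _put chain can be pictured as "look up the queues bound to this exchange and routing key, then push the message onto each of them". The in-memory sketch below uses a hypothetical TinyBroker class to show that flow for a direct exchange; it is an illustration of the routing idea, not kombu's virtual transport.

```python
from collections import defaultdict, deque


class TinyBroker:
    """Hypothetical in-memory broker imitating direct-exchange routing."""

    def __init__(self):
        self.bindings = defaultdict(set)   # (exchange, routing_key) -> queue names
        self.queues = defaultdict(deque)   # queue name -> stored messages

    def bind(self, queue, exchange, routing_key):
        self.bindings[(exchange, routing_key)].add(queue)

    def lookup(self, exchange, routing_key):
        # A direct exchange matches the routing key exactly.
        return sorted(self.bindings.get((exchange, routing_key), ()))

    def publish(self, message, exchange, routing_key):
        if not exchange:
            # Anonymous exchange: the routing key is the destination queue.
            targets = [routing_key]
        else:
            targets = self.lookup(exchange, routing_key)
        for queue in targets:
            # appendleft plays the role of the Redis LPUSH in _put.
            self.queues[queue].appendleft(message)
        return targets


broker = TinyBroker()
broker.bind('video', 'media', 'video')
broker.bind('image', 'media', 'image')
delivered = broker.publish({'name': '/tmp/lolcat1.avi'}, 'media', 'video')
print(delivered, list(broker.queues['video']), list(broker.queues['image']))
```

A message published with a routing key that has no binding is simply routed to no queue, which is why the example declares video_queue before publishing.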

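That chain of calls is the whole publishing path. It involves a few important pieces:
Producer: serializes and compresses the body, and has the channel wrap it into a message
Exchange: looks up the queues the message should be sent to
Channel: actually sends the message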

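Summary:
This article covered the connection process and the message publishing process.
Connection: the Producer wraps connection.default_channel in a ChannelPromise; when that promise is evaluated, self.connect is called, the configured Transport class connects to the broker, and the default Channel is created on top of that connection

Producer: its initialization sets up everything needed for connecting
Message: wraps the message body; it is not covered in detail here, but it is not complicated if you want to read it
Exchange: maps the routing_key to queues; the later Exchange article covers it in detail
Channel: performs the final publish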
