
Kombu Source Code Analysis (1): Connection

郑星雨
2023-12-01

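Environment for this article: Python 3.5.2 and the Kombu 4.6.8 series.
This article follows the official Kombu example and analyzes the Kombu source step by step, to understand Kombu's main structures and how they are implemented.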

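Celery is the asynchronous framework we use most in our projects. Celery communicates through messages and usually relies on a broker to mediate between clients and workers. To start a task, the client sends a message to the message queue, the broker delivers that message to a worker, and the worker executes the task the broker assigned to it. RabbitMQ, Redis and others are supported as brokers.
Kombu is a core component of Celery. It mainly provides abstractions for the structures defined by AMQP, and it supports both AMQP transports and non-AMQP transports (Redis, Amazon SQS, ZooKeeper, and so on).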

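Kombu has several core concepts, some of which already appear in simple producer/consumer examples. They are:

Message: the message, the unit that is sent and consumed
Producer: sends messages
Consumer: receives messages
Exchange: producers send messages to an Exchange, which routes them to queues
Queue: holds messages; it accepts messages from producers and hands them to consumers
Connection: an abstraction over the connection to the message broker
Channel: similar to the AMQP concept; think of it as one of several lightweight connections that share a single Connection
Transport: the actual MQ connection; it distinguishes the underlying message queue implementations and provides support for each of them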
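
To make the relationship between Producer, Exchange, Queue and Consumer concrete, here is a toy in-memory model of a direct exchange. It is only a sketch: `ToyQueue`, `ToyDirectExchange` and `consume` are hypothetical names invented for illustration, and none of this is how Kombu implements these classes.

```python
from collections import defaultdict, deque


class ToyQueue:
    """Holds messages until a consumer fetches them."""

    def __init__(self, name):
        self.name = name
        self.messages = deque()


class ToyDirectExchange:
    """Routes a message to every queue bound with an identical routing key."""

    def __init__(self, name):
        self.name = name
        self.bindings = defaultdict(list)  # routing_key -> [ToyQueue, ...]

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, body, routing_key):
        # Producer side: the message goes to the exchange, not to a queue.
        for queue in self.bindings.get(routing_key, []):
            queue.messages.append(body)


def consume(queue, callback):
    # Consumer side: drain the queue and pass each message to the callback.
    while queue.messages:
        callback(queue.messages.popleft())


media = ToyDirectExchange('media')
video = ToyQueue('video')
image = ToyQueue('image')
media.bind(video, 'video')
media.bind(image, 'image')

media.publish({'name': '/tmp/lolcat1.avi'}, routing_key='video')

received = []
consume(video, received.append)
```

The message published with routing key 'video' only reaches the queue bound with that key; the 'image' queue stays empty.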

Official example
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel
# (`connection` below stands for an already created Connection):
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()

Let's start with Connection and look at which parameters the source code defines.

@python_2_unicode_compatible
class Connection(object):
    """A connection to the broker.

    Example:
        >>> Connection('amqp://guest:guest@localhost:5672//')
        >>> Connection('amqp://foo;amqp://bar',
        ...            failover_strategy='round-robin')
        >>> Connection('redis://', transport_options={
        ...     'visibility_timeout': 3000,
        ... })

        >>> import ssl
        >>> Connection('amqp://', login_method='EXTERNAL', ssl={
        ...    'ca_certs': '/etc/pki/tls/certs/something.crt',
        ...    'keyfile': '/etc/something/system.key',
        ...    'certfile': '/etc/something/system.cert',
        ...    'cert_reqs': ssl.CERT_REQUIRED,
        ... })

    Note:
        SSL currently only works with the py-amqp, and qpid
        transports.  For other transports you can use stunnel.

    Arguments:
        URL (str, Sequence): Broker URL, or a list of URLs.

    Keyword Arguments:
        ssl (bool): Use SSL to connect to the server. Default is ``False``.
            May not be supported by the specified transport.
        transport (Transport): Default transport if not specified in the URL.
        connect_timeout (float): Timeout in seconds for connecting to the
            server. May not be supported by the specified transport.
        transport_options (Dict): A dict of additional connection arguments to
            pass to alternate kombu channel implementations.  Consult the
            transport documentation for available options.
        heartbeat (float): Heartbeat interval in int/float seconds.
            Note that if heartbeats are enabled then the
            :meth:`heartbeat_check` method must be called regularly,
            around once per second.

    Note:
        The connection is established lazily when needed. If you need the         # lazy by default; call connect() to force the connection
        connection to be established, then force it by calling
        :meth:`connect`::

            >>> conn = Connection('amqp://')
            >>> conn.connect()

        and always remember to close the connection::

            >>> conn.release()

    These options have been replaced by the URL argument, but are still
    supported for backwards compatibility:

    :keyword hostname: Host name/address.
        NOTE: You cannot specify both the URL argument and use the hostname
        keyword argument at the same time.
    :keyword userid: Default user name if not provided in the URL.
    :keyword password: Default password if not provided in the URL.
    :keyword virtual_host: Default virtual host if not provided in the URL.
    :keyword port: Default port if not provided in the URL.
    """

    port = None
    virtual_host = '/'
    connect_timeout = 5

    _closed = None                                                # class-level defaults: closed flag, underlying connection, default channel, transport
    _connection = None
    _default_channel = None
    _transport = None
    _logger = False
    uri_prefix = None

    #: The cache of declared entities is per connection,
    #: in case the server loses data.
    declared_entities = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure.  One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    resolve_aliases = resolve_aliases
    failover_strategies = failover_strategies

    hostname = userid = password = ssl = login_method = None

    def __init__(self, hostname='localhost', userid=None,                        # in the official example, hostname='amqp://guest:guest@localhost//'
                 password=None, virtual_host=None, port=None, insist=False,
                 ssl=False, transport=None, connect_timeout=5,
                 transport_options=None, login_method=None, uri_prefix=None,
                 heartbeat=0, failover_strategy='round-robin',
                 alternates=None, **kwargs):
        alt = [] if alternates is None else alternates
        # have to spell the args out, just to get nice docstrings :(
        params = self._initial_params = {                                        # collect the initial parameters
            'hostname': hostname, 'userid': userid,
            'password': password, 'virtual_host': virtual_host,
            'port': port, 'insist': insist, 'ssl': ssl,
            'transport': transport, 'connect_timeout': connect_timeout,
            'login_method': login_method, 'heartbeat': heartbeat
        }

        if hostname and not isinstance(hostname, string_t):                      # hostname is a string here, so this branch is skipped
            alt.extend(hostname)
            hostname = alt[0]
            params.update(hostname=hostname)
        if hostname:
            if ';' in hostname:
                alt = hostname.split(';') + alt
                hostname = alt[0]
                params.update(hostname=hostname)
            if '://' in hostname and '+' in hostname[:hostname.index('://')]:     # hostname[:hostname.index('://')] is 'amqp' (no '+'), so this branch is skipped
                # e.g. sqla+mysql://root:masterkey@localhost/
                params['transport'], params['hostname'] = \
                    hostname.split('+', 1)
                self.uri_prefix = params['transport']
            elif '://' in hostname:                                               # this is the branch taken
                transport = transport or urlparse(hostname).scheme                # transport is 'amqp'
                if not get_transport_cls(transport).can_parse_url:                # see get_transport_cls below: this is where the transport class is resolved
                    # we must parse the URL
                    url_params = parse_url(hostname)
                    params.update(
                        dictfilter(url_params),
                        hostname=url_params['hostname'],
                    )

                params['transport'] = transport

        self._init_params(**params)

        # fallback hosts
        self.alt = alt
        # keep text representation for .info
        # only temporary solution as this won't work when
        # passing a custom object (Issue celery/celery#3320).
        self._failover_strategy = failover_strategy or 'round-robin'           # strategy for choosing the next host after a connection failure
        self.failover_strategy = self.failover_strategies.get(                 # look up the strategy by name, or use the custom object as given
            self._failover_strategy) or self._failover_strategy
        if self.alt:
            self.cycle = self.failover_strategy(self.alt)
            next(self.cycle)  # skip first entry

        if transport_options is None:                                            # extra transport-specific options
            transport_options = {}
        self.transport_options = transport_options

        if _log_connection:  # pragma: no cover
            self._logger = True

        if uri_prefix:
            self.uri_prefix = uri_prefix

        self.declared_entities = set()
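
The URL handling in `__init__` above can be isolated into a small standalone sketch. `split_broker_url` is a hypothetical helper written for this article, not part of Kombu, and the failover iterator is modeled here with `itertools.cycle` as a stand-in for the 'round-robin' strategy. It only mirrors the branching shown above: a list or a ';'-separated string yields failover candidates, a '+' before '://' names the transport explicitly, and otherwise the URL scheme is used.

```python
import itertools
from urllib.parse import urlparse


def split_broker_url(hostname, alternates=None):
    """Return (transport, primary_url, all_urls, uri_prefix)."""
    alt = list(alternates or [])
    uri_prefix = None
    transport = None

    if not isinstance(hostname, str):
        # A list/tuple of URLs: all of them become failover candidates.
        alt.extend(hostname)
        hostname = alt[0]
    if ';' in hostname:
        # 'amqp://foo;amqp://bar' is shorthand for a list of URLs.
        alt = hostname.split(';') + alt
        hostname = alt[0]

    if '://' in hostname and '+' in hostname[:hostname.index('://')]:
        # e.g. 'sqla+mysql://...': the part before '+' names the transport.
        transport, hostname = hostname.split('+', 1)
        uri_prefix = transport
    elif '://' in hostname:
        # Plain URL: the scheme is the transport name.
        transport = urlparse(hostname).scheme

    return transport, hostname, alt, uri_prefix


transport, primary, hosts, prefix = split_broker_url('amqp://foo;amqp://bar')
cycle = itertools.cycle(hosts)  # stand-in for the 'round-robin' strategy
next(cycle)                     # skip the first entry: it is the primary
fallback = next(cycle)          # the host to try after a failure
```

For the official example URL, `split_broker_url('amqp://guest:guest@localhost//')` takes the `elif` branch and reports 'amqp' as the transport, which matches the annotations in the listing above.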

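At this point the connection has not actually been established; it is only created, and then cached, when it is first needed.
Initialization sets up the various parameters. The most important step is using the transport parameter to obtain the transport class, which is done through the get_transport_cls function used above: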

def get_transport_cls(transport=None):
    """Get transport class by name.

    The transport string is the full path to a transport class, e.g.::

        "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.
    """
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)        # resolve the transport class once and cache it
    return _transport_cache[transport]



def resolve_transport(transport=None):
    """Get transport by name.

    Arguments:
        transport (Union[str, type]): This can be either
            an actual transport class, or the fully qualified
            path to a transport class, or the alias of a transport.
    """
    if isinstance(transport, string_t):
        try:
            transport = TRANSPORT_ALIASES[transport]                       # look up the alias; e.g. 'amqp' maps to the path of its transport class
        except KeyError:
            if '.' not in transport and ':' not in transport:
                from kombu.utils.text import fmatch_best
                alt = fmatch_best(transport, TRANSPORT_ALIASES)
                if alt:
                    raise KeyError(
                        'No such transport: {0}.  Did you mean {1}?'.format(
                            transport, alt))
                raise KeyError('No such transport: {0}'.format(transport))
        else:
            if callable(transport):
                transport = transport()
        return symbol_by_name(transport)
    return transport


TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    'amqps': 'kombu.transport.pyamqp:SSLTransport',
    'pyamqp': 'kombu.transport.pyamqp:Transport',
    'librabbitmq': 'kombu.transport.librabbitmq:Transport',
    'memory': 'kombu.transport.memory:Transport',
    'redis': 'kombu.transport.redis:Transport',
    'rediss': 'kombu.transport.redis:Transport',
    'SQS': 'kombu.transport.SQS:Transport',
    'sqs': 'kombu.transport.SQS:Transport',
    'mongodb': 'kombu.transport.mongodb:Transport',
    'zookeeper': 'kombu.transport.zookeeper:Transport',
    'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
    'sqla': 'kombu.transport.sqlalchemy:Transport',
    'SLMQ': 'kombu.transport.SLMQ.Transport',
    'slmq': 'kombu.transport.SLMQ.Transport',
    'filesystem': 'kombu.transport.filesystem:Transport',
    'qpid': 'kombu.transport.qpid:Transport',
    'sentinel': 'kombu.transport.redis:SentinelTransport',
    'consul': 'kombu.transport.consul:Transport',
    'etcd': 'kombu.transport.etcd:Transport',
    'azurestoragequeues': 'kombu.transport.azurestoragequeues:Transport',
    'azureservicebus': 'kombu.transport.azureservicebus:Transport',
    'pyro': 'kombu.transport.pyro:Transport'
}

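get_transport_cls and resolve_transport together yield the transport class. That completes the initialization of the Connection class. As the code shows, initialization does not open a connection through the transport; the connection is only made when it is first used. Next, let's look at how to connect explicitly.
Calling connect() establishes the connection explicitly: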

connection.connect()

Now let's look at the connect method.

    def connect(self):
        """Establish connection to server immediately."""
        self._closed = False                                   # set self._closed to False (it is None after initialization)
        return self.connection                                 # read the connection property


    @property
    def connection(self):
        """The underlying connection object.

        Warning:
            This instance is transport specific, so do not
            depend on the interface of this object.
        """
        if not self._closed:                                   
            if not self.connected:                             # _closed is False and connected is False at this point
                self.declared_entities.clear()                 # The cache of declared entities is per connection, in case the server loses data.
                self._default_channel = None                   # reset the default channel
                self._connection = self._establish_connection()
                self._closed = False
            return self._connection  

    def _establish_connection(self):
        self._debug('establishing connection...')
        conn = self.transport.establish_connection()           # call establish_connection on the transport chosen during initialization
        self._debug('connection established: %r', self)
        return conn

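As the code shows, the connection is ultimately established by calling establish_connection() on the selected transport. Transports will be covered in detail later, using amqp as the example.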
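
The lazy pattern in connect() and the connection property can be sketched in isolation. `FakeTransport` and `LazyConnection` are hypothetical stand-ins written for this article; in particular, the `connected` check here is a simplification and is only assumed to resemble what Kombu does.

```python
class FakeTransport:
    """Stand-in for a real transport; counts how often it connects."""

    def __init__(self):
        self.calls = 0

    def establish_connection(self):
        self.calls += 1
        return object()  # placeholder for a real socket/client object


class LazyConnection:
    def __init__(self, transport):
        # Nothing is connected here; only the parameters are stored.
        self.transport = transport
        self._connection = None
        self._closed = None

    @property
    def connected(self):
        # Simplified check (assumption): open and holding a connection.
        return not self._closed and self._connection is not None

    def connect(self):
        self._closed = False
        return self.connection

    @property
    def connection(self):
        if not self._closed:
            if not self.connected:
                # First access: ask the transport for a real connection.
                self._connection = self.transport.establish_connection()
                self._closed = False
            return self._connection
        return None


transport = FakeTransport()
conn = LazyConnection(transport)
calls_after_init = transport.calls  # the constructor has not connected
first = conn.connect()
second = conn.connect()             # reuses the cached connection
```

Creating the object costs nothing on the network, and repeated calls to connect() return the same cached object instead of reconnecting.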

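Summary: this article gave an overview of Kombu's functional modules and, following the official example, analyzed how the Connection class is initialized. The key point is that the matching transport class is selected from the hostname parameter. The next article covers Producer.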
