本文环境python3.5.2,kombu4.6.8系列
本文主要根据kombu官方用例,逐个分析kombu源码,了解kombu中的主要结构和代码实现。
Celery是项目中使用率最高的异步框架。Celery 通过消息机制进行通信,通常使用中间人(Broker)在客户端和职程(Worker)之间进行调节。启动一个任务时,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)执行中间人(Broker)分配的任务。Celery支持RabbitMQ、Redis等作为Broker。
Kombu则是Celery的重要组成部件,主要实现AMQP中各个结构的抽象。Kombu实现了对AMQP transport和non-AMQP transports(Redis、Amazon SQS、ZooKeeper等)的兼容。
在 Kombu 中,存在多个概念,其中一些会在下面简单的生产/消费者样例中出现,它们分别是:
Message:消息,发送和消费的主体
Producer: 消息发送者
Consumer:消息接收者
Exchange:交换机,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列
Queue:消息队列,消息内容的载体,负责接收发送者发来的消息,并供接收者获取消息
Connection:对消息队列连接的抽象
Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接
Transport:真实的MQ连接,用于区分底层消息队列的实现,提供对不同消息队列后端的支持
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')


def process_media(body, message):
    """Print the decoded message body, then acknowledge the message.

    Arguments:
        body: Decoded payload of the received message.
        message: The received message object; ``ack()`` is called on it
            once the body has been printed.
    """
    # Use the print() function: the bare ``print body`` statement is a
    # SyntaxError on Python 3, which is the interpreter this article uses.
    print(body)
    message.ack()


# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
# Consume from several queues on the same channel.
# NOTE: ``connection`` stands for an already created ``Connection``
# instance; this snippet is independent of the ``conn`` block above.
# The binding key is passed as ``routing_key`` (as in the first example);
# the previous ``key=`` keyword did not match that parameter name, so the
# queues would not have been bound with the intended keys.
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    # Block forever, dispatching each received message to the callbacks.
    while True:
        connection.drain_events()
首先从Connection开始,看一下源码中定义了哪些参数信息
@python_2_unicode_compatible
class Connection(object):
    """A connection to the broker.

    Example:
        >>> Connection('amqp://guest:guest@localhost:5672//')
        >>> Connection('amqp://foo;amqp://bar',
        ...            failover_strategy='round-robin')
        >>> Connection('redis://', transport_options={
        ...     'visibility_timeout': 3000,
        ... })

        >>> import ssl
        >>> Connection('amqp://', login_method='EXTERNAL', ssl={
        ...    'ca_certs': '/etc/pki/tls/certs/something.crt',
        ...    'keyfile': '/etc/something/system.key',
        ...    'certfile': '/etc/something/system.cert',
        ...    'cert_reqs': ssl.CERT_REQUIRED,
        ... })

    Note:
        SSL currently only works with the py-amqp, and qpid
        transports.  For other transports you can use stunnel.

    Arguments:
        URL (str, Sequence): Broker URL, or a list of URLs.

    Keyword Arguments:
        ssl (bool): Use SSL to connect to the server. Default is ``False``.
            May not be supported by the specified transport.
        transport (Transport): Default transport if not specified in the URL.
        connect_timeout (float): Timeout in seconds for connecting to the
            server. May not be supported by the specified transport.
        transport_options (Dict): A dict of additional connection arguments to
            pass to alternate kombu channel implementations.  Consult the
            transport documentation for available options.
        heartbeat (float): Heartbeat interval in int/float seconds.
            Note that if heartbeats are enabled then the
            :meth:`heartbeat_check` method must be called regularly,
            around once per second.

    Note:
        The connection is established lazily when needed. If you need the
        connection to be established, then force it by calling
        :meth:`connect`::

            >>> conn = Connection('amqp://')
            >>> conn.connect()

        and always remember to close the connection::

            >>> conn.release()

    These options have been replaced by the URL argument, but are still
    supported for backwards compatibility:

    :keyword hostname: Host name/address.
        NOTE: You cannot specify both the URL argument and use the hostname
        keyword argument at the same time.
    :keyword userid: Default user name if not provided in the URL.
    :keyword password: Default password if not provided in the URL.
    :keyword virtual_host: Default virtual host if not provided in the URL.
    :keyword port: Default port if not provided in the URL.
    """

    port = None
    virtual_host = '/'
    connect_timeout = 5

    # Class-level defaults for the connection state: the closed flag, the
    # underlying connection, the default channel and the transport.
    _closed = None
    _connection = None
    _default_channel = None
    _transport = None
    _logger = False
    uri_prefix = None

    #: The cache of declared entities is per connection,
    #: in case the server loses data.
    declared_entities = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure.  One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    resolve_aliases = resolve_aliases
    failover_strategies = failover_strategies

    hostname = userid = password = ssl = login_method = None

    # Walk-through note: in the official example ``hostname`` is the URL
    # 'amqp://guest:guest@localhost//'.
    def __init__(self, hostname='localhost', userid=None,
                 password=None, virtual_host=None, port=None, insist=False,
                 ssl=False, transport=None, connect_timeout=5,
                 transport_options=None, login_method=None, uri_prefix=None,
                 heartbeat=0, failover_strategy='round-robin',
                 alternates=None, **kwargs):
        """Collect the connection parameters; no connection is opened here."""
        alt = [] if alternates is None else alternates
        # have to spell the args out, just to get nice docstrings :(
        # Gather the initial parameter values (also kept on the instance).
        params = self._initial_params = {
            'hostname': hostname, 'userid': userid,
            'password': password, 'virtual_host': virtual_host,
            'port': port, 'insist': insist, 'ssl': ssl,
            'transport': transport, 'connect_timeout': connect_timeout,
            'login_method': login_method, 'heartbeat': heartbeat
        }

        # A non-string hostname is treated as a sequence of URLs: they are
        # appended to the alternates and the first entry becomes the
        # hostname.  With the example URL (a string) this branch is skipped.
        if hostname and not isinstance(hostname, string_t):
            alt.extend(hostname)
            hostname = alt[0]
            params.update(hostname=hostname)
        if hostname:
            # 'url1;url2' form: the first URL is used, all go to alternates.
            if ';' in hostname:
                alt = hostname.split(';') + alt
                hostname = alt[0]
                params.update(hostname=hostname)
            # For the example URL the part before '://' is 'amqp', which
            # contains no '+', so this branch is skipped.
            if '://' in hostname and '+' in hostname[:hostname.index('://')]:
                # e.g. sqla+mysql://root:masterkey@localhost/
                params['transport'], params['hostname'] = \
                    hostname.split('+', 1)
                self.uri_prefix = params['transport']
            # The example URL takes this branch.
            elif '://' in hostname:
                # For the example URL the scheme, and thus the transport,
                # is 'amqp'.
                transport = transport or urlparse(hostname).scheme
                # Look up the transport class for that name; the URL is
                # parsed here only when the transport class does not
                # declare that it can parse URLs itself.
                if not get_transport_cls(transport).can_parse_url:
                    # we must parse the URL
                    url_params = parse_url(hostname)
                    params.update(
                        dictfilter(url_params),
                        hostname=url_params['hostname'],
                    )

                params['transport'] = transport

        self._init_params(**params)

        # fallback hosts
        self.alt = alt
        # keep text representation for .info
        # only temporary solution as this won't work when
        # passing a custom object (Issue celery/celery#3320).
        # Strategy used to pick the next URL when reconnecting after a
        # connection failure.
        self._failover_strategy = failover_strategy or 'round-robin'
        # Resolve a named strategy through ``failover_strategies``; if the
        # name is not found there, the given value itself is used.
        self.failover_strategy = self.failover_strategies.get(
            self._failover_strategy) or self._failover_strategy
        if self.alt:
            self.cycle = self.failover_strategy(self.alt)
            next(self.cycle)  # skip first entry

        # Default to an empty dict of transport-specific options.
        if transport_options is None:
            transport_options = {}
        self.transport_options = transport_options

        if _log_connection:  # pragma: no cover
            self._logger = True

        if uri_prefix:
            self.uri_prefix = uri_prefix

        self.declared_entities = set()
现在的连接其实并未真正建立,只有在需要使用的时候才真正建立连接并将连接缓存。
初始化过程中,定义了各项参数,其中比较重要的就是通过transport参数获得transport的抽象类,这主要是通过上面使用的get_transport_cls方法实现的:
def get_transport_cls(transport=None):
    """Return the transport class for *transport*, caching the result.

    The transport string is the full path to a transport class, e.g.::

        "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.
    """
    # Serve repeated lookups straight from the module-level cache.
    if transport in _transport_cache:
        return _transport_cache[transport]
    # First request for this key: resolve it once and remember the result.
    resolved = resolve_transport(transport)
    _transport_cache[transport] = resolved
    return resolved
def resolve_transport(transport=None):
    """Resolve a transport given by name, alias or class.

    Arguments:
        transport (Union[str, type]): This can be either
            an actual transport class, or the fully qualified
            path to a transport class, or the alias of a transport.

    Raises:
        KeyError: If *transport* is a string that is neither a known
            alias nor looks like a qualified path (no ``.`` or ``:``).
    """
    # Anything that is not a string is handed back untouched.
    if not isinstance(transport, string_t):
        return transport

    try:
        # e.g. 'amqp' maps to the path of the matching transport class.
        target = TRANSPORT_ALIASES[transport]
    except KeyError:
        looks_qualified = '.' in transport or ':' in transport
        if not looks_qualified:
            from kombu.utils.text import fmatch_best
            suggestion = fmatch_best(transport, TRANSPORT_ALIASES)
            if suggestion:
                message = 'No such transport: {0}. Did you mean {1}?'.format(
                    transport, suggestion)
            else:
                message = 'No such transport: {0}'.format(transport)
            raise KeyError(message)
        # Unknown alias but it looks like a path: import it as given.
        target = transport
    else:
        # An alias entry may be a callable that produces the actual value.
        if callable(target):
            target = target()
    return symbol_by_name(target)
# Maps a short transport alias (for example the scheme of a broker URL,
# such as 'amqp' or 'redis') to the path of the transport class.
# ``resolve_transport`` looks string names up in this table.
TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    'amqps': 'kombu.transport.pyamqp:SSLTransport',
    'pyamqp': 'kombu.transport.pyamqp:Transport',
    'librabbitmq': 'kombu.transport.librabbitmq:Transport',
    'memory': 'kombu.transport.memory:Transport',
    'redis': 'kombu.transport.redis:Transport',
    'rediss': 'kombu.transport.redis:Transport',
    'SQS': 'kombu.transport.SQS:Transport',
    'sqs': 'kombu.transport.SQS:Transport',
    'mongodb': 'kombu.transport.mongodb:Transport',
    'zookeeper': 'kombu.transport.zookeeper:Transport',
    'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
    'sqla': 'kombu.transport.sqlalchemy:Transport',
    # NOTE(review): the two SLMQ entries separate module and class with
    # '.' instead of ':' like every other entry -- confirm that
    # ``symbol_by_name`` accepts both forms.
    'SLMQ': 'kombu.transport.SLMQ.Transport',
    'slmq': 'kombu.transport.SLMQ.Transport',
    'filesystem': 'kombu.transport.filesystem:Transport',
    'qpid': 'kombu.transport.qpid:Transport',
    'sentinel': 'kombu.transport.redis:SentinelTransport',
    'consul': 'kombu.transport.consul:Transport',
    'etcd': 'kombu.transport.etcd:Transport',
    'azurestoragequeues': 'kombu.transport.azurestoragequeues:Transport',
    'azureservicebus': 'kombu.transport.azureservicebus:Transport',
    'pyro': 'kombu.transport.pyro:Transport'
}
通过以上方法,获得transport抽象类。以上是Connection类初始化的过程,由代码可见,初始化过程中并没有通过transport建立连接,而是在使用的过程中才会连接,下面看一下主动连接的过程
通过调用connect函数可以主动连接
# Force the connection to be established now rather than lazily on first use.
connection.connect()
下面我们一起研究一下connect函数
def connect(self):
    """Establish the connection to the server right away.

    Returns:
        Whatever the :attr:`connection` property yields, i.e. the
        underlying connection object.
    """
    # Mark the instance as not closed (the flag starts out as ``None``)
    # before asking the ``connection`` property for the connection.
    self._closed = False
    established = self.connection
    return established
@property
def connection(self):
    """Return the underlying connection, establishing it when required.

    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.
    """
    # A new connection is only made when the instance is not closed and
    # is not currently connected.
    must_establish = not self._closed and not self.connected
    if must_establish:
        # The cache of declared entities is per connection, so it is
        # emptied before a new connection is set up.
        self.declared_entities.clear()
        # Drop the default channel along with the old connection.
        self._default_channel = None
        self._connection = self._establish_connection()
        self._closed = False
    return self._connection
def _establish_connection(self):
    """Open a connection through the transport and return it.

    A debug message is emitted before and after the attempt.
    """
    self._debug('establishing connection...')
    # The actual connecting is delegated to the transport object, so what
    # happens here depends on the transport in use.
    new_connection = self.transport.establish_connection()
    self._debug('connection established: %r', self)
    return new_connection
通过代码可以看出,最后是通过调用相应的transport中的establish_connection()来连接的,transport后面我们会以amqp为例子进行仔细的讲解。
总结:本篇主要概述了kombu的功能模块,并且根据官方用例,我们分析了Connection类初始化的过程,其中重点为根据参数hostname选择出相应的transport类,下一篇将进行Producer的讲解。