Celery是项目中使用率最高的异步框架。Celery 通过消息机制进行通信,通常使用中间人(Broker)在客户端和职程(Worker)之间进行协调。启动一个任务时,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)执行中间人(Broker)分配的任务。Celery 支持使用RabbitMQ、Redis等作为Broker。
Kombu则是Celery的重要组成部分,主要实现对AMQP中各个结构的抽象。Kombu同时兼容AMQP transport和non-AMQP transports(Redis、Amazon SQS、ZooKeeper等)。
在 Kombu 中,存在多个概念,其实我们在前边简单的生产/消费者样例中已经看到了一些,它们分别是:
Producer: 消息发送者
from kombu import Connection, Exchange, Queue
# Direct exchange: a message is routed to the queues whose binding key
# equals the message's routing key.
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')


def process_media(body, message):
    """Consumer callback: print the payload, then acknowledge the message.

    Arguments:
        body: The decoded message payload.
        message: The received message object; acknowledged after printing.
    """
    print(body)
    message.ack()


# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
# Consume from several queues on the same channel:
# NOTE(review): `connection` is not defined in this excerpt; it is assumed to
# be an open kombu Connection (such as `conn` in the previous example).
# The binding key is passed as `routing_key`, matching the Queue definition
# used by the producer example.
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        # Block until the next event arrives and dispatch it to the callbacks.
        connection.drain_events()
class Connection(object):
    """A connection to the broker.

    Example:
        >>> Connection('amqp://guest:guest@localhost:5672//')
        >>> Connection('amqp://foo;amqp://bar',
        ...            failover_strategy='round-robin')
        >>> Connection('redis://', transport_options={
        ...     'visibility_timeout': 3000,
        ... })

        >>> import ssl
        >>> Connection('amqp://', login_method='EXTERNAL', ssl={
        ...    'ca_certs': '/etc/pki/tls/certs/something.crt',
        ...    'keyfile': '/etc/something/system.key',
        ...    'certfile': '/etc/something/system.cert',
        ...    'cert_reqs': ssl.CERT_REQUIRED,
        ... })

    Note:
        SSL currently only works with the py-amqp, and qpid
        transports.  For other transports you can use stunnel.

    Arguments:
        URL (str, Sequence): Broker URL, or a list of URLs.

    Keyword Arguments:
        ssl (bool): Use SSL to connect to the server. Default is ``False``.
            May not be supported by the specified transport.
        transport (Transport): Default transport if not specified in the URL.
        connect_timeout (float): Timeout in seconds for connecting to the
            server. May not be supported by the specified transport.
        transport_options (Dict): A dict of additional connection arguments to
            pass to alternate kombu channel implementations.  Consult the
            transport documentation for available options.
        heartbeat (float): Heartbeat interval in int/float seconds.
            Note that if heartbeats are enabled then the
            :meth:`heartbeat_check` method must be called regularly,
            around once per second.

    Note:
        The connection is established lazily when needed. If you need the
        connection to be established, then force it by calling
        :meth:`connect`::

            >>> conn = Connection('amqp://')
            >>> conn.connect()

        and always remember to close the connection::

            >>> conn.release()

    These options have been replaced by the URL argument, but are still
    supported for backwards compatibility:

    :keyword hostname: Host name/address.
        NOTE: You cannot specify both the URL argument and use the hostname
        keyword argument at the same time.
    :keyword userid: Default user name if not provided in the URL.
    :keyword password: Default password if not provided in the URL.
    :keyword virtual_host: Default virtual host if not provided in the URL.
    :keyword port: Default port if not provided in the URL.
    """

    port = None
    virtual_host = '/'
    connect_timeout = 5

    # Class-level defaults for the connection state: closed flag, underlying
    # connection, default channel and transport.
    _closed = None
    _connection = None
    _default_channel = None
    _transport = None
    _logger = False
    uri_prefix = None

    #: The cache of declared entities is per connection,
    #: in case the server loses data.
    declared_entities = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure.  One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    resolve_aliases = resolve_aliases
    failover_strategies = failover_strategies

    hostname = userid = password = ssl = login_method = None

    def __init__(self, hostname='localhost', userid=None,
                 password=None, virtual_host=None, port=None, insist=False,
                 ssl=False, transport=None, connect_timeout=5,
                 transport_options=None, login_method=None, uri_prefix=None,
                 heartbeat=0, failover_strategy='round-robin',
                 alternates=None, **kwargs):
        # Walk-through example used in the comments below:
        #     hostname='amqp://guest:guest@localhost//'
        alt = [] if alternates is None else alternates
        # have to spell the args out, just to get nice docstrings :(
        params = self._initial_params = {
            'hostname': hostname, 'userid': userid,
            'password': password, 'virtual_host': virtual_host,
            'port': port, 'insist': insist, 'ssl': ssl,
            'transport': transport, 'connect_timeout': connect_timeout,
            'login_method': login_method, 'heartbeat': heartbeat,
        }

        if hostname and not isinstance(hostname, string_t):
            # A sequence of URLs was given (skipped in the example, where
            # hostname is a string): the first entry becomes the primary
            # host and all entries become failover candidates.  A new list
            # is built so the caller's ``alternates`` is not mutated.
            alt = list(hostname) + alt
            hostname = alt[0]
            params['hostname'] = hostname
        if hostname:
            if ';' in hostname:
                # 'url1;url2' form: the first URL is the primary host.
                alt = hostname.split(';') + alt
                hostname = alt[0]
                params['hostname'] = hostname
            if '://' in hostname and '+' in hostname[:hostname.index('://')]:
                # e.g. sqla+mysql://root:masterkey@localhost/
                # (skipped in the example: the scheme is 'amqp', no '+').
                params['transport'], params['hostname'] = \
                    hostname.split('+', 1)
                self.uri_prefix = params['transport']
            elif '://' in hostname:
                # The example URL takes this branch; transport is 'amqp'.
                transport = transport or urlparse(hostname).scheme
                # The transport class is looked up here.
                if not get_transport_cls(transport).can_parse_url:
                    # we must parse the URL
                    url_params = parse_url(hostname)
                    params.update(url_params)
                params['transport'] = transport

        # NOTE(review): this excerpt only stores ``params`` in
        # ``_initial_params``; the step that copies them onto the instance
        # (hostname, userid, ...) is not shown here -- confirm against the
        # full source.

        # fallback hosts
        self.alt = alt
        # keep text representation for .info
        # only temporary solution as this won't work when
        # passing a custom object (Issue celery/celery#3320).
        self._failover_strategy = failover_strategy or 'round-robin'
        # Resolve a named strategy through the registry; anything not found
        # there is used as given.
        self.failover_strategy = self.failover_strategies.get(
            self._failover_strategy) or self._failover_strategy
        if self.alt:
            self.cycle = self.failover_strategy(self.alt)
            next(self.cycle)  # skip first entry

        # Extra transport-specific options; default to an empty dict.
        if transport_options is None:
            transport_options = {}
        self.transport_options = transport_options

        if _log_connection:  # pragma: no cover
            self._logger = True

        if uri_prefix:
            self.uri_prefix = uri_prefix

        self.declared_entities = set()
def get_transport_cls(transport=None):
    """Get transport class by name.

    The transport string is the full path to a transport class, e.g.::

        "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.

    Results are memoized in ``_transport_cache``, so each distinct
    ``transport`` value is resolved only once.
    """
    if transport not in _transport_cache:
        # First request for this name: resolve it and remember the result.
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]
def resolve_transport(transport=None):
    """Get transport by name.

    Arguments:
        transport (Union[str, type]): This can be either
            an actual transport class, or the fully qualified
            path to a transport class, or the alias of a transport.

    Returns:
        The result of ``symbol_by_name`` for string input; any non-string
        ``transport`` is returned unchanged.

    Raises:
        KeyError: If ``transport`` is a string that is neither a known
            alias nor a qualified path (contains no ``.`` or ``:``).
    """
    if isinstance(transport, string_t):
        try:
            # Map an alias (e.g. 'amqp') to its transport class path.
            transport = TRANSPORT_ALIASES[transport]
        except KeyError:
            # Not an alias.  A name containing '.' or ':' is treated as a
            # qualified path and imported below; anything else is an error.
            if '.' not in transport and ':' not in transport:
                from kombu.utils.text import fmatch_best
                suggestion = fmatch_best(transport, TRANSPORT_ALIASES)
                if suggestion:
                    raise KeyError(
                        'No such transport: {0}. Did you mean {1}?'.format(
                            transport, suggestion))
                raise KeyError('No such transport: {0}'.format(transport))
        else:
            # An alias entry may be a callable that yields the actual target.
            if callable(transport):
                transport = transport()
        return symbol_by_name(transport)
    return transport
# Maps a transport alias (typically the URL scheme) to the path of the
# transport class that implements it.
TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    'amqps': 'kombu.transport.pyamqp:SSLTransport',
    'pyamqp': 'kombu.transport.pyamqp:Transport',
    'librabbitmq': 'kombu.transport.librabbitmq:Transport',
    'memory': 'kombu.transport.memory:Transport',
    'redis': 'kombu.transport.redis:Transport',
    'rediss': 'kombu.transport.redis:Transport',
    'SQS': 'kombu.transport.SQS:Transport',
    'sqs': 'kombu.transport.SQS:Transport',
    'mongodb': 'kombu.transport.mongodb:Transport',
    'zookeeper': 'kombu.transport.zookeeper:Transport',
    'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
    'sqla': 'kombu.transport.sqlalchemy:Transport',
    'SLMQ': 'kombu.transport.SLMQ.Transport',
    'slmq': 'kombu.transport.SLMQ.Transport',
    'filesystem': 'kombu.transport.filesystem:Transport',
    'qpid': 'kombu.transport.qpid:Transport',
    'sentinel': 'kombu.transport.redis:SentinelTransport',
    'consul': 'kombu.transport.consul:Transport',
    'etcd': 'kombu.transport.etcd:Transport',
    'azurestoragequeues': 'kombu.transport.azurestoragequeues:Transport',
    'azureservicebus': 'kombu.transport.azureservicebus:Transport',
    'pyro': 'kombu.transport.pyro:Transport',
}
def connect(self):
    """Establish connection to server immediately.

    Returns:
        The value of ``self.connection``.
    """
    # Mark the connection as open; ``_closed`` is None until first use.
    self._closed = False
    # Reading ``connection`` is expected to trigger the lazy connect
    # (assumes ``connection`` is a property -- confirm).
    return self.connection
@property
def connection(self):
    """The underlying connection object.

    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.

    Returns:
        The transport connection, established on first access, or
        ``None`` when the connection has been closed.
    """
    if not self._closed:
        if not self.connected:
            # The cache of declared entities is per connection,
            # in case the server loses data.
            self.declared_entities.clear()
            # Drop the default channel along with the old connection.
            self._default_channel = None
            self._connection = self._establish_connection()
            self._closed = False
        return self._connection
def _establish_connection(self):
self._debug('establishing connection...')
conn = self.transport.establish_connection() # 调用相应transport 的建立连接的方法,根据初始化时确定的transport查看方法
self._debug('connection established: %r', self)
return conn