from kombu import Connection, Exchange, Queue
# Durable exchange of type "direct" named "media".
media_exchange = Exchange(name='media', type='direct', durable=True)
# Queue named "video", bound to the media exchange under the "video"
# routing key.
video_queue = Queue(
    name='video',
    exchange=media_exchange,
    routing_key='video',
)
def process_media(body, message):
    """Consumer callback: print the body of a received message.

    Arguments:
        body: The message payload handed to the callback.
        message: The message object handed to the callback (unused here).
    """
    # NOTE(review): the message is never acknowledged in this callback;
    # presumably ``message.ack()`` is wanted unless the consumer runs
    # without acks -- confirm.
    print(body)
# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish(
        {'name': '/tmp/lolcat1.avi', 'size': 1301013},
        exchange=media_exchange, routing_key='video',
        declare=[video_queue],
    )

    # The declare argument above makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
# Consume from several queues on the same channel:
# (``routing_key`` is the binding-key argument, as used for the first
# ``video_queue`` declaration in this example.)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with Connection('amqp://guest:guest@localhost//') as connection:
    with connection.Consumer([video_queue, image_queue],
                             callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            connection.drain_events()
def Producer(self, channel=None, *args, **kwargs):
    """Build a :class:`kombu.Producer` bound to ``channel`` or to ``self``.

    Arguments:
        channel: Channel to publish on. When omitted (or falsy) this
            object itself is handed to the producer instead.
        *args: Extra positional arguments forwarded to the producer.
        **kwargs: Extra keyword arguments forwarded to the producer,
            e.g. ``serializer='json'``.

    Returns:
        The newly created producer instance.
    """
    # Imported at call time; inside this body the name ``Producer`` refers
    # to the imported class, not to this method.
    from .messaging import Producer
    # With ``conn.Producer(serializer='json')`` no channel is given, so the
    # first argument becomes ``self`` and ``serializer`` travels in kwargs.
    bound_to = channel or self
    return Producer(bound_to, *args, **kwargs)
class Producer(object):
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
        serializer (str): Default serializer. Default is `"json"`.
        compression (str): Default compression method.
            Default is no compression.
        auto_declare (bool): Automatically declare the default exchange
            at instantiation. Default is :const:`True`.
        on_return (Callable): Callback to call for undeliverable messages,
            when the `mandatory` or `immediate` arguments to
            :meth:`publish` is used. This callback needs the following
            signature: `(exception, exchange, routing_key, message)`.
            Note that the producer needs to drain events to use this feature.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method. Disabled by default.
    compression = None

    #: By default, if a default exchange is set,
    #: that exchange will be declared when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        """Store the defaults and bind to ``channel`` when one is given.

        Falsy arguments fall back to the class-level defaults above.
        """
        # ``channel`` may be a connection: ``Connection.Producer`` passes
        # the connection itself when no channel is supplied.
        self._channel = channel
        self.exchange = exchange
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        self._channel_promise = None
        if self.exchange is None:
            # No exchange given: use the one with the empty name.
            self.exchange = Exchange('')
        if auto_declare is not None:
            self.auto_declare = auto_declare

        if self._channel:
            # Binds the producer (and its exchange) to the channel; for a
            # connection the channel is wrapped in a ChannelPromise first.
            self.revive(self._channel)

    def revive(self, channel):
        """Revive the producer after connection loss."""
        if is_connection(channel):
            connection = channel
            self.__connection__ = connection
            # The promise wraps the ``connection.default_channel`` lookup
            # in a callable instead of evaluating it here.
            channel = ChannelPromise(lambda: connection.default_channel)
        if isinstance(channel, ChannelPromise):
            self._channel = channel
            # Bind the exchange to the (promised) channel.
            self.exchange = self.exchange(channel)
        else:
            # Channel already concrete
            self._channel = channel
            if self.on_return:
                self._channel.events['basic_return'].add(self.on_return)
            self.exchange = self.exchange(channel)
@property
def default_channel(self):
    """Default channel.

    Created upon access and closed when the connection is closed.

    Note:
        Can be used for automatic channel handling when you only need one
        channel, and also it is the channel implicitly used if
        a connection is passed instead of a channel, to functions that
        require a channel.
    """
    # Only the retry-related transport options are forwarded to
    # ensure_connection(); with no transport options configured nothing is
    # forwarded and ensure_connection() uses its own defaults.
    transport_opts = self.transport_options or {}
    conn_opts = {
        name: transport_opts[name]
        for name in ('max_retries', 'interval_start',
                     'interval_step', 'interval_max')
        if name in transport_opts
    }

    # make sure we're still connected, and if not refresh.
    self.ensure_connection(**conn_opts)
    if self._default_channel is None:
        self._default_channel = self.channel()
    return self._default_channel
def ensure_connection(self, errback=None, max_retries=None,
                      interval_start=2, interval_step=2, interval_max=30,
                      callback=None, reraise_as_library_errors=True,
                      timeout=None):
    """Ensure we have a connection to the server.

    If not retry establishing the connection with the settings
    specified.

    Arguments:
        errback (Callable): Optional callback called each time the
            connection can't be established. Arguments provided are
            the exception raised and the interval that will be
            slept ``(exc, interval)``.
        max_retries (int): Maximum number of times to retry.
            If this limit is exceeded the connection error
            will be re-raised.
        interval_start (float): The number of seconds we start
            sleeping for.
        interval_step (float): How many seconds added to the interval
            for each retry.
        interval_max (float): Maximum number of seconds to sleep between
            each retry.
        callback (Callable): Optional callback that is called for every
            internal iteration (1 s).
        reraise_as_library_errors (bool): When true the connection
            attempt runs inside ``self._reraise_as_library_errors()``,
            otherwise inside ``self._dummy_context()``.
        timeout (int): Maximum amount of time in seconds to spend
            waiting for connection.

    Returns:
        This object (``self``), once ``self.connect`` has succeeded.
    """
    def on_error(exc, intervals, retries, interval=0):
        # Only take a new sleep interval from the iterator when
        # completes_cycle() reports true for this retry count; otherwise
        # return 0 so the next attempt follows without sleeping.
        cycle_complete = self.completes_cycle(retries)
        if cycle_complete:
            interval = next(intervals)
        if errback:
            errback(exc, interval)
        self.maybe_switch_next()  # select next host

        return interval if cycle_complete else 0

    # Context manager wrapped around the connection attempts.
    ctx = self._reraise_as_library_errors
    if not reraise_as_library_errors:
        ctx = self._dummy_context
    with ctx():
        # self.connect is the callable that actually establishes the
        # connection; it is retried according to the settings above.
        retry_over_time(self.connect, self.recoverable_connection_errors,
                        (), {}, on_error, max_retries,
                        interval_start, interval_step, interval_max,
                        callback, timeout=timeout)
    return self
def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                    max_retries=None, interval_start=2, interval_step=2,
                    interval_max=30, callback=None, timeout=None):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    Arguments:
        fun (Callable): The function to try
        catch (Tuple[BaseException]): Exceptions to catch, can be either
            tuple or a single exception class.

    Keyword Arguments:
        args (Tuple): Positional arguments passed on to the function.
        kwargs (Dict): Keyword arguments passed on to the function.
        errback (Callable): Callback for when an exception in ``catch``
            is raised.  The callback must take three arguments:
            ``exc``, ``interval_range`` and ``retries``, where ``exc``
            is the exception instance, ``interval_range`` is an iterator
            which return the time in seconds to sleep next, and ``retries``
            is the number of previous retries.
        max_retries (int): Maximum number of retries before we give up.
            If neither of this and timeout is set, we will retry forever.
            If one of this and timeout is reached, stop.
        interval_start (float): How long (in seconds) we start sleeping
            between retries.
        interval_step (float): By how much the interval is increased for
            each retry.
        interval_max (float): Maximum number of seconds to sleep
            between retries.
        callback (Callable): Optional callable invoked without arguments
            after each failed attempt and once per whole second slept.
        timeout (int): Maximum seconds waiting before we give up.

    Returns:
        Whatever ``fun(*args, **kwargs)`` returns on its first success.

    Raises:
        The caught exception, re-raised once ``max_retries`` is reached
        or the ``timeout`` deadline has passed.
    """
    kwargs = {} if not kwargs else kwargs
    args = [] if not args else args
    interval_range = fxrange(interval_start,
                             interval_max + interval_start,
                             interval_step, repeatlast=True)
    end = time() + timeout if timeout else None
    for retries in count():
        try:
            # ``fun`` is the operation being retried, e.g. the
            # ``self.connect`` passed in by ensure_connection().
            return fun(*args, **kwargs)
        except catch as exc:
            if max_retries is not None and retries >= max_retries:
                raise
            if end and time() > end:
                raise
            if callback:
                callback()
            tts = float(errback(exc, interval_range, retries) if errback
                        else next(interval_range))
            if tts:
                # Sleep in one-second slices so the callback runs once
                # per second while waiting.
                for _ in range(int(tts)):
                    if callback:
                        callback()
                    sleep(1.0)
                # sleep remainder after int truncation above.
                sleep(abs(int(tts) - tts))
def connect(self):
    """Establish connection to server immediately.

    Returns:
        The value of ``self.connection``, read after the closed flag has
        been cleared.
    """
    # Clear the closed flag first; it must be false before
    # ``self.connection`` is read on the next line.
    self._closed = False
    return self.connection
@property
def connection(self):
    """The underlying connection object.

    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.

    Returns:
        The transport connection, established on first access, or
        ``None`` while this object is marked closed.
    """
    if self._closed:
        return None
    if not self.connected:
        # A new transport connection replaces the old one, so the
        # cached default channel is dropped as well.
        self._default_channel = None
        self._connection = self._establish_connection()
        self._closed = False
    return self._connection
def _establish_connection(self):
self._debug('establishing connection...')
conn = self.transport.establish_connection() # Connection初始化时已经声明self.transport,我们直接去看这个方法,真正建立连接的方法
self._debug('connection established: %r', self)
return conn
这三段代码依次调用执行,最后到 self.transport.establish_connection(),这里就要用到 transport 的建立连接方法,也是最终建立连接的方法,我们一起看一下。
def establish_connection(self):
    """Establish connection to the AMQP broker.

    Returns:
        The connected ``self.Connection`` instance, with its ``client``
        attribute set to ``self.client``.
    """
    client = self.client
    # Fill in every connection parameter the client leaves unset (falsy)
    # from the transport defaults.
    for param, fallback in items(self.default_connection_params):
        if not getattr(client, param, None):
            setattr(client, param, fallback)
    if client.hostname == 'localhost':
        client.hostname = ''
    # Information needed to open the connection.
    base_opts = {
        'host': client.host,
        'userid': client.userid,
        'password': client.password,
        'login_method': client.login_method,
        'virtual_host': client.virtual_host,
        'insist': client.insist,
        'ssl': client.ssl,
        'connect_timeout': client.connect_timeout,
        'heartbeat': client.heartbeat,
    }
    # Transport options are merged last, so they override the base values.
    extra_opts = client.transport_options or {}
    opts = dict(base_opts, **extra_opts)
    # self.Connection is the connection class configured on this transport
    # (the amqp package's Connection, per the original walkthrough).
    amqp_conn = self.Connection(**opts)
    amqp_conn.client = self.client
    # Open the connection; implemented by the connection class itself.
    amqp_conn.connect()
    return amqp_conn
def publish(self, body, routing_key=None, delivery_mode=None,
            mandatory=False, immediate=False, priority=0,
            content_type=None, content_encoding=None, serializer=None,
            headers=None, compression=None, exchange=None, retry=False,
            retry_policy=None, declare=None, expiration=None,
            **properties):
    """Publish message to the specified exchange.

    Arguments:
        body: Message body, handed to ``self._prepare``.
        routing_key (str): Routing key; defaults to ``self.routing_key``.
        delivery_mode: Passed to ``self._delivery_details``.
        mandatory (bool): Forwarded unchanged to ``self._publish``.
        immediate (bool): Forwarded unchanged to ``self._publish``.
        priority (int): Forwarded unchanged to ``self._publish``.
        content_type (str): Passed to ``self._prepare``.
        content_encoding (str): Passed to ``self._prepare``.
        serializer (str): Passed to ``self._prepare``.
        headers (Dict): Message headers; defaults to an empty dict.
        compression (str): Defaults to ``self.compression``.
        exchange: Overrides ``self.exchange`` for this message.
        retry (bool): Wrap the publish call with
            ``self.connection.ensure`` using ``retry_policy``.
        retry_policy (Dict): Keyword arguments for ``ensure``.
        declare (List): Entities to declare before publishing.  The
            default exchange is appended to this list when it is named
            and auto-declare is on.
        expiration (float): Expiry in seconds; stored in the message
            properties as a string of whole milliseconds.
        **properties: Additional message properties.

    Returns:
        Whatever the (possibly retry-wrapped) ``self._publish`` returns.
    """
    _publish = self._publish

    declare = [] if declare is None else declare
    headers = {} if headers is None else headers
    retry_policy = {} if retry_policy is None else retry_policy
    routing_key = self.routing_key if routing_key is None else routing_key
    compression = self.compression if compression is None else compression

    exchange_name, properties['delivery_mode'] = self._delivery_details(
        exchange or self.exchange, delivery_mode,
    )

    if expiration is not None:
        properties['expiration'] = str(int(expiration * 1000))

    # Prepare the payload; the serializer and compression settings are
    # handed to _prepare here.
    body, content_type, content_encoding = self._prepare(
        body, serializer, content_type, content_encoding,
        compression, headers)

    if self.auto_declare and self.exchange.name:
        if self.exchange not in declare:
            # XXX declare should be a Set.
            declare.append(self.exchange)

    if retry:
        # Wrap the publish call with the connection's ensure() helper,
        # configured by retry_policy.
        _publish = self.connection.ensure(self, _publish, **retry_policy)
    # The actual publishing happens in _publish.
    return _publish(
        body, priority, content_type, content_encoding,
        headers, properties, routing_key, mandatory, immediate,
        exchange_name, declare,
    )
def publish(self, body, routing_key=None, delivery_mode=None,
            mandatory=False, immediate=False, priority=0,
            content_type=None, content_encoding=None, serializer=None,
            headers=None, compression=None, exchange=None, retry=False,
            retry_policy=None, declare=None, expiration=None,
            **properties):
    """Publish message to the specified exchange.

    Arguments:
        body: Message body, handed to ``self._prepare``.
        routing_key (str): Routing key; defaults to ``self.routing_key``.
        delivery_mode: Passed to ``self._delivery_details``.
        mandatory (bool): Forwarded unchanged to ``self._publish``.
        immediate (bool): Forwarded unchanged to ``self._publish``.
        priority (int): Forwarded unchanged to ``self._publish``.
        content_type (str): Passed to ``self._prepare``.
        content_encoding (str): Passed to ``self._prepare``.
        serializer (str): Passed to ``self._prepare``.
        headers (Dict): Message headers; defaults to an empty dict.
        compression (str): Defaults to ``self.compression``.
        exchange: Overrides ``self.exchange`` for this message.
        retry (bool): Wrap the publish call with
            ``self.connection.ensure`` using ``retry_policy``.
        retry_policy (Dict): Keyword arguments for ``ensure``.
        declare (List): Entities to declare before publishing.  The
            default exchange is appended to this list when it is named
            and auto-declare is on.
        expiration (float): Expiry in seconds; stored in the message
            properties as a string of whole milliseconds.
        **properties: Additional message properties.

    Returns:
        Whatever the (possibly retry-wrapped) ``self._publish`` returns.
    """
    _publish = self._publish

    declare = [] if declare is None else declare
    headers = {} if headers is None else headers
    retry_policy = {} if retry_policy is None else retry_policy
    routing_key = self.routing_key if routing_key is None else routing_key
    compression = self.compression if compression is None else compression

    exchange_name, properties['delivery_mode'] = self._delivery_details(
        exchange or self.exchange, delivery_mode,
    )

    if expiration is not None:
        properties['expiration'] = str(int(expiration * 1000))

    # Prepare the payload; the serializer and compression settings are
    # handed to _prepare here.
    body, content_type, content_encoding = self._prepare(
        body, serializer, content_type, content_encoding,
        compression, headers)

    if self.auto_declare and self.exchange.name:
        if self.exchange not in declare:
            # XXX declare should be a Set.
            declare.append(self.exchange)

    if retry:
        # Wrap the publish call with the connection's ensure() helper,
        # configured by retry_policy.
        _publish = self.connection.ensure(self, _publish, **retry_policy)
    # The actual publishing happens in _publish.
    return _publish(
        body, priority, content_type, content_encoding,
        headers, properties, routing_key, mandatory, immediate,
        exchange_name, declare,
    )
def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    """Wrap the prepared body in a message and publish it on the channel.

    Arguments mirror the values computed by :meth:`publish`; ``exchange``
    is what :meth:`publish` calls ``exchange_name``.

    Returns:
        Whatever ``channel.basic_publish`` returns.
    """
    channel = self.channel
    # Wrap the payload and its metadata into a message object.
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        # Declare the requested entities before publishing.
        maybe_declare = self.maybe_declare
        for entity in declare:
            maybe_declare(entity)

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    # The channel performs the actual publish.
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )
def basic_publish(self, message, exchange, routing_key, **kwargs):
    """Publish message.

    Arguments:
        message: The message to publish.
        exchange: Exchange to publish through; falsy means the anonymous
            exchange.
        routing_key: Routing key, or the destination queue when no
            exchange is given.
        **kwargs: Forwarded to the exchange's ``deliver`` or to ``_put``.

    Returns:
        The result of ``deliver`` or ``_put``.
    """
    self._inplace_augment_message(message, exchange, routing_key)
    if exchange:
        # Named exchange: the object returned by typeof() delivers it.
        return self.typeof(exchange).deliver(
            message, exchange, routing_key, **kwargs
        )
    # anon exchange: routing_key is the destination queue
    return self._put(routing_key, message, **kwargs)
def deliver(self, message, exchange, routing_key, **kwargs):
    """Put ``message`` on every queue the channel looks up for the route.

    Empty entries and the channel's dead-letter queue are skipped.
    ``**kwargs`` is forwarded to the channel's ``_put``.
    """
    lookup = self.channel._lookup
    put = self.channel._put
    dead_letter = self.channel.deadletter_queue
    # Resolve the complete list of target queues before any message is put.
    targets = [
        name for name in lookup(exchange, routing_key)
        if name and name != dead_letter
    ]
    for target in targets:
        # The channel's _put does the actual publishing.
        put(target, message, **kwargs)
def _put(self, queue, message, **kwargs):
    """Deliver message.

    The serialized message is left-pushed onto the key computed from
    ``queue`` and the message priority.
    """
    priority = self._get_message_priority(message, reverse=False)
    # The client pushes the message onto the queue.
    with self.conn_or_acquire() as client:
        target_key = self._q_for_pri(queue, priority)
        client.lpush(target_key, dumps(message))
Producer 消息压缩,将消息内容封装为Message类
Exchange 获取需要发送的队列信息
Channel 实际消息发送
Connection:当 Producer 初始化时,真正调用 self.connect 方法,到预定的 Transport 类中连接载体,并初始化 Channel(self.channel 来自 self.connection 的 default_channel)
Producer 初始化过程完成了连接用的内容
Message 封装消息,这里没有细讲,有兴趣可以看一下,并不复杂
Exchange 将routing_key转为queue,后面的exchange篇会细讲
Channel 最终消息发布