本文环境python3.5.2,kombu4.6.8系列
本文主要根据kombu官方用例,来逐个分析kombu源码,了解kombu中的主要结构和代码实现
上文主要根据官方示例分析了Connection初始化的源码,本篇将继续根据示例代码讲解Producer的初始化和消息发布的源码。上文中提到,Connection初始化过程中并没有建立连接,而是在使用时才会建立连接;本篇在Producer发布消息的时候会通过Transport类连接载体,连接的源码本篇也会重点讲解。
下面还是看一下官方示例
from kombu import Connection, Exchange, Queue
# Entities shared by the publisher and the consumer below.
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')


def process_media(body, message):
    """Consumer callback: print the message body, then acknowledge it."""
    # print() call form: the article targets Python 3.5.
    print(body)
    message.ack()


# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:
# NOTE(review): this reads as a separate snippet. `connection` is not
# defined anywhere above, so it is presumably an already-created
# kombu Connection -- confirm before running. The loop above never
# exits, so this part is not reached when run as a single script.
# `routing_key=` is used (instead of `key=`) to match the video_queue
# definition at the top of the example.
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()
示例中,在Connection完成后,就会调用Producer方法,并且进行发布消息,下面我们一起研究一下Producer方法和消息发布的过程。
def Producer(self, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Producer` instance.

    Arguments:
        channel: Optional channel to use. When it is falsy the
            connection itself (``self``) is passed as the first argument.
        *args: Forwarded unchanged to the ``Producer`` constructor.
        **kwargs: Forwarded unchanged to the ``Producer`` constructor
            (``serializer='json'`` in the article's example).

    Returns:
        The newly created ``Producer`` instance.
    """
    # The class is imported inside the method, at call time.
    from .messaging import Producer
    # `channel` is None in the example, so `self` is what gets passed.
    return Producer(channel or self, *args, **kwargs)
接着看一下Producer初始化过程中做了哪些定义
@python_2_unicode_compatible
class Producer(object):
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
        serializer (str): Default serializer. Default is `"json"`.
        compression (str): Default compression method.
            Default is no compression.
        auto_declare (bool): Automatically declare the default exchange
            at instantiation. Default is :const:`True`.
        on_return (Callable): Callback to call for undeliverable messages,
            when the `mandatory` or `immediate` arguments to
            :meth:`publish` is used. This callback needs the following
            signature: `(exception, exchange, routing_key, message)`.
            Note that the producer needs to drain events to use this feature.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method. Disabled by default.
    compression = None

    #: By default, if a default exchange is set,
    #: that exchange will be declared when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        # In the article's example `channel` is the Connection instance
        # that Connection.Producer() passed in as `self`.
        self._channel = channel
        self.exchange = exchange
        # Falsy arguments fall back to the class-level defaults above.
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        self._channel_promise = None
        if self.exchange is None:
            # No exchange supplied: use an Exchange with an empty name.
            self.exchange = Exchange('')
        if auto_declare is not None:
            self.auto_declare = auto_declare

        if self._channel:
            # revive() stores the channel (wrapping a Connection in a
            # ChannelPromise first) and binds the exchange to it.
            self.revive(self._channel)
以上代码为Producer初始化的过程,其中比较重要的是定义并实例化了Exchange,这个会在Exchange篇中讲解,另一点就是self.revive()方法,下面我们看一下这个方法具体做了什么。
def revive(self, channel):
    """Revive the producer after connection loss.

    Arguments:
        channel: A Connection, a ``ChannelPromise`` or a concrete channel.
            A Connection is remembered in ``__connection__`` and replaced
            by a ``ChannelPromise`` around its ``default_channel``.
    """
    if is_connection(channel):
        connection = channel
        self.__connection__ = connection
        # The lambda is only wrapped here, not called, so
        # `connection.default_channel` is not read by this method.
        channel = ChannelPromise(lambda: connection.default_channel)

    self._channel = channel
    if not isinstance(channel, ChannelPromise) and self.on_return:
        # Channel already concrete: the basic_return callback can be
        # registered right away.
        self._channel.events['basic_return'].add(self.on_return)
    # Calling the exchange with the channel yields the exchange that is
    # stored from now on (same call for promised and concrete channels).
    self.exchange = self.exchange(channel)
这里重点看一下connection.default_channel,我们接下来分析一下default_channel方法
@property
def default_channel(self):
    """Default channel.

    Created upon access and closed when the connection is closed.

    Note:
        Can be used for automatic channel handling when you only need one
        channel, and also it is the channel implicitly used if
        a connection is passed instead of a channel, to functions that
        require a channel.

    Returns:
        The cached ``_default_channel``, created through
        ``self.channel()`` the first time it is needed.
    """
    # Forward only the retry settings that were actually configured in
    # transport_options; anything missing keeps the defaults of
    # ensure_connection(). Empty/None options yield no overrides.
    transport_opts = self.transport_options or {}
    conn_opts = {
        name: transport_opts[name]
        for name in ('max_retries', 'interval_start',
                     'interval_step', 'interval_max')
        if name in transport_opts
    }

    # make sure we're still connected, and if not refresh.
    self.ensure_connection(**conn_opts)
    if self._default_channel is None:
        self._default_channel = self.channel()
    return self._default_channel
这个方法主要是准备了一些连接使用的参数,我们接着看self.ensure_connection(**conn_opts)
def ensure_connection(self, errback=None, max_retries=None,
                      interval_start=2, interval_step=2, interval_max=30,
                      callback=None, reraise_as_library_errors=True,
                      timeout=None):
    """Ensure we have a connection to the server.

    If not retry establishing the connection with the settings
    specified.

    Arguments:
        errback (Callable): Optional callback called each time the
            connection can't be established.  Arguments provided are
            the exception raised and the interval that will be
            slept ``(exc, interval)``.
        max_retries (int): Maximum number of times to retry.
            If this limit is exceeded the connection error
            will be re-raised.
        interval_start (float): The number of seconds we start
            sleeping for.
        interval_step (float): How many seconds added to the interval
            for each retry.
        interval_max (float): Maximum number of seconds to sleep between
            each retry.
        callback (Callable): Optional callback that is called for every
            internal iteration (1 s).
        reraise_as_library_errors (bool): When true (the default) the
            retry loop runs inside ``self._reraise_as_library_errors()``,
            otherwise inside ``self._dummy_context()``.
        timeout (int): Maximum amount of time in seconds to spend
            waiting for connection

    Returns:
        ``self``, so the call can be chained.
    """
    def on_error(exc, intervals, retries, interval=0):
        # `cycle_done` rather than `round`, to avoid shadowing the builtin.
        cycle_done = self.completes_cycle(retries)
        if cycle_done:
            interval = next(intervals)
        if errback:
            errback(exc, interval)
        self.maybe_switch_next()  # select next host
        # A sleep interval is only returned when completes_cycle() was
        # truthy; otherwise 0 is returned and the retry is immediate.
        return interval if cycle_done else 0

    # Both candidates are context-manager factories used via `with`.
    if reraise_as_library_errors:
        ctx = self._reraise_as_library_errors
    else:
        ctx = self._dummy_context

    with ctx():
        # self.connect is the callable that gets retried.
        retry_over_time(self.connect, self.recoverable_connection_errors,
                        (), {}, on_error, max_retries,
                        interval_start, interval_step, interval_max,
                        callback, timeout=timeout)
    return self
这里首先使用一个上下文管理,将连接可能遇到的错误封装起来,然后开始调用self.connect函数,我们看一下retry_over_time方法
def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                    max_retries=None, interval_start=2, interval_step=2,
                    interval_max=30, callback=None, timeout=None):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    Arguments:
        fun (Callable): The function to try
        catch (Tuple[BaseException]): Exceptions to catch, can be either
            tuple or a single exception class.

    Keyword Arguments:
        args (Tuple): Positional arguments passed on to the function.
        kwargs (Dict): Keyword arguments passed on to the function.
        errback (Callable): Callback for when an exception in ``catch``
            is raised.  The callback must take three arguments:
            ``exc``, ``interval_range`` and ``retries``, where ``exc``
            is the exception instance, ``interval_range`` is an iterator
            which return the time in seconds to sleep next, and ``retries``
            is the number of previous retries.
        max_retries (int): Maximum number of retries before we give up.
            If neither of this and timeout is set, we will retry forever.
            If one of this and timeout is reached, stop.
        interval_start (float): How long (in seconds) we start sleeping
            between retries.
        interval_step (float): By how much the interval is increased for
            each retry.
        interval_max (float): Maximum number of seconds to sleep
            between retries.
        callback (Callable): Optional callable invoked once after each
            failed attempt and then once before every one-second sleep.
        timeout (int): Maximum seconds waiting before we give up.

    Returns:
        Whatever ``fun`` returns on its first successful call.

    Raises:
        The caught exception is re-raised once ``max_retries`` or
        ``timeout`` has been exceeded.
    """
    kwargs = kwargs if kwargs else {}
    args = args if args else []
    interval_range = fxrange(interval_start,
                             interval_max + interval_start,
                             interval_step, repeatlast=True)
    end = time() + timeout if timeout else None
    for retries in count():
        try:
            # In the article's call chain `fun` is Connection.connect.
            return fun(*args, **kwargs)
        except catch as exc:
            if max_retries is not None and retries >= max_retries:
                raise
            if end and time() > end:
                raise

            if callback:
                callback()
            # With an errback, its return value is the sleep time;
            # otherwise the next value of the interval range is used.
            tts = float(errback(exc, interval_range, retries) if errback
                        else next(interval_range))
            if tts:
                whole_seconds = int(tts)
                # Sleep in one-second slices so `callback` keeps running.
                for _ in range(whole_seconds):
                    if callback:
                        callback()
                    sleep(1.0)
                # sleep remainder after int truncation above.
                sleep(abs(whole_seconds - tts))
这里fun就是self.connect方法,我们看一下connect方法
def connect(self):
    """Establish connection to server immediately.

    Returns:
        The value of the ``connection`` property.
    """
    # Mark the connection as not closed first: the `connection`
    # property only establishes a connection while `_closed` is falsy.
    self._closed = False
    return self.connection
@property
def connection(self):
    """The underlying connection object.

    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.

    Returns:
        The transport connection, establishing it first when the
        connection is not marked closed and not yet connected.
        ``None`` when ``_closed`` is truthy.
    """
    if self._closed:
        return None
    if not self.connected:
        # Reset state tied to the previous connection before creating
        # a new one.
        self.declared_entities.clear()
        self._default_channel = None
        self._connection = self._establish_connection()
        self._closed = False
    return self._connection
def _establish_connection(self):
    """Create the underlying connection through the transport.

    Returns:
        The object returned by ``self.transport.establish_connection()``.
    """
    self._debug('establishing connection...')
    # The transport object does the actual work of opening the connection.
    conn = self.transport.establish_connection()
    self._debug('connection established: %r', self)
    return conn
这三段代码依次调用执行,最后到self.transport.establish_connection() ,这里就要用到transport的建立连接方法,也是最终建立连接的方法,我们一起看一下
def establish_connection(self):
    """Establish connection to the AMQP broker.

    Returns:
        The connection created by ``self.Connection(**opts)``, after
        its ``connect()`` method has been called.
    """
    conninfo = self.client
    # Fill in every connection parameter the client left unset/falsy.
    # dict.items() is called directly, so no compat helper is needed.
    for name, default_value in self.default_connection_params.items():
        if not getattr(conninfo, name, None):
            setattr(conninfo, name, default_value)
    if conninfo.hostname == 'localhost':
        conninfo.hostname = '127.0.0.1'

    # Parameters handed to the connection class; transport_options are
    # merged last, so they override the values listed here.
    opts = {
        'host': conninfo.host,
        'userid': conninfo.userid,
        'password': conninfo.password,
        'login_method': conninfo.login_method,
        'virtual_host': conninfo.virtual_host,
        'insist': conninfo.insist,
        'ssl': conninfo.ssl,
        'connect_timeout': conninfo.connect_timeout,
        'heartbeat': conninfo.heartbeat,
    }
    opts.update(conninfo.transport_options or {})

    # NOTE(review): self.Connection is presumably the amqp package's
    # Connection class -- it is defined outside this excerpt.
    conn = self.Connection(**opts)
    conn.client = self.client
    conn.connect()
    return conn
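以上便是Producer初始化的过程,也详细讲解了建立连接的过程。需要注意的是,revive方法只是把connection.default_channel包装进ChannelPromise(一个lambda),此时并没有调用它;真正沿着上面这条调用链建立连接,要等到第一次访问producer的channel时才会发生(例如下面_publish中的channel = self.channel)。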
下面我们一起看一下消息是如何发布出去的
def publish(self, body, routing_key=None, delivery_mode=None,
            mandatory=False, immediate=False, priority=0,
            content_type=None, content_encoding=None, serializer=None,
            headers=None, compression=None, exchange=None, retry=False,
            retry_policy=None, declare=None, expiration=None,
            **properties):
    """Prepare a message and hand it to ``_publish``.

    Arguments:
        body: Message body, passed through ``self._prepare``.
        routing_key: Routing key; ``None`` selects ``self.routing_key``.
        delivery_mode: Passed to ``self._delivery_details``.
        mandatory, immediate, priority: Forwarded to ``_publish``.
        content_type, content_encoding, serializer: Passed to
            ``self._prepare``.
        headers (dict): Message headers; ``None`` means an empty dict.
        compression: ``None`` selects ``self.compression``.
        exchange: Exchange to publish to; falsy selects ``self.exchange``.
        retry (bool): When true, ``_publish`` is wrapped with
            ``self.connection.ensure`` using ``retry_policy``.
        retry_policy (dict): Keyword arguments for ``ensure``.
        declare (list): Entities forwarded to ``_publish`` for
            declaration. The caller's list is not modified.
        expiration (float): Expiry in seconds; stored in the properties
            as a string holding whole milliseconds.
        **properties: Additional message properties.

    Returns:
        The return value of ``_publish``.
    """
    _publish = self._publish

    # Copy, so that appending the default exchange below does not
    # mutate the list object owned by the caller.
    declare = [] if declare is None else list(declare)
    headers = {} if headers is None else headers
    retry_policy = {} if retry_policy is None else retry_policy
    routing_key = self.routing_key if routing_key is None else routing_key
    compression = self.compression if compression is None else compression

    exchange_name, properties['delivery_mode'] = self._delivery_details(
        exchange or self.exchange, delivery_mode,
    )

    if expiration is not None:
        properties['expiration'] = str(int(expiration * 1000))

    # Prepare the body (serializer and compression are passed along).
    body, content_type, content_encoding = self._prepare(
        body, serializer, content_type, content_encoding,
        compression, headers)

    if self.auto_declare and self.exchange.name:
        if self.exchange not in declare:
            # XXX declare should be a Set.
            declare.append(self.exchange)

    if retry:
        # Wrap the publish call using the given retry policy.
        _publish = self.connection.ensure(self, _publish, **retry_policy)
    return _publish(
        body, priority, content_type, content_encoding,
        headers, properties, routing_key, mandatory, immediate,
        exchange_name, declare,
    )
这里只是进行了准备数据的工作,主要是最后调用了_publish方法,我们一起看一下
# NOTE(review): this listing repeats publish() from the previous listing;
# the method the surrounding text goes on to discuss is _publish().
def publish(self, body, routing_key=None, delivery_mode=None,
            mandatory=False, immediate=False, priority=0,
            content_type=None, content_encoding=None, serializer=None,
            headers=None, compression=None, exchange=None, retry=False,
            retry_policy=None, declare=None, expiration=None,
            **properties):
    """Prepare a message and hand it to ``_publish``.

    Arguments:
        body: Message body, passed through ``self._prepare``.
        routing_key: Routing key; ``None`` selects ``self.routing_key``.
        delivery_mode: Passed to ``self._delivery_details``.
        mandatory, immediate, priority: Forwarded to ``_publish``.
        content_type, content_encoding, serializer: Passed to
            ``self._prepare``.
        headers (dict): Message headers; ``None`` means an empty dict.
        compression: ``None`` selects ``self.compression``.
        exchange: Exchange to publish to; falsy selects ``self.exchange``.
        retry (bool): When true, ``_publish`` is wrapped with
            ``self.connection.ensure`` using ``retry_policy``.
        retry_policy (dict): Keyword arguments for ``ensure``.
        declare (list): Entities forwarded to ``_publish`` for
            declaration. The caller's list is not modified.
        expiration (float): Expiry in seconds; stored in the properties
            as a string holding whole milliseconds.
        **properties: Additional message properties.

    Returns:
        The return value of ``_publish``.
    """
    _publish = self._publish

    # Copy, so that appending the default exchange below does not
    # mutate the list object owned by the caller.
    declare = [] if declare is None else list(declare)
    headers = {} if headers is None else headers
    retry_policy = {} if retry_policy is None else retry_policy
    routing_key = self.routing_key if routing_key is None else routing_key
    compression = self.compression if compression is None else compression

    exchange_name, properties['delivery_mode'] = self._delivery_details(
        exchange or self.exchange, delivery_mode,
    )

    if expiration is not None:
        properties['expiration'] = str(int(expiration * 1000))

    # Prepare the body (serializer and compression are passed along).
    body, content_type, content_encoding = self._prepare(
        body, serializer, content_type, content_encoding,
        compression, headers)

    if self.auto_declare and self.exchange.name:
        if self.exchange not in declare:
            # XXX declare should be a Set.
            declare.append(self.exchange)

    if retry:
        # Wrap the publish call using the given retry policy.
        _publish = self.connection.ensure(self, _publish, **retry_policy)
    return _publish(
        body, priority, content_type, content_encoding,
        headers, properties, routing_key, mandatory, immediate,
        exchange_name, declare,
    )
def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    """Build the message, declare entities and publish via the channel.

    Arguments:
        body, priority, content_type, content_encoding, headers,
        properties: Passed to ``channel.prepare_message``.
        routing_key, mandatory, immediate: Passed to
            ``channel.basic_publish``.
        exchange: Exchange name passed to ``channel.basic_publish``.
        declare: Iterable of entities; each one is passed to
            ``self.maybe_declare`` before publishing.

    Returns:
        The return value of ``channel.basic_publish``.
    """
    channel = self.channel
    # Wrap the prepared body and metadata into the channel's message form.
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        # Plain loop: the calls are made for their side effect only.
        maybe_declare = self.maybe_declare
        for entity in declare:
            maybe_declare(entity)

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )
最终还是要使用到channel进行消息的发布,我们来看一下basic_publish方法
def basic_publish(self, message, exchange, routing_key, **kwargs):
    """Publish message.

    Arguments:
        message: The message to publish; augmented in place first.
        exchange: Exchange name. When falsy the message is put directly
            on the queue named by ``routing_key``.
        routing_key: Routing key, or the destination queue name for
            the anonymous exchange.
        **kwargs: Forwarded to ``deliver`` / ``_put``.

    Returns:
        The result of the exchange type's ``deliver`` when an exchange
        is given, otherwise the result of ``self._put``.
    """
    self._inplace_augment_message(message, exchange, routing_key)
    if not exchange:
        # anon exchange: routing_key is the destination queue
        return self._put(routing_key, message, **kwargs)
    # The exchange-type object for this exchange performs the delivery.
    exchange_type = self.typeof(exchange)
    return exchange_type.deliver(message, exchange, routing_key, **kwargs)
def deliver(self, message, exchange, routing_key, **kwargs):
    """Put ``message`` on every queue looked up for the routing key.

    Arguments:
        message: The message to deliver.
        exchange: Exchange name used for the lookup.
        routing_key: Routing key used for the lookup.
        **kwargs: Forwarded to the channel's ``_put``.
    """
    _lookup = self.channel._lookup
    _put = self.channel._put
    deadletter = self.channel.deadletter_queue
    # Resolve the target queues first; falsy entries and the channel's
    # dead-letter queue are skipped.
    targets = [queue for queue in _lookup(exchange, routing_key)
               if queue and queue != deadletter]
    for queue in targets:
        _put(queue, message, **kwargs)
def _put(self, queue, message, **kwargs):
    """Deliver message.

    Arguments:
        queue: Name of the destination queue.
        message: The message; serialized with ``dumps`` before pushing.
        **kwargs: Accepted but not used by this method.
    """
    pri = self._get_message_priority(message, reverse=False)
    # NOTE(review): `client` is presumably a Redis client (it is used
    # via lpush) -- the transport class is outside this excerpt.
    with self.conn_or_acquire() as client:
        # The target key is derived from the queue name and priority.
        client.lpush(self._q_for_pri(queue, pri), dumps(message))
这一系列代码就是消息发布的内容,其中用到了几个重要的模块
Producer 消息压缩,将消息内容封装为Message类
Exchange 获取需要发送的队列信息
Channel 实际消息发送
总结:
本篇主要讲解了连接过程和消息发布的过程
Connection,Producer初始化时只是用ChannelPromise包装了connection.default_channel;当第一次使用channel时,才真正调用self.connect方法,到预定的Transport类中连接载体,并初始化Channel
Producer 初始化过程完成了连接用的内容的准备
Message 封装消息,这里没有细讲,有兴趣可以看一下,并不复杂
Exchange 将routing_key转为queue,后面的exchange篇会细讲
Channel 最终消息发布