本文地址:http://blog.csdn.net/spch2008/article/details/11571491
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer
from kombu.connection import Connection
connection = Connection('amqp://guest:bupt@172.16.4.1:5672//')
channel = connection.channel()
news_exchange = Exchange('news', type='topic')
science_news = Queue('science_news', exchange=news_exchange,routing_key='news')
bound_science_news = science_news(channel)
bound_science_news.declare()
第11行声明一个队列,仅仅是描述名字与配置,不能进行操作。
第12行将队列与channel进行绑定,与Exchange绑定类似,内部记住channel,然后复制一份返回。
第13行在server上创建一个队列。
class kombu.Queue(name='', exchange=None, routing_key='', channel=None, bindings=None, on_declared=None, **kwargs)
Parameters:
name – 队列名字
exchange – 绑定的exchange
routing_key – 路由,exchange根据routing_key来路由消息
channel – 绑定的channel
durable – Server重启,queue是否消失
exclusive – 队列专有,属于当前连接(connection)
auto_delete – 所有的consumer使用完后,自动删除。如果exclusive被设定,
则auto_delete默认被设定为True.
queue_declare(self, nowait=False, passive=False) server上创建队列
nowait True不等待server的响应信息
passive True server不创建队列,可以用来检测当前server是否存在该队列
queue_bind(self, nowait=False) queue与Exchange绑定
declare(self, nowait=False) 创建队列,并与Exchange绑定,内部调用queue_declare,queue_bind
purge(self, nowait=False) 删除队列中所有信息
delete(self, if_unused=False, if_empty=False, nowait=False) 删除队列
if_unused 队列没被使用时删除
if_empty 队列空时删除
consume(consumer_tag='', callback=None, no_ack=None, nowait=False)
文档上说该函数“Start a queue consumer.”,这里的queue consumer不同于类Consumer。我认为:就是注册一个回调函数并进行相应配置。
而Consumer类中的consume也是调用Queue中的consume:
def consume(self, no_ack=None):
self._basic_consume(T, no_ack=no_ack, nowait=False)
def _basic_consume(self, queue, consumer_tag=None,
no_ack=no_ack, nowait=True):
queue.consume(tag, self._receive_callback,
no_ack=no_ack, nowait=nowait)
return tag
_receive_callback对用户提供的callback进行了封装。