目的:了解Kombu基本用法
例子:producer发送“Hello kombu”,consumer打印收到的信息。
== send.py ==
== recv.py ==
Notes:
When producers and consumers connects to the broker using a TCP socket after authenticating the connection they establish a channel where AMQP commands are sent. The channel is a virtual path inside a TCP connection between this is very useful because there can be multiple channels inside the TCP connection each channels is identified using an unique ID.
exchange:
发送方需要发送到某个exchange,所以一定要创建exchange。
接收方因为要绑定/注册queue到exchange上,也得知道exchange,所以也要创建exchange
== send.py ==
from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Producer
conn = Connection(hostname='localhost', userid='guest', password='guest')
# 只创建了对象,没有和mq-server真正连接
# In [4]: conn = Connection(hostname='localhost', userid='guest', password='guest')
# In [5]: conn.connected
# Out[5]: False
ch = conn.channel()
# 创建channel之后,才真正连接到了server上
# In [6]: ch = conn.channel()
# In [7]: conn.connected
# Out[7]: True
# 会调用self.transport.create_channel(self.connection),默认情况下:
# ipdb> self.transport.create_channel
# <bound method Transport.create_channel of <kombu.transport.pyamqp.Transport object at 0x7f984b856310>>
# 最终会调用/usr/local/lib/python2.7/dist-packages/amqp/connection.py的Connection类
#direct_exc = Exchange('my_exechange3', 'direct') # where is channel
direct_exc = Exchange('my_exechange4', 'direct', ch)
# 这一步只是创建Exchange对象,还没有在broker上创建exchange。可以调用direct_exc.declare()来创建。看看下面哪一步真正创建了exchange?
# 创建Exchange对象的时候可以不指定channel,但是这样没有意义,没法调用declare()
#my_q = Queue('my_queue3', exchange=direct_exc, routing_key='my_key')
#my_q = Queue('my_queue4', exchange=direct_exc, routing_key='my_key', channel=ch)
# 发送方可以不用创建Queue
# channel也是要指定的
# 这里也只是创建了一个对象,没有真正创建queue,可以调用my_q.declare()创建
#p = Producer(conn, exchange=direct_exc)
p = Producer(ch, exchange=direct_exc, routing_key='my_key')
#p.publish("Hello kombu", routing_key='my_key')
# 用channel而不是connection,channel和exchange对象必须存在。
# 这一步中真正创建了exchange:Producer类有个属性auto_declare默认为True,初始化时调用self.revive(self, channel)会最终调用self.exchange.declare()
#p.declare() # 实际上是declare exchange, self.revive也是调用的它。这里没必要调用。
p.publish("Hello kombu")
# publish的可选参数很多,可以指定exchange, routing_key, serializer等
== recv.py ==
from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer
conn = Connection(hostname='localhost', userid='guest', password='guest')
my_ch = conn.channel()
direct_exc = Exchange('my_exechange4', 'direct', my_ch)
my_q = Queue('my_queue4', exchange=direct_exc, channel=my_ch, routing_key='my_key')
# my_q.declare()
# 只创建了Queue对象,没有真正的queue
def print_msg(body, msg):
print body
msg.ack()
# callbacks必须有两个参数(body, message)
#: List of callbacks called in order when a message is received.
#:
#: The signature of the callbacks must take two arguments:
#: `(body, message)`, which is the decoded message body and
#: the `Message` instance (a subclass of
#: :class:`~kombu.transport.base.Message`).
c = Consumer(my_ch, queues=[my_q], callbacks=[print_msg])
# queues是一个list,指定了这个consumer要从哪些queue取消息。和Producer创建exchange类似,如果queue不存在,会调用self.revive(self.channel)创建queue,最终调用的还是self.declare()
# callback是一个list
c.consume()
# 向server注册,表明现在可以接收信息啦。server可以向该consumer发送消息
while True:
conn.drain_events()
# drain_events参考http://blog.csdn.net/spch2008/article/details/11530007
c.cancel()
http://blog.csdn.net/spch2008/article/details/11530007
http://blog.csdn.net/hackerain/article/details/7875614