当前位置: 首页 > 工具软件 > Kombu > 使用案例 >

Kombu小练习

上官迪
2023-12-01

目的:了解Kombu基本用法

例子:producer发送“Hello kombu”,consumer打印收到的信息。

基本思路

== send.py ==

  1. create a connection
  2. create a channel
  3. create a exchange with the channel
  4. (optional) create a queue with the exchange
  5. publish a msg to the exchange

== recv.py ==

  1. create a connection
  2. create a channel
  3. create a exchange with the channel
  4. create a queue with the exchange
  5. create a callback function
  6. create a consumer and start consume
  7. drain_events from that connection

Notes:

  • 双方都要创建connection和channel。特别是channel,一定需要。
    所谓channel:

    When producers and consumers connects to the broker using a TCP socket after authenticating the connection they establish a channel where AMQP commands are sent. The channel is a virtual path inside a TCP connection between this is very useful because there can be multiple channels inside the TCP connection each channels is identified using an unique ID.

  • exchange:

    • 发送方需要发送到某个exchange,所以一定要创建exchange。

    • 接收方因为要绑定/注册queue到exchange上,也得知道exchange,所以也要创建exchange

    • exchange的name属性也很重要,mq-sever不允许重复创建name相同的exchange
    • exchange实际上创建在哪儿?比如mq-server在server1上,producer在server2上,consumer在server3上。 == 实际上exchange和queue都在server1上
  • queue
    • producer可以不知道queue,但是rabbitmq best practice都推荐producer也创建queue

实现

== send.py ==

from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Producer

conn = Connection(hostname='localhost', userid='guest', password='guest')
# 只创建了对象,没有和mq-server真正连接
# In [4]: conn = Connection(hostname='localhost', userid='guest', password='guest')
# In [5]: conn.connected
# Out[5]: False
ch = conn.channel()
# 创建channel之后,才真正连接到了server上
# In [6]: ch = conn.channel()
# In [7]: conn.connected
# Out[7]: True

# 会调用self.transport.create_channel(self.connection),默认情况下:
# ipdb> self.transport.create_channel
# <bound method Transport.create_channel of <kombu.transport.pyamqp.Transport object at 0x7f984b856310>>
# 最终会调用/usr/local/lib/python2.7/dist-packages/amqp/connection.py的Connection类

#direct_exc = Exchange('my_exechange3', 'direct')       # where is channel
direct_exc = Exchange('my_exechange4', 'direct', ch)
# 这一步只是创建Exchange对象,还没有在broker上创建exchange。可以调用direct_exc.declare()来创建。看看下面哪一步真正创建了exchange?
# 创建Exchange对象的时候可以不指定channel,但是这样没有意义,没法调用declare()

#my_q = Queue('my_queue3', exchange=direct_exc, routing_key='my_key')
#my_q = Queue('my_queue4', exchange=direct_exc, routing_key='my_key', channel=ch)
# 发送方可以不用创建Queue
# channel也是要指定的
# 这里也只是创建了一个对象,没有真正创建queue,可以调用my_q.declare()创建

#p = Producer(conn, exchange=direct_exc)
p = Producer(ch, exchange=direct_exc, routing_key='my_key')
#p.publish("Hello kombu", routing_key='my_key')
# 用channel而不是connection,channel和exchange对象必须存在。
# 这一步中真正创建了exchange:Producer类有个属性auto_declare默认为True,初始化时调用self.revive(self, channel)会最终调用self.exchange.declare()

#p.declare()                  # 实际上是declare exchange, self.revive也是调用的它。这里没必要调用。
p.publish("Hello kombu")
# publish的可选参数很多,可以指定exchange, routing_key, serializer等

== recv.py ==

from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer

conn = Connection(hostname='localhost', userid='guest', password='guest')
my_ch = conn.channel()

direct_exc = Exchange('my_exechange4', 'direct', my_ch)

my_q = Queue('my_queue4', exchange=direct_exc, channel=my_ch, routing_key='my_key')
# my_q.declare()
# 只创建了Queue对象,没有真正的queue

def print_msg(body, msg):
    print body
    msg.ack()
# callbacks必须有两个参数(body, message)
    #: List of callbacks called in order when a message is received.
    #:
    #: The signature of the callbacks must take two arguments:
    #: `(body, message)`, which is the decoded message body and
    #: the `Message` instance (a subclass of
    #: :class:`~kombu.transport.base.Message`).

c = Consumer(my_ch, queues=[my_q], callbacks=[print_msg])
# queues是一个list,指定了这个consumer要从哪些queue取消息。和Producer创建exchange类似,如果queue不存在,会调用self.revive(self.channel)创建queue,最终调用的还是self.declare()
# callback是一个list

c.consume()
# 向server注册,表明现在可以接收信息啦。server可以向该consumer发送消息

while True:
    conn.drain_events()
# drain_events参考http://blog.csdn.net/spch2008/article/details/11530007

c.cancel()

参考

http://blog.csdn.net/spch2008/article/details/11530007
http://blog.csdn.net/hackerain/article/details/7875614

 类似资料: