相比于pika,kombu考虑的更全面,如它支持重连策略、支持connection pool和producer pool、故障转移策略等,于是我选择了kombu作为RabbitMQ的client端。
kombu文档
翻译自kombu文档:通过kombu.pools.connections可以得到一个连接池,传入一个连接实例,kombu.pools.connections会返回一个连接池实例。如果创建连接实例所传入的参数是一样的,如Connection(‘redis://localhost:6379’),则会返回相同的l连接池,代码如下:
>>> from kombu import Connection
>>> from kombu.pools import connections
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
from kombu import Connection
from kombu.pools import connections
connection = Connection('redis://localhost:6379')
"""
block=True意味着如果连接池中的连接都被占用,则会阻塞。注意如果你的代码中没有正确地释放
连接池的连接,则会造成死锁;为了防止该事情发生,可以设置timeout 参数,具体查看
kombu.connection.Resource.acquire()的使用
"""
with connections[connection].acquire(block=True) as conn:
print('Got connection: {0!r}'.format(conn.as_uri()))
还有更多的用法,例如连接多个消息中间件、producer pool的使用、自定义pool groups等,请查看kombu文档。
可以通过Connection的实例来创建生产者对象:
>>> producer = connection.Producer()
又或者通过Producer来实例化一个生产者对象,但需要传入一个channel或connection对象:
>>> from kombu import Connection, Producer
>>> with Connection('amqp://') as conn:
... with conn.channel() as channel:
... producer = Producer(channel)
获得生产者对象后,就可以发布消息到连接的RabbitMQ队列了:
>>> from kombu import Exchange, Queue
>>> # 实例一个队列对象,默认durable=True
>>> task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
>>> producer.publish(
... {'hello': 'world'},
... exchange=task_queue.exchange,
... routing_key=task_queue.routing_key,
... declare=[task_queue], # declares exchange, queue and binds before sending message
... retry=True,
... retry_policy={ # 重试策略
... 'interval_start': 0, # First retry immediately,
... 'interval_step': 2, # then increase by 2s for every retry.
... 'interval_max': 30, # but don't exceed 30s between retries.
... 'max_retries': 30, # give up after 30 tries.
... },
... )
publish方法更多的参数,请查看kombu文档。
通过Consumer来接收消息,需要传入connection实例对象,需要接收消息的队列(或列表),处理收到消息的回调函数列表,接收消息的格式,代码如下:
from kombu import Queue, Exchange, Connection
connection = Connection('redis://localhost:6379')
queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
# 必须接收body和message两个参数
def process_message(body, message):
print(body)
print("调用grpc服务")
# 返回acknowledge,消费者已经处理了该消息
message.ack()
with Consumer(connection, queues=queue, callbacks=[process_message], accept=['json']):
while True:
print("Start receiving tasks")
self.conn.drain_events()
或者用ConsumerMixin类来定义消费者:
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [
Consumer(queues, callbacks=[self.on_message], accept=['json']),
]
def on_message(self, body, message):
print('RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
C(connection).run()
更多用法请查看kombu文档。