当前位置: 首页 > 工具软件 > Kombu > 使用案例 >

Python消息库——kombu的简单使用

阎修明
2023-12-01

相比于pika,kombu考虑的更全面,如它支持重连策略、支持connection pool和producer pool、故障转移策略等,于是我选择了kombu作为RabbitMQ的client端。
kombu文档

连接池(connection pools)

1、创建连接池

翻译自kombu文档:通过kombu.pools.connections可以得到一个连接池,传入一个连接实例,kombu.pools.connections会返回一个连接池实例。如果创建连接实例所传入的参数是一样的,如Connection(‘redis://localhost:6379’),则会返回相同的l连接池,代码如下:

>>> from kombu import Connection
>>> from kombu.pools import connections

>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>

2、从连接池获取和释放连接

from kombu import Connection
from kombu.pools import connections

connection = Connection('redis://localhost:6379')
"""
block=True意味着如果连接池中的连接都被占用,则会阻塞。注意如果你的代码中没有正确地释放
连接池的连接,则会造成死锁;为了防止该事情发生,可以设置timeout 参数,具体查看
kombu.connection.Resource.acquire()的使用
"""
with connections[connection].acquire(block=True) as conn:
	print('Got connection: {0!r}'.format(conn.as_uri()))

还有更多的用法,例如连接多个消息中间件、producer pool的使用、自定义pool groups等,请查看kombu文档

生产者

可以通过Connection的实例来创建生产者对象:

>>> producer = connection.Producer()

又或者通过Producer来实例化一个生产者对象,但需要传入一个channel或connection对象:

>>> from kombu import Connection, Producer

>>> with Connection('amqp://') as conn:
...     with conn.channel() as channel:
...          producer = Producer(channel)

获得生产者对象后,就可以发布消息到连接的RabbitMQ队列了:

>>> from kombu import Exchange, Queue
>>> # 实例一个队列对象,默认durable=True
>>> task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
>>> producer.publish(
...     {'hello': 'world'},
...		exchange=task_queue.exchange,
...		routing_key=task_queue.routing_key,
...     declare=[task_queue],  	 # declares exchange, queue and binds before sending message
...     retry=True,	
...     retry_policy={			 # 重试策略
...         'interval_start': 0, # First retry immediately,
...         'interval_step': 2,  # then increase by 2s for every retry.
...         'interval_max': 30,  # but don't exceed 30s between retries.
...         'max_retries': 30,   # give up after 30 tries.
...     },
... )

publish方法更多的参数,请查看kombu文档

消费者

通过Consumer来接收消息,需要传入connection实例对象,需要接收消息的队列(或列表),处理收到消息的回调函数列表,接收消息的格式,代码如下:

from kombu import Queue, Exchange, Connection

connection = Connection('redis://localhost:6379')
queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')

# 必须接收body和message两个参数
def process_message(body, message):
	print(body)
	print("调用grpc服务")
	# 返回acknowledge,消费者已经处理了该消息
	message.ack()

with Consumer(connection, queues=queue, callbacks=[process_message], accept=['json']):
	while True:
	    print("Start receiving tasks")
	    self.conn.drain_events()

或者用ConsumerMixin类来定义消费者:

from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues, callbacks=[self.on_message], accept=['json']),
        ]

    def on_message(self, body, message):
        print('RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()

C(connection).run()

更多用法请查看kombu文档

 类似资料: