python 连接操作rabbitMQ 主要是使用pika库
安装:
pip install pika==1.0.1
注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本使用,避免踩坑
Pika是用于Python的RabbitMQ(AMQP 0-9-1)客户端库
注: 官方对于pika有如下介绍:
Since threads aren’t appropriate to every situation, it doesn’t require threads.
Pika core takes care not to forbid them, either.
The same goes for greenlets, callbacks, continuations, and generators.
An instance of Pika’s built-in connection adapters isn’t thread-safe, however.
线程并不适用于每种场景, 因此并不要求使用线程。 但是pika并不禁用线程, 对于
greenlets, callbacks也一样。 一个pika建立的连接并不是线程安全的
因此在多线程中共享一个pika连接不是线程安全的, 当然也有一种使用:
with one exception: you may call the connection method add_callback_threadsafe from
another thread to schedule a callback within an active pika connection.
使用add_callback_threadsafe方法callback 一个pika连接从另外一个线程中
pika提供建立连接方式:
例子:
一. 最经典的’hello world’
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
channel.basic_publish(exchange='',
routing_key='TEST01',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
[x] Sent 'Hello World!'
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(on_message_callback=callback,
queue='TEST01',
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
消费者其他例子, 消费10次退出:
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
for method_frame, properties, body in channel.consume('TEST01'):
#显示消息部分并确认消息
print method_frame, properties, body
channel.basic_ack(method_frame.delivery_tag)
#在10条消息后退出循环
if method_frame.delivery_tag == 10:
break
#取消消费者并返回任何待处理消息
requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()
<Basic.Deliver(['consumer_tag=ctag1.26348636b92c4be8951b8ad51145a53a', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> <BasicProperties> Hello World!
.....
<Basic.Deliver(['consumer_tag=ctag1.26348636b92c4be8951b8ad51145a53a', 'delivery_tag=10', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> <BasicProperties> Hello World!
Requeued 0 messages
delivery_tag 从1 到10, 等于10的时候退出并关闭通道
二. 集群连接方式
rabbitmq提供cluster集群功能,对于集群的连接与单节点连接稍微有点不一样, 如果集群依然只是用单节点的连接方式,则pika只连接到一个节点,但节点宕机, 服务异常等服务不可用时无法实现故障转移功能。因此需要配置多个节点,使得节点异常时,能够在另外一个node上重连,并继续提供服务
为了实现重试功能需设置connection_attempts
和 retry_delay
, 重试发生在在所有给定的节点都连接失败后
import pika
parameters = (
pika.ConnectionParameters(host='rabbitmq.node1.com'),
pika.ConnectionParameters(host='rabbitmq.node2.com',
connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)
对于非阻塞适配器(非BlockingConnection)(如pika.selectconnection和pika.adapter s.asyncio_connection.asynciioconnection),可以通过连接适配器的create_connection()类方法使用多个连接参数实例请求连接。
三. 另外一个线程处理及确认
一个单线程的pika处理程序,当遇到一个需要很长执行时间的请求时,可能造成阻塞,其他使用者将会超时,一种解决方式是,一个线程接收到消息后,将其委托给另外一个线程进行处理, 而该线程继续处理其他请求, 及非阻塞
因为是多线程非阻塞,那么另外一个消息的处理结果可能无法通过主线程直接处理,解决办法是可以使用回调来实现
定义一个函数:
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
pass
在另外一个线程中,则连接适配器adapter需要加入这个ack_message回调处理函数
pika.BlockingConnection 是使用pika.BlockingConnection.add_callback_threadsafe() 添加的
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
完整的消费端程序为:
#!/usr/bin/env python
# coding=utf-8
import pika
import functools
import threading
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
def ack_message(channel, delivery_tag, body):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
print method_frame, body
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
pass
for method_frame, properties, body in channel.consume('TEST01'):
#显示消息部分并确认消息
connection.add_callback_threadsafe(functools.partial(ack_message, channel, method_frame.delivery_tag, body))
if method_frame.delivery_tag == 10:
break
requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()
输出:
<Basic.Deliver(['consumer_tag=ctag1.ac8618875c104dbdae353295b29fef39', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> Hello World!
...
<Basic.Deliver(['consumer_tag=ctag1.ac8618875c104dbdae353295b29fef39', 'delivery_tag=9', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> Hello World!
Requeued 0 messages
pika.SelectConnection 使用add_callback_threadsafe()方法
pika.adapters.tornado_connection.TornadoConnection 使用add_callback()
pika.adapters.asyncio_connection.AsyncioConnection
使用call_soon_threadsafe()
四. 重连
对于Bunny, Java, .NET, Objective-C, Swift 的rabbitmq客户端拥有自动重连机制, 但是对于python 客户端 目前还没有提供自动重连机制,需要自行实现
import pika
while True:
try:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
channel.start_consuming()
# Don't recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
这种方式简单,但不够优雅, 因为异常后,会不停地进行重试。
from retry import retry
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
try:
channel.start_consuming()
# Don't recover connections closed by server
except pika.exceptions.ConnectionClosedByBroker:
pass
consume()
五. 扩展功能
扩展支持其他IO框架需要遵循下面方法:
继承pika.BaseConnection
,实现其抽象方法并将其构造函数传递给pika.adapters.utils.nbio_interface.AbstractIOService
pika.BaseConnection
实现了pika.connection.Connection
的抽象方法,包括内部启动的连接逻辑。可以参考pika.adapters.asyncio_connection.AsyncioConnection
和pika.adapters.tornado_connection.TornadoConnection
的实现
继承pika.connection.Connection
并实现其抽象方法