当前位置: 首页 > 工具软件 > python pika > 使用案例 >

python使用pika操作rabbitmq总结(一)

夏侯承恩
2023-12-01

python 连接操作rabbitMQ 主要是使用pika库

安装:

pip install pika==1.0.1

注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本使用,避免踩坑

Pika是用于Python的RabbitMQ(AMQP 0-9-1)客户端库

注: 官方对于pika有如下介绍:

Since threads aren’t appropriate to every situation, it doesn’t require threads. 
Pika core takes care not to forbid them, either. 
The same goes for greenlets, callbacks, continuations, and generators.
 An instance of Pika’s built-in  connection adapters isn’t thread-safe, however.

线程并不适用于每种场景, 因此并不要求使用线程。 但是pika并不禁用线程, 对于

greenlets, callbacks也一样。 一个pika建立的连接并不是线程安全的

因此在多线程中共享一个pika连接不是线程安全的, 当然也有一种使用:

with one exception: you may call the connection method add_callback_threadsafe from
 another thread to schedule a callback within an active pika connection.

使用add_callback_threadsafe方法callback 一个pika连接从另外一个线程中

pika提供建立连接方式:

  1. pika.adapters.asyncio_connection.AsyncioConnection - 用于python 3 AsyncIO的I/O异步模式
  2. pika.BlockingConnection - 同步模式, 简单易用
  3. pika.SelectConnection - 没有第三方依赖包的异步模式
  4. pika.adapters.tornado_connection.TornadoConnection - 基于Tornado 的异步IO请求模式
  5. pika.adapters.twisted_connection.TwistedProtocolConnection - 基于Twisted’的异步IO请求模式

例子:

一. 最经典的’hello world’

  1. 生产者:
#!/usr/bin/env python
import pika

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')

channel.basic_publish(exchange='',
                      routing_key='TEST01',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
 [x] Sent 'Hello World!'
  1. 消费者:
#!/usr/bin/env python
import pika

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(on_message_callback=callback,
                      queue='TEST01',
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

消费者其他例子, 消费10次退出:

#!/usr/bin/env python
import pika

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')

for method_frame, properties, body in channel.consume('TEST01'):
    #显示消息部分并确认消息
    print method_frame, properties, body
    channel.basic_ack(method_frame.delivery_tag)
    #在10条消息后退出循环
    if method_frame.delivery_tag == 10:
        break
#取消消费者并返回任何待处理消息
requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()
<Basic.Deliver(['consumer_tag=ctag1.26348636b92c4be8951b8ad51145a53a', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> <BasicProperties> Hello World!
.....
<Basic.Deliver(['consumer_tag=ctag1.26348636b92c4be8951b8ad51145a53a', 'delivery_tag=10', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> <BasicProperties> Hello World!
Requeued 0 messages

delivery_tag 从1 到10, 等于10的时候退出并关闭通道

二. 集群连接方式

rabbitmq提供cluster集群功能,对于集群的连接与单节点连接稍微有点不一样, 如果集群依然只是用单节点的连接方式,则pika只连接到一个节点,但节点宕机, 服务异常等服务不可用时无法实现故障转移功能。因此需要配置多个节点,使得节点异常时,能够在另外一个node上重连,并继续提供服务

为了实现重试功能需设置connection_attemptsretry_delay, 重试发生在在所有给定的节点都连接失败后

import pika

parameters = (
    pika.ConnectionParameters(host='rabbitmq.node1.com'),
    pika.ConnectionParameters(host='rabbitmq.node2.com',
                              connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)

对于非阻塞适配器(非BlockingConnection)(如pika.selectconnection和pika.adapter s.asyncio_connection.asynciioconnection),可以通过连接适配器的create_connection()类方法使用多个连接参数实例请求连接。

三. 另外一个线程处理及确认

一个单线程的pika处理程序,当遇到一个需要很长执行时间的请求时,可能造成阻塞,其他使用者将会超时,一种解决方式是,一个线程接收到消息后,将其委托给另外一个线程进行处理, 而该线程继续处理其他请求, 及非阻塞

因为是多线程非阻塞,那么另外一个消息的处理结果可能无法通过主线程直接处理,解决办法是可以使用回调来实现

定义一个函数:

def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same Pika channel instance via which
    the message being acknowledged was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't acknowledge this message;
        # log and/or do something that makes sense for your app in this case.
        pass

在另外一个线程中,则连接适配器adapter需要加入这个ack_message回调处理函数

pika.BlockingConnection 是使用pika.BlockingConnection.add_callback_threadsafe() 添加的

connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))

完整的消费端程序为:

#!/usr/bin/env python
# coding=utf-8
import pika
import functools
import threading

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')


def ack_message(channel, delivery_tag, body):
    """Note that `channel` must be the same Pika channel instance via which
    the message being acknowledged was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        print method_frame, body
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't acknowledge this message;
        # log and/or do something that makes sense for your app in this case.
        pass

for method_frame, properties, body in channel.consume('TEST01'):
    #显示消息部分并确认消息
    connection.add_callback_threadsafe(functools.partial(ack_message, channel, method_frame.delivery_tag, body))
    
    if method_frame.delivery_tag == 10:
        break

requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()

输出:

<Basic.Deliver(['consumer_tag=ctag1.ac8618875c104dbdae353295b29fef39', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> Hello World!
...
<Basic.Deliver(['consumer_tag=ctag1.ac8618875c104dbdae353295b29fef39', 'delivery_tag=9', 'exchange=', 'redelivered=False', 'routing_key=TEST01'])> Hello World!
Requeued 0 messages

pika.SelectConnection 使用add_callback_threadsafe()方法

pika.adapters.tornado_connection.TornadoConnection 使用add_callback()

pika.adapters.asyncio_connection.AsyncioConnection

使用call_soon_threadsafe()

四. 重连

对于Bunny, Java, .NET, Objective-C, Swift 的rabbitmq客户端拥有自动重连机制, 但是对于python 客户端 目前还没有提供自动重连机制,需要自行实现

  1. while实现:
import pika

while True:
    try:
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)
        channel.start_consuming()
    # Don't recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Don't recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue

这种方式简单,但不够优雅, 因为异常后,会不停地进行重试。

  1. retry实现
from retry import retry

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)

    try:
        channel.start_consuming()
    # Don't recover connections closed by server
    except pika.exceptions.ConnectionClosedByBroker:
        pass

consume()

五. 扩展功能

扩展支持其他IO框架需要遵循下面方法:

  1. 通过继承pika.BaseConnection,实现其抽象方法并将其构造函数传递给pika.adapters.utils.nbio_interface.AbstractIOService

pika.BaseConnection实现了pika.connection.Connection的抽象方法,包括内部启动的连接逻辑。可以参考pika.adapters.asyncio_connection.AsyncioConnectionpika.adapters.tornado_connection.TornadoConnection的实现

  1. 通过继承pika.connection.Connection并实现其抽象方法
 类似资料: