当前位置: 首页 > 工具软件 > python pika > 使用案例 >

python实时连接rabbitmq_rabbitmq之python_pika模块连接MQ使用(五)

令狐唯
2023-12-01

前言

接下来使用python的pika模块连接rabbitmq。

环境搭建

安装pika模块

pip install pika

实例介绍

先从一个最简单的生产者/消费者说起

# send.py

class SenderClient(object):

def __init__(self, username, passwd, host="127.0.0.1", port=5672,

queuename='eeg', exchange='eeg',routing_key='eeg'):

self.__host = host

self.__port = port

self.__username = username

self.__passwd = passwd

self.__queue = queuename

self.__exchange = exchange

self.__rout_key = routing_key

def connect_mq(self):

"""

连接mq

:return:

"""

try:

# 创建一个连接对象

if not hasattr(self, 'connection') or self.connection.is_closed:

# 添加用户名和密码

credentials = pika.PlainCredentials(self.__username, self.__passwd)

# 配置连接参数

parameters = pika.ConnectionParameters(host=self.__host, port=self.__port, credentials=credentials)

self.connection = pika.BlockingConnection(parameters)

except Exception as e:

print(e)

def channel_mq(self):

"""

# 创建一个信道

# 声明队列和交换机和绑定

:return:

"""

self.connect_mq()

if not hasattr(self, 'channel') or self.channel.is_closed:

self.channel = self.connection.channel()

# 声明一个队列,durable参数声明队列持久化

self.channel.queue_declare(queue=self.__queue, durable=True)

self.channel.exchange_declare(exchange=self.__exchange, durable=True)

# 交换机和队列绑定

self.channel.queue_bind(exchange=self.__exchange, queue=self.__queue, routing_key=self.__rout_key)

def open_data(self, filename):

"""

# 打开一个数据文件

:return:

"""

with open(filename, 'r', encoding='utf-8') as f:

data = f.read()

return data

def send_data(self, data:str):

"""

# 发送数据

:param channel:

:return:

"""

self.channel_mq()

# 使用默认交换机投递消息,返回TRUE或False

self.channel.basic_publish(exchange=self.__exchange,

routing_key=self.__rout_key,

body=data,

properties=pika.BasicProperties(delivery_mode=2))

def close_connect(self):

"""

# 关闭tcp连接

:return:

"""

self.connection.close()

def close_channel(self, channel):

"""

# 关闭信道

:param channel:

:return:

"""

if not hasattr(self, 'channel'):

raise ValueError("the object of SenderClient has not attr of channel.")

self.channel.close()

# receiver.py

class ReceiverClient(SenderClient):

"""

接收rabbitmq消息的消费者

"""

def run(self,queuename=None):

"""

从mq中接收消息。

:return:

"""

if not hasattr(self, 'channel') or self.channel.is_closed:

self.channel_mq()

if not queuename: queuename=self.__queue

# 订阅消息

self.channel.basic_consume(self.callback,

queue=queuename,

no_ack=False)

# 循环等待

self.channel.start_consuming()

def callback(self,ch, method, properties, body):

"""

# 接收消息处理函数

:param ch:

:param method:

:param properties:

:param body:

:return:

"""

print('接收成功!')

# 发送确认

ch.basic_ack(delivery_tag=method.delivery_tag)

分析方法

创建一个连接connection

# 添加用户名和密码

credentials = pika.PlainCredentials(self.__username, self.__passwd)

# 配置连接参数

parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials)

# 创建一个连接对象

connection = pika.BlockingConnection(parameters)

pika.PlainCredentials:一个凭据类

# 该类传递的参数

def __init__(self, username, # 用户名

password, # 密码

erase_on_connect=False): # 是否保存凭据在内存中,如果参数是True,那么该类会在连接完成后删除

pika.ConnectionParameters:TCP连接的参数类

def __init__(self,

host=_DEFAULT, # 默认的ip127.0.0.1

port=_DEFAULT, # 端口5672

virtual_host=_DEFAULT, # 默认的虚拟主机/

credentials=_DEFAULT, # 默认的凭据 user:guest passwd:guest

# 以下的参数含义可以在配置文件中找到,一般不需要在这里配置

channel_max=_DEFAULT,

frame_max=_DEFAULT,

heartbeat=_DEFAULT,

ssl=_DEFAULT,

ssl_options=_DEFAULT,

connection_attempts=_DEFAULT,

retry_delay=_DEFAULT,

socket_timeout=_DEFAULT,

locale=_DEFAULT,

backpressure_detection=_DEFAULT,

blocked_connection_timeout=_DEFAULT,

client_properties=_DEFAULT,

tcp_options=_DEFAULT,

**kwargs):

pika.BlockingConnection:创建一个连接类

def __init__(self, parameters=None, _impl_class=None):

# 传递一个参数类就可以了,_impl_class只用于测试

创建一个channel

# 创建一个信道

channel = connection.channel()

def channel(self, channel_number=None): # 指定通道的编号,一般让rabbitmq自动去管理

声明一个队列queue

def queue_declare(self, queue='', # 队列的名字,默认为空,此时将自动创建一个名字,

passive=False, # 检查一下队列是否存在,如果该参数为True,该方法判断队列存在否,不会声明队列;存在返回queue的状态;

durable=False, # 队列持久化参数,默认不持久化

exclusive=False, # 设置独享队列,该队列只被当前的connection使用,如果该tcp关闭了,队列会被删除

auto_delete=False,# 当最后一个消费者退订后自动删除,默认不开启

arguments=None) # 一个字典,用于队列传递额外的参数

声明一个交换机exchange

channel.exchange_declare

def exchange_declare(self, exchange=None, # 交换机的名字,为空则自动创建一个名字

exchange_type='direct', # 默认交换机类型为direct

passive=False, # 检查交换机是否存在,存在返回状态信息,不存在返回404错误

durable=False, # 设置是否持久化

auto_delete=False, # 最后一个队列解绑则删除

internal=False, # 是否设置为值接收从其他交换机发送过来的消息,不接收生产者的消息

arguments=None): # 一个字典,用于传递额外的参数

声明一个绑定

channel.queue_bind:队列和交换机绑定

def queue_bind(self, queue, # 队列的名字

exchange, # 交换机的名字

routing_key=None, # 路由键规则,当为None时,默认使用queue的名字作为路由键规则

arguments=None): # 一个字典,传递额外的参数

# 返回绑定的状态信息

channel.exchange_bind:交换机之间的绑定

def exchange_bind(self, destination=None, # 目的交换机的名字

source=None, # 源交换机的名字

routing_key='', # 路由键规则,当为None时,默认使用queue的名字作为路由键规则

arguments=None): # 一个字典,传递额外的参数

投递一条消息

channel.basic_publish

def basic_publish(self, exchange, # 交换机的名字

routing_key, # 路由键,吐过交换机是扇形交换机,可以随便写

body, # 消息主体

properties=None, # 消息的属性

mandatory=False, # 是否设置消息托管

immediate=False) # 是否消息实时同步确认,一般和confirm模式配合使用

# properties属性有一个专门的类来设置

pika.BasicProperties:

def __init__(self, content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None):

self.content_type = content_type # 消息的类型,如text/html,json等

self.content_encoding = content_encoding # 消息的编码,如gbk,utf-8等

self.headers = headers # 消息头,可以和头交换机约定规则

self.delivery_mode = delivery_mode # 消息持久化,2表示持久化,

self.priority = priority # 消息的优先权

self.correlation_id = correlation_id

self.reply_to = reply_to

self.expiration = expiration # 消息的有效期

self.message_id = message_id # 消息iD,自动管理

self.timestamp = timestamp # 消息的时间戳

self.type = type

self.user_id = user_id

self.app_id = app_id # 发布应用的ID

self.cluster_id = cluster_id

订阅消息

方式一:客户端主动推送消息

channel.basic_consume+channel.start_consuming

def basic_consume(self, # 启动队列消费者,告诉服务端开启一个消费者

consumer_callback, # 消费者回调函数

queue, # 队列名称

no_ack=False, # 发送确认,默认开启消息确认模式,为True是关闭消息确认;如果回调函数中不发送消息确认,消息会一直存在队列中,等待推送给新连接的消费者

exclusive=False, # 设置独享消费者,不允许其他消费者订阅该队列

consumer_tag=None, # 消费者标签,如果不指定,系统自动生成

arguments=None): # 字典,额外的参数

consumer_callback:回调函数

def consumer_callback(channel, # 信道

method, # 一个交付的deliver对象,用来通知客户端消息

properties, # 消息的属性,就是消息在发送时定义的属性

body) # 消息的主题,二进制格式

method:spec.Basic.Deliver:交付对象的属性

def __init__(self, consumer_tag=None, delivery_tag=None, redelivered=False, exchange=None, routing_key=None):

self.consumer_tag = consumer_tag # 消费者标签,用来标记是哪一个消费者

self.delivery_tag = delivery_tag # 交付标签,用来发送消息确认和标记这是推送给该消费者的第几条消息

self.redelivered = redelivered # bool类型,若果为False表示这是消息第一次被推送,如果是True,表示这是一条被重复推送的消息

self.exchange = exchange # 该消息是从哪个交换机上来的

self.routing_key = routing_key # 该消息的路由键是什么

# 函数中,可以通过

method.consumer_tag/method.delivery_tag/method.redelivered等获取相应的属性

注意:推送是异步的,也就是说一次可能推送多条消息,提高性能。

start_consuming :开始阻塞等待消息

阻塞等待消息是有时间限制的,超过一定时间内如果没有新的消息推送过来会强制关闭连接,因此如果需要全时段等待的话需要监听该连接;

方式二:客户端主动获取消息

channel.basic_get:同步获取消息,性能比方式一低

def basic_get(self,

queue=None, # 队列名称

no_ack=False): # 是否需要开启确认模式

return method,properties,body

# 需要主动进行消息确认,basic_ack

取消订阅

channel.basic_cancel:取消某个消费者订阅

channel.stop_consuming:取消所有的订阅

订阅消息确认

basic_ack

def basic_ack(self,

delivery_tag=0, # 消息的标记,int类型,一般将回调函数consumer_callback中获取的交付标记放到这个位置

multiple=False): # Flase表示确认单个消息,为True表示确认多个消息

订阅消息拒绝

basic_nack

def basic_nack(self,

delivery_tag=None, # 交付这标记,和basic_ack一样

multiple=False, # Flase表示拒绝单个消息,为True表示拒绝多个消息

requeue=True) # True表示拒绝了消息后重新放回队列,False表示丢弃消息

basic_reject:另一个方法,只能拒绝单个消息,没有multiple参数;

公平调度

- basic_qos:一般在信道声明的时候使用,确定该信道的预取数,提高性能

def basic_qos(self,

prefetch_size=0, # 设置预取数据包的大小

prefetch_count=0, # 设置预取的数量,如果为0则不预取,消费者处理越快,可以将这个这设置的越高

all_channels=False) # 是否将所有的信道都设置上述参数

投递消息确认机制

使用AMQP协议的事务方式

有三个方法tx_select,tx_commit,tx_rollback

# 开启一个事务,在提交事务之前必须先执行此方法

channel.tx_select

# 提交一个事务

channel.tx_select

# 捕捉到异常就使用回滚

channel.tx_rollback

import pika

if __name__ == "__main__":

# 配置连接参数

parameters = pika.ConnectionParameters(host=self.__host)

# 创建一个连接对象

connection = pika.BlockingConnection(parameters)

# 创建一个信道

channel = connection.channel()

# 声明队列

channel.queue_declare(queue='test',durable=True)

# 开启事务

channel.tx_select()

try:

channel.basic_publish(exchange='',

routing_key='test',

body='hello-world')

result = 1/0

channel.tx_commit()

except:

channel.tx_rollback()

# tx_select和tx_commit之间的所有的操作都是事务的一部分

以上的方式是十分消耗rabbitmq的性能的,一般不推荐使用;

confirm 模式

rabbbitmq自带confirm模式,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID)。

channel.confirm_delivery

import pika

if __name__ == "__main__":

# 配置连接参数

parameters = pika.ConnectionParameters(host=HOST)

# 创建一个连接对象

connection = pika.BlockingConnection(parameters)

# 创建一个信道

channel = connection.channel()

# 声明队列

channel.queue_declare(queue='test', durable=True)

# 打开通道的确认模式

channel.confirm_delivery()

for i in range(3):

result = channel.basic_publish(exchange='',

routing_key='test',

body='hello-world')

if result:

break

说明

当确认模式没有打开时,即使队列和交换机不存在,投递消息返回的都是True;

当确认模式打开时,投递失败会返回False,成功返回True,如果队列不存在,交换机会叫消息丢掉,但不会通知生产者;如果交换机不存在,会报错;

同一个信道,确认模式和事务模式只能存在一个,不能同时启用,否则报错;

交换机相互绑定

方法:channel.exchange_bind

import pika

if __name__ == "__main__":

# 添加用户名和密码

credentials = pika.PlainCredentials(USERNAME, PASSWD)

# 配置连接参数

parameters = pika.ConnectionParameters(host=HOST, credentials=credentials)

# 创建一个连接对象

connection = pika.BlockingConnection(parameters)

# 创建一个信道

channel = connection.channel()

# 声明队列

channel.queue_declare(queue='test1', durable=True)

channel.queue_declare(queue='test2', durable=True)

# 声明交换机

channel.exchange_declare('myname')

channel.exchange_declare('youname')

# 交换机绑定

channel.exchange_bind(destination='youname',source='myname',routing_key='ourheart')

# 队列绑定

channel.queue_bind(queue='test1',exchange='myname',routing_key='ourheart')

channel.queue_bind(queue='test2', exchange='youname',routing_key='ourheart')

channel.basic_publish(exchange='myname',

routing_key='ourheart',

body='hello-world')

说明

交换机相互绑定后,如果他们之间连接的桥routing_key是相同的,向源交换机投递消息,数据可以到达相同路由键的所有的队列;向目的交换机投递消息,消息不能到达源交换机;

交换机设置了internal了True参数后,该交换机不能再接收到生产者发送的消息,但可以得到源交换机发送的消息;

其他方法

exchange_delete: 删除交换机

queue_delete: 删除队列

queue_purge: 清除指定队列的所有的消息

queue_unbind: 队列和交换机解除绑定

basic.cancel: 清除消费者

参考

 类似资料: