pip install pika
import pika
# 建立连接
# user:账号 pwd:密码
userx = pika.PlainCredentials(user, pwd)
# hosh:rabbitmq所在的ip port: 端口号
parameters = pika.ConnectionParameters(host, int(port), '/', credentials=userx)
conn = pika.BlockingConnection(parameters)
# 开辟管道
channel = conn.channel()
# 声明队列,不存在则创建,存在则连接
# queue:队列名 durable:是否持久化
queue_declare = channel.queue_declare(queue=key, durable=True)
# prefetch_count: 通道的最大容量
channel.basic_qos(prefetch_count=prefetch_count)
# 存储(生产者)
# 发送数据,发送一条,如果要发送多条则复制此段
channel.basic_publish(
exchange='', # 这是发布才需的参数,不用管(发布、订阅模式)
routing_key=key, # 队列名
body=data.encode(), # 需要存储的数据
properties=pika.BasicProperties(
delivery_mode=2, # 实现消息永久保存
)
)
# 读取(消费者)
# 会持续监听
# 准备接收
channel.basic_consume(
on_message_callback=fun, # 回调的函数对象
queue=key # 队列名
)
# 开始接收(将数据放入回调函数开始执行)
self.channel.start_consuming()
# 回调函数
def fun(x1, x2, x3, data):
print(x1)
print(x2)
print(x3)
print(data)
# 查看声明时队列中剩余的数据个数(如需查看最新的需重新声明)
queue_declare.method.message_coun
# 获取一个可迭代对象
# key:队列名
ret = channel.consume(key)
# 获取队列中的一条数据,得到的是个元组(如果队列为空则会阻塞,持续等待数据)
data = next(ret)
# 本次请求编号
delivery_tag = data[0].delivery_tag
# 获取到的数据
text = data[-1].decode()
# 通知本条消息已经确实收到了,队列会将该条数据删除(该方法不会自动通知,因此需要手动操作)
channel.basic_ack(delivery_tag)
# 此方法与上方的类似,但是更加直接且可以查看队列中剩余的个数
# 获取队列中的一条数据,得到的是个元组(如果队列为空则返回(None, None, None))
# key:队列名
data_obj = channel.basic_get(queue=key)
# 本次请求编号
delivery_tag = data_obj[0].delivery_tag
# 本次获取完毕剩余的个数(不包含本条)
count = data_obj[0].message_count
# 获得的数据
text = data_obj[-1].decode()
# 提交本次响应结果
channel.basic_ack(delivery_tag)