当前位置: 首页 > 工具软件 > python pika > 使用案例 >

python框架pika的基本使用

柳奇思
2023-12-01

安装

pip install pika

使用

import pika
# 建立连接
# user:账号	pwd:密码
userx = pika.PlainCredentials(user, pwd)
# hosh:rabbitmq所在的ip	port: 端口号
parameters = pika.ConnectionParameters(host, int(port), '/', credentials=userx)
conn = pika.BlockingConnection(parameters)
# 开辟管道
channel = conn.channel()
# 声明队列,不存在则创建,存在则连接
# queue:队列名		durable:是否持久化
queue_declare = channel.queue_declare(queue=key, durable=True)
# prefetch_count: 通道的最大容量
channel.basic_qos(prefetch_count=prefetch_count)


# 存储(生产者)
# 发送数据,发送一条,如果要发送多条则复制此段
channel.basic_publish(
    exchange='',	# 这是发布才需的参数,不用管(发布、订阅模式)
    routing_key=key,	# 队列名
    body=data.encode(),	# 需要存储的数据
    properties=pika.BasicProperties(
        delivery_mode=2,  # 实现消息永久保存
    )
)


# 读取(消费者)
# 会持续监听
# 准备接收
channel.basic_consume(
    on_message_callback=fun,	# 回调的函数对象
    queue=key	# 队列名
)
# 开始接收(将数据放入回调函数开始执行)
self.channel.start_consuming()


# 回调函数
def fun(x1, x2, x3, data):
    print(x1)
    print(x2)
    print(x3)
    print(data)

其他读取方法

# 查看声明时队列中剩余的数据个数(如需查看最新的需重新声明)
queue_declare.method.message_coun
# 获取一个可迭代对象
# key:队列名
ret = channel.consume(key)
# 获取队列中的一条数据,得到的是个元组(如果队列为空则会阻塞,持续等待数据)
data = next(ret)
# 本次请求编号
delivery_tag = data[0].delivery_tag
# 获取到的数据
text = data[-1].decode()
# 通知本条消息已经确实收到了,队列会将该条数据删除(该方法不会自动通知,因此需要手动操作)
channel.basic_ack(delivery_tag)


# 此方法与上方的类似,但是更加直接且可以查看队列中剩余的个数
# 获取队列中的一条数据,得到的是个元组(如果队列为空则返回(None, None, None))
# key:队列名
data_obj = channel.basic_get(queue=key)
# 本次请求编号
delivery_tag = data_obj[0].delivery_tag
# 本次获取完毕剩余的个数(不包含本条)
count = data_obj[0].message_count
# 获得的数据
text = data_obj[-1].decode()
# 提交本次响应结果
channel.basic_ack(delivery_tag)
 类似资料: