当前位置: 首页 > 工具软件 > python pika > 使用案例 >

Python使用pika调用RabbitMQ

公冶子琪
2023-12-01

定义RabbitMQ类

import json
import os
import sys

import pika

from Data import Data
from MongoDB import MongoDB
from constants import *


class RabbitMQ:
    def __init__(self, queue_name):
        """
        初始化队列对象
        :param queue_name: 队列名称
        """
        self.queue_name = queue_name
        self.username = RABBITMQ_USERNAME
        self.password = RABBITMQ_PASSWORD
        self.RABBITMQ_HOST = RABBITMQ_HOST
        self.RABBITMQ_PORT = RABBITMQ_PORT

    def rabbitmq_init(self):
        """
        初始化消息队列
        :return: 通讯频道,RabbitMQ连接
        """
        # 设置用户
        credentials = pika.PlainCredentials(username=RABBITMQ_USERNAME, password=RABBITMQ_PASSWORD)
        # 连接RabbitMQ
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials))
        # 创建频道
        channel = connection.channel()

        # 创建一个将消息传递到的问候队列
        queue = channel.queue_declare(queue=self.queue_name)
        return channel, connection

    def rabbitmq_receive(self):
        """
        获取消息队列连接
        :return:
        """
        channel, connection = self.rabbitmq_init()
        channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback, auto_ack=True)
        print(" [*] 等待消息。按 'Ctrl + C' 退出")
        channel.start_consuming()

    def callback(self, ch, method, properties, body):
        """
        回调函数
        :param ch:
        :param method:
        :param properties:
        :param body: 获取到的body
        :return:
        """
        print(body)
        # 将二进制转换成字符串
        message = bytes.decode(body)
        print(" [x] 收到消息:{}".format(message))
        message = message.replace("'", '"')
        # 将json字符串转为字典
        data = json.loads(message)
        print(data)
        MongoDB.data_save(data_dict=data)

    def rabbitmq_send(self, message):
        """
        向消息队列发送消息
        :param message:
        :return:
        """
        # 获取消息队列连接
        channel, connection = self.rabbitmq_init()
        # 将内容转化为bytes
        body = Data.get_bytes(message)
        # 发送消息
        channel.basic_publish(exchange='', routing_key=self.queue_name, body=body)

        print("向队列-{}-发送:{}".format(self.queue_name, message))
        # 重新连接队列统计消息个数
        queue = channel.queue_declare(queue=self.queue_name, passive=True)
        message_count = queue.method.message_count
        print("{}队列消息数:{}".format(self.queue_name, message_count))
        # 关闭连接
        connection.close()

接收端:消费者

if __name__ == '__main__':
    rabbitmq = RabbitMQ(queue_name="Hello World")
    try:
        rabbitmq.rabbitmq_receive()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

发送端:生产者

if __name__ == '__main__':
    message = {
        "code": 1,
        "message": "正常",
        "data": {
            "name": "黎明",
            "dict": {
                "aabb": 123,
                "ccdd": 456,
            }
        }
    }
    rabbitmq = RabbitMQ(queue_name="Hello World")

    for i in range(20):
        rabbitmq.rabbitmq_send(message=message)
 类似资料: