当前位置: 首页 > 工具软件 > py-amqp > 使用案例 >

基于py-amqp编写生产者和消费者代码

岑元徽
2023-12-01

在介绍erlang的dbg调试RabbitMQ源码之前,首先介绍基于py-amqp编写RabbitMQ的生产者和消费者代码,其中py-amqp的安装包可在以下链接下载:https://pypi.org/project/amqp/

1 公共模块

生产者和消费者共用的一些公共代码编写在utils.py文件中,代码如下:

import logging
import json
import os

class LOG(object):
    """
    Thin wrapper around a standard ``logging.Logger``.

    The wrapped logger is set to INFO level and gets one stream handler whose
    records are laid out as ``time thread-id level logger-name message``.
    """

    _FORMAT = '%(asctime)s %(thread)d %(levelname)s %(name)s %(message)s'

    def __init__(self, name=__name__):
        """
        Fetch the logger called ``name`` and make sure it has a handler.

        Args:
            name: name handed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level=logging.INFO)
        # logging.getLogger() hands back the same logger object for a given
        # name, so only attach a handler when none is present; otherwise each
        # additional LOG(name) instance would make every record print again.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setLevel(logging.INFO)
            handler.setFormatter(logging.Formatter(self._FORMAT))
            self.logger.addHandler(handler)

    def info(self, msg):
        """Log ``msg`` at INFO level."""
        self.logger.info(msg)

    def debug(self, msg):
        """Log ``msg`` at DEBUG level (dropped while the logger stays at INFO)."""
        self.logger.debug(msg)

    def warning(self, msg):
        """Log ``msg`` at WARNING level."""
        self.logger.warning(msg)

    def error(self, msg):
        """Log ``msg`` at ERROR level."""
        self.logger.error(msg)

def get_record_file():
    """
    Locate the JSON record file in the current working directory.

    Returns:
        The path (working directory joined with the file name) of the first
        regular file, in sorted name order, whose name ends with ``.json``;
        ``None`` when the directory holds no such file.
    """
    cwd = os.getcwd()
    # os.listdir() returns entries in arbitrary order; sort them so the choice
    # is reproducible when several .json files sit side by side.
    for item in sorted(os.listdir(cwd)):
        path = os.path.join(cwd, item)
        # Skip directories that merely happen to be named "*.json".
        if item.endswith('.json') and os.path.isfile(path):
            return path
    return None

def read_record():
    """
    Load the JSON record located by ``get_record_file()``.

    Returns:
        The decoded JSON content of the record file.

    Raises:
        IOError: if ``get_record_file()`` found no record file.
    """
    path = get_record_file()
    if path is None:
        # Fail with a clear message instead of letting open(None) blow up
        # with an unrelated-looking TypeError.
        raise IOError(
            "No .json record file found in {0}".format(os.getcwd()))
    with open(path, 'r') as f:
        return json.load(f)

其中,RabbitMQ的实体(如queue和exchange)以及待发送的消息内容定义在record.json文件中,即:

{
    "context": {
        "version": "1.0",
        "body": "RabbitMQ transport message"
    },
    "entities": {
        "exchange": "base_exchange",
        "exchange_type": "topic",
        "queue": "base_queue",
        "binding_key": "base.#",
        "routing_key": "base.test"
    }
}

2 生产者模块

生产者代码如下:

import sys
import amqp
import time
import utils
import traceback
import json

# Module-level logger used by the producer code below.
LOG = utils.LOG(__name__)


def _ts():
    """Return the current timestamp as reported by ``time.time()``."""
    return time.time()

def seed_msg(count, context):
    """
    Build one AMQP message from ``context`` and hand it out ``count`` times.

    Args:
        count: how many times the message is yielded; must be greater than 0.
        context: JSON-serialisable object; its JSON dump is the message body.

    Returns:
        An iterator yielding the same ``amqp.Message`` object ``count`` times.

    Raises:
        ValueError: at call time (not on first iteration) when ``count`` is
            not positive.
    """
    # This function is deliberately not a generator itself: a ``yield`` in the
    # body would postpone the validation below until the first next() call.
    if count <= 0:
        LOG.error("Please specify a value greater than 0.")
        raise ValueError(
            "count must be greater than 0, got {0!r}".format(count))
    msg = amqp.Message(json.dumps(context))
    # AMQP delivery_mode 2 marks the message as persistent.
    msg.properties['delivery_mode'] = 2
    return (msg for _ in range(count))


class Producer(object):
    """
    Publishes messages to a RabbitMQ exchange through py-amqp.

    Usable as a context manager: ``connect()`` runs on entry and ``close()``
    on exit.
    """

    def __init__(self, url="localhost:5672", name="guest",
                 password="guest"):
        """
        Store the connection settings; nothing is connected yet.

        Args:
            url: broker address, passed to ``amqp.Connection`` as ``host``.
            name: user id used for the connection.
            password: password used for the connection.
        """
        self.url = url
        self.name = name
        self.password = password

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *eargs):
        self.close()

    def connect(self):
        """
        Open the connection and create a channel on it.

        Returns:
            This producer instance.

        Raises:
            Exception: any error raised while connecting; it is logged and
                re-raised unchanged.
        """
        LOG.info("Connect to RabbitMQ server.")
        try:
            self.conn = amqp.Connection(host=self.url,
                                        userid=self.name,
                                        password=self.password,
                                        connect_timeout=5)
            # Since amqp 2.0 the connection has to be established explicitly;
            # relying on the implicit connect emits AMQPDeprecationWarning.
            self.conn.connect()
            self.chan = self.conn.channel()
        except Exception as exc:
            LOG.error("Connect host: {0} failed and raise exception: {1}".format(
                        self.url, exc))
            # Bare raise keeps the original traceback on Python 2 as well.
            raise
        return self

    def close(self):
        """Close the channel and the connection, if they were created."""
        LOG.info("Close connection with RabbitMQ server.")
        try:
            if hasattr(self, 'chan'):
                self.chan.close()
        finally:
            # Still close the connection when closing the channel failed, so
            # the connection is not left open.
            if hasattr(self, 'conn'):
                self.conn.close()

    def declare(self, exchange, exchange_type, queue,
                binding_key, routing_key):
        """
        Declare the exchange and the queue, then bind them together.

        The arguments are also stored on the instance; ``publish_message()``
        later reads ``self.exchange`` and ``self.routing_key``.

        Args:
            exchange: exchange name.
            exchange_type: exchange type passed as ``type``.
            queue: queue name.
            binding_key: key used to bind ``queue`` to ``exchange``.
            routing_key: key used when publishing messages.

        Raises:
            Exception: any error raised by the declarations; it is logged and
                re-raised unchanged.
        """
        self.exchange = exchange
        self.exchange_type = exchange_type
        self.queue = queue
        self.binding_key = binding_key
        self.routing_key = routing_key
        try:
            self.chan.exchange_declare(exchange=self.exchange,
                                       type=self.exchange_type,
                                       durable=False,
                                       auto_delete=False)
            self.chan.queue_declare(queue=self.queue,
                                    exclusive=False,
                                    durable=False,
                                    auto_delete=False)
            self.chan.queue_bind(queue=self.queue,
                                 exchange=self.exchange,
                                 routing_key=self.binding_key)
        except Exception as exc:
            LOG.error("Declare failed and raise exception: {0}".format(exc))
            raise

    def publish_message(self, msg):
        """
        Publish ``msg`` to the declared exchange with the stored routing key.

        ``declare()`` must have been called first, since it sets the exchange
        and routing key used here.

        Args:
            msg: the ``amqp.Message`` to publish.

        Raises:
            Exception: any error raised while publishing; it is logged and
                re-raised unchanged.
        """
        try:
            self.chan.basic_publish_confirm(msg, exchange=self.exchange,
                                            routing_key=self.routing_key)
        except Exception as exc:
            LOG.error("Publish message failed and raise exception: {0}".format(exc))
            raise

def parse_record():
    """
    Read the record file and split it into its two top-level sections.

    Returns:
        A ``(context, entities)`` tuple; an element is ``None`` when its key
        is missing from the record.
    """
    data = utils.read_record()
    return data.get('context'), data.get('entities')

def main(argv):
    """
    Declare the entities from the record file and publish two messages.

    Args:
        argv: command-line arguments. ``argv[1]``, when present, is used as
            the broker URL; otherwise the URL is read interactively.
    """
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3, where raw_input was renamed to input
    if len(argv) > 1:
        url = argv[1]
    else:
        url = read_line("Please input a URL (<ip>:5672): ")
    name, password = "guest", "guest"
    interval = 2  # minimum number of seconds spent on each message
    context, entities = parse_record()
    with Producer(url, name, password) as p:
        p.declare(exchange=entities['exchange'],
                  exchange_type=entities['exchange_type'],
                  queue=entities['queue'],
                  binding_key=entities['binding_key'],
                  routing_key=entities['routing_key'])
        for i, msg in enumerate(seed_msg(2, context), 1):
            start_time = _ts()
            LOG.info("Start to send {0} msg.".format(i))
            p.publish_message(msg)
            LOG.info("Finish to send {0} msg.".format(i))
            elapsed = _ts() - start_time
            # Sleep for what is left of the interval; never a negative time.
            time.sleep(max(0, interval - elapsed))


# Script entry point: run the producer only when this file is executed
# directly, passing the command-line arguments through to main().
if __name__ == "__main__":
    main(sys.argv)

3 消费者模块

消费者代码如下:

import sys
import amqp
import time
import utils
import traceback
import socket
import signal

# Module-level logger used by the consumer code below.
LOG = utils.LOG(__name__)


class Consumer(object):
    """
    Consumes messages from a RabbitMQ queue through py-amqp.

    Usable as a context manager: ``connect()`` runs on entry and ``close()``
    on exit. ``start_consuming()`` loops until the ``_exit`` flag is set,
    which ``signal_handler()`` does.
    """

    # Signals that request a stop; names missing on the running platform are
    # skipped by setup_signal().
    _STOP_SIGNALS = ('SIGHUP', 'SIGINT', 'SIGQUIT',
                     'SIGALRM', 'SIGTERM', 'SIGCONT')

    def __init__(self, url="localhost:5672", name="guest",
                 password="guest"):
        """
        Store the connection settings; nothing is connected yet.

        Args:
            url: broker address, passed to ``amqp.Connection`` as ``host``.
            name: user id used for the connection.
            password: password used for the connection.
        """
        self.url = url
        self.name = name
        self.password = password
        # Create the stop flag up front so start_consuming() also works when
        # setup_signal() was never called.
        self._exit = False

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *eargs):
        self.close()

    def connect(self):
        """
        Open the connection and create a channel on it.

        Returns:
            This consumer instance.

        Raises:
            Exception: any error raised while connecting; it is logged and
                re-raised unchanged.
        """
        LOG.info("Connect to RabbitMQ server.")
        try:
            self.conn = amqp.Connection(host=self.url,
                                        userid=self.name,
                                        password=self.password,
                                        connect_timeout=5)
            # Since amqp 2.0 the connection has to be established explicitly;
            # relying on the implicit connect emits AMQPDeprecationWarning.
            self.conn.connect()
            self.chan = self.conn.channel()
        except Exception as exc:
            LOG.error("Connect host: {0} failed and "
                        "raise exception: {1}".format(self.url, exc))
            # Bare raise keeps the original traceback on Python 2 as well.
            raise
        return self

    def close(self):
        """Close the channel and the connection, if they were created."""
        LOG.info("Close connection with RabbitMQ server.")
        try:
            if hasattr(self, 'chan'):
                self.chan.close()
        finally:
            # Still close the connection when closing the channel failed, so
            # the connection is not left open.
            if hasattr(self, 'conn'):
                self.conn.close()

    def _callback(self, msg):
        """Log a delivered message and acknowledge it."""
        LOG.info("Received msg: {0} properties: {1} delivery: {2}".format(
                    msg.body, msg.properties, msg.delivery_info))
        self.chan.basic_ack(msg.delivery_tag)

    def declare(self, exchange, exchange_type, queue,
                binding_key, routing_key):
        """
        Declare and bind the exchange and queue, then register the consumer.

        Besides the declarations this also calls ``basic_consume`` with manual
        acknowledgement (``no_ack=False``), ``_callback`` as the callback and
        ``tag_<queue>`` as the consumer tag.

        Args:
            exchange: exchange name.
            exchange_type: exchange type passed as ``type``.
            queue: queue name.
            binding_key: key used to bind ``queue`` to ``exchange``.
            routing_key: stored on the instance; not used by the calls here.

        Raises:
            Exception: any error raised by the calls; it is logged and
                re-raised unchanged.
        """
        self.exchange = exchange
        self.exchange_type = exchange_type
        self.queue = queue
        self.binding_key = binding_key
        self.routing_key = routing_key
        try:
            self.chan.exchange_declare(exchange=self.exchange,
                                       type=self.exchange_type,
                                       durable=False,
                                       auto_delete=False)
            self.chan.queue_declare(queue=self.queue,
                                    exclusive=False,
                                    durable=False,
                                    auto_delete=False)
            self.chan.queue_bind(queue=self.queue,
                                 exchange=self.exchange,
                                 routing_key=self.binding_key)
            self.chan.basic_consume(queue=self.queue,
                                    no_ack=False,
                                    callback=self._callback,
                                    consumer_tag="tag_{0}".format(queue))
        except Exception as exc:
            LOG.error("Declare failed and raise exception: {0}".format(exc))
            raise

    def start_consuming(self):
        """
        Process incoming events until the stop flag is set.

        Raises:
            Exception: any error other than ``socket.timeout`` raised while
                waiting for events; it is logged and re-raised unchanged.
        """
        while not self._exit:
            try:
                # Wait at most one second so the stop flag is re-checked
                # regularly even when the queue is idle.
                self.conn.drain_events(timeout=1)
            except socket.timeout:
                # Nothing arrived within the timeout; check the flag again.
                continue
            except Exception as exc:
                LOG.error("Consume stop and raise exception: {0}".format(exc))
                raise

    def signal_handler(self, signum, frame):
        """Signal callback: log the signal and ask the consume loop to stop."""
        LOG.info("Received signal: {} and exit".format(signum))
        self._exit = True

    def setup_signal(self):
        """Reset the stop flag and install ``signal_handler`` for stop signals."""
        self._exit = False
        for sig_name in self._STOP_SIGNALS:
            # Not every platform defines all of these signal names; install
            # the handler only for the ones the signal module provides.
            signum = getattr(signal, sig_name, None)
            if signum is not None:
                signal.signal(signum, self.signal_handler)

def parse_record():
    """
    Read the record file and return its ``entities`` section.

    Returns:
        The value stored under ``'entities'``, or ``None`` when the key is
        missing from the record.
    """
    return utils.read_record().get('entities')

def main(argv):
    """
    Declare the entities from the record file and consume until signalled.

    Args:
        argv: command-line arguments. ``argv[1]``, when present, is used as
            the broker URL; otherwise the URL is read interactively.
    """
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3, where raw_input was renamed to input
    if len(argv) > 1:
        url = argv[1]
    else:
        url = read_line("Please input a URL (<ip>:5672): ")
    name, password = "guest", "guest"
    entities = parse_record()
    with Consumer(url, name, password) as c:
        c.declare(exchange=entities['exchange'],
                  exchange_type=entities['exchange_type'],
                  queue=entities['queue'],
                  binding_key=entities['binding_key'],
                  routing_key=entities['routing_key'])
        # Install the signal handlers before entering the consume loop so a
        # signal can stop the loop.
        c.setup_signal()
        c.start_consuming()


# Script entry point: run the consumer only when this file is executed
# directly, passing the command-line arguments through to main().
if __name__ == "__main__":
    main(sys.argv)

4 运行

[root@master code]# python producer.py
Please input a URL (<ip>:5672): 192.168.1.100
2019-10-06 10:44:20,915 140120100865856 INFO __main__ Connect to RabbitMQ server.
/usr/lib/python2.7/site-packages/amqp-2.5.2-py2.7.egg/amqp/connection.py:325: AMQPDeprecationWarning: The .frame_writer attribute on the connection was accessed before
the connection was established.  This is supported for now, but will
be deprecated in amqp 2.2.0.

Since amqp 2.0 you have to explicitly call Connection.connect()
before using the connection.

  W_FORCE_CONNECT.format(attr=attr)))
2019-10-06 10:44:20,933 140120100865856 INFO __main__ Start to send 1 msg.
2019-10-06 10:44:20,935 140120100865856 INFO __main__ Finish to send 1 msg.
2019-10-06 10:44:22,936 140120100865856 INFO __main__ Start to send 2 msg.
2019-10-06 10:44:22,938 140120100865856 INFO __main__ Finish to send 2 msg.
2019-10-06 10:44:24,949 140120100865856 INFO __main__ Close connection with RabbitMQ server.
[root@master code]# python consumer.py
Please input a URL (<ip>:5672): 192.168.1.100
2019-10-06 10:44:08,268 140428916774720 INFO __main__ Connect to RabbitMQ server.
/usr/lib/python2.7/site-packages/amqp-2.5.2-py2.7.egg/amqp/connection.py:325: AMQPDeprecationWarning: The .frame_writer attribute on the connection was accessed before
the connection was established.  This is supported for now, but will
be deprecated in amqp 2.2.0.

Since amqp 2.0 you have to explicitly call Connection.connect()
before using the connection.

  W_FORCE_CONNECT.format(attr=attr)))

2019-10-06 10:44:20,936 140428916774720 INFO __main__ Received msg: {"body": "RabbitMQ transport message", "version": "1.0"} properties: {u'delivery_mode': 2} delivery: {u'consumer_tag': u'tag_base_queue', u'redelivered': False, u'routing_key': u'base.test', u'delivery_tag': 1, u'exchange': u'base_exchange'}
2019-10-06 10:44:22,938 140428916774720 INFO __main__ Received msg: {"body": "RabbitMQ transport message", "version": "1.0"} properties: {u'delivery_mode': 2} delivery: {u'consumer_tag': u'tag_base_queue', u'redelivered': False, u'routing_key': u'base.test', u'delivery_tag': 2, u'exchange': u'base_exchange'}

 

 类似资料: