RabbitMQ AMQP basics: publish/subscribe

颜哲彦
2023-12-01

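RabbitMQ implements the message queue defined by AMQP (Advanced Message Queuing Protocol): it receives messages from a publisher and delivers them to subscribers. It is designed to handle many concurrent clients, deliver messages reliably, and scale out.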

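Publisher (producer) steps

  • Open a connection
  • Create a channel
  • Declare the exchange
  • Declare the queue
  • Publish the message to the exchange
  • Close the channel
  • Close the connection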
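
The publisher steps above can be sketched with pika roughly as follows. This is only an illustration under stated assumptions: the exchange name `pdf_exchange`, the `direct` exchange type, the `guest` URL, and the helper names `publish_message` and `run_publisher` are all hypothetical and do not come from the sample code in this post. Note that a message published to a named exchange is only routed once a matching binding exists, which in the step lists of this post is created on the subscriber side.

```python
def publish_message(channel, exchange, queue, body):
    """Run the channel-level publisher steps on an already-open channel."""
    # Declare the exchange ('direct' is an assumption made for this sketch).
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    # Declare the queue.
    channel.queue_declare(queue=queue)
    # Publish to the exchange, using the queue name as the routing key.
    channel.basic_publish(exchange=exchange, routing_key=queue, body=body)


def run_publisher(url="amqp://guest:guest@localhost:5672/%2f"):
    """Connect, publish one message, then close the channel and connection."""
    import pika  # imported lazily so publish_message works without pika installed

    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()
    try:
        # 'pdf_exchange' is a hypothetical exchange name.
        publish_message(channel, "pdf_exchange", "pdfprocess", "User information 1")
        channel.close()
    finally:
        if connection.is_open:
            connection.close()
```

Calling `run_publisher()` requires a reachable broker; `publish_message` itself only needs an object that offers the three channel methods it calls.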

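Subscriber (consumer) steps

  • Open a connection
  • Create a channel
  • Declare the exchange
  • Declare the queue, using the same name as the publisher
  • Bind the queue to the exchange
  • Subscribe to messages
  • Close the channel
  • Close the connection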
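
The subscriber steps above, including the binding, might look like this with pika. Again this is a sketch under assumptions: the exchange name `pdf_exchange`, the `direct` exchange type, the `guest` URL, and the helper names `subscribe` and `run_subscriber` are hypothetical and are not taken from the sample code in this post.

```python
def subscribe(channel, exchange, queue, on_message):
    """Run the channel-level subscriber steps on an already-open channel."""
    # Declare the exchange ('direct' is an assumption made for this sketch).
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    # Declare the queue with the same name the publisher uses.
    channel.queue_declare(queue=queue)
    # Bind the queue to the exchange, with the queue name as the routing key.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=queue)
    # Register the callback; auto_ack=True acknowledges on delivery.
    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)


def run_subscriber(url="amqp://guest:guest@localhost:5672/%2f"):
    """Connect, subscribe, and consume until interrupted with Ctrl+C."""
    import pika  # imported lazily so subscribe works without pika installed

    def on_message(ch, method, properties, body):
        print("[x] Received", body)

    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()
    try:
        # 'pdf_exchange' is a hypothetical exchange name.
        subscribe(channel, "pdf_exchange", "pdfprocess", on_message)
        channel.start_consuming()  # blocks until interrupted
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()
```

Keeping the declare and bind calls in one helper makes the order of the steps easy to check without a broker; `run_subscriber()` itself needs a running RabbitMQ instance.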

Sample code (Python)

  • publisher
# example_publisher.py
import logging
import os
import time

import pika

logging.basicConfig()

# Read CLOUDAMQP_URL from the environment (falls back to the hard-coded broker URL)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://user:password@192.168.1.58:5672/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5

connection = pika.BlockingConnection(params)  # connect to CloudAMQP (or the fallback broker)
channel = connection.channel()  # start a channel
channel.queue_declare(queue='pdfprocess')  # declare the queue
# Send one message per second until interrupted with Ctrl+C
count = 0
try:
    while True:
        count += 1
        # The default exchange ('') routes on the queue name
        channel.basic_publish(exchange='', routing_key='pdfprocess', body='User information ' + str(count))
        print("[x] Message sent to consumer " + str(count))
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    connection.close()  # now reachable once the loop is interrupted
  • consumer
# example_consumer.py
import os
import time

import pika

def pdf_process_function(msg):
    print(" PDF processing")
    print(" [x] Received " + str(msg))

    time.sleep(5)  # simulate 5 seconds of work
    print(" PDF processing finished")

# Read CLOUDAMQP_URL from the environment (falls back to the hard-coded broker URL)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://user:password@192.168.1.58:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()  # start a channel
channel.queue_declare(queue='pdfprocess')  # declare the queue (same name as the publisher)

# Called for every incoming message
def callback(ch, method, properties, body):
    pdf_process_function(body)

# Set up the subscription on the queue
channel.basic_consume('pdfprocess',
                      callback,
                      auto_ack=True)

# Start consuming (blocks until interrupted with Ctrl+C)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
connection.close()