RabbitMQ实现了AMQP(Advanced Message Queuing Protocol,高级消息队列协议)定义的消息队列:从Publisher接收数据,然后传递给Subscriber。它能保证高并发、数据安全传递和可扩展性。
# example_publisher.py
import pika, os, logging
import time
logging.basicConfig()

# Read the broker URL from the CLOUDAMQP_URL environment variable; when it is
# unset, fall back to the hard-coded broker address below.
url = os.environ.get('CLOUDAMQP_URL', 'amqp://user:password@192.168.1.58:5672/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5  # socket timeout, in seconds

connection = pika.BlockingConnection(params)  # connect to the broker
channel = connection.channel()  # start a channel
channel.queue_declare(queue='pdfprocess')  # declare the queue we publish to

# Publish one numbered message roughly every second until interrupted.
count = 1
try:
    while True:
        # The counter is incremented before publishing, so the first message
        # sent carries the number 2.
        count += 1
        channel.basic_publish(exchange='', routing_key='pdfprocess',
                              body='User information ' + str(count))
        time.sleep(1)
        print("[x] Message sent to consumer " + str(count))
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless loop; treat it as a
    # normal shutdown rather than an error.
    pass
finally:
    # Always release the connection, whether the loop was interrupted or a
    # publish failed. Skip the close if the connection is already gone.
    if connection.is_open:
        connection.close()
# example_consumer.py
import pika, os, time
def pdf_process_function(msg, delay=5):
    """Simulate a slow PDF-processing job for one message.

    Prints a start banner and the received message, blocks for ``delay``
    seconds to stand in for real work, then prints a completion banner.

    Args:
        msg: The message payload. It is rendered with ``str()``, so a
            ``bytes`` payload is shown in its ``b'...'`` form.
        delay: Seconds to sleep while "processing". Defaults to 5, which
            keeps existing one-argument callers unchanged.

    Returns:
        None.
    """
    print(" PDF processing")
    print(" [x] Received " + str(msg))
    time.sleep(delay)  # stand-in for the real processing time
    print(" PDF processing finished")
# Resolve the broker URL: prefer the CLOUDAMQP_URL environment variable and
# otherwise use the hard-coded broker address below.
url = os.environ.get(
    "CLOUDAMQP_URL",
    "amqp://user:password@192.168.1.58:5672/%2f",
)
params = pika.URLParameters(url)

# Open a blocking connection to the broker and start a channel on it.
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the queue this consumer reads from.
channel.queue_declare(queue="pdfprocess")
# create a function which is called on incoming messages
def callback(ch, method, properties, body):
    """Handle one delivered message by processing its payload.

    Only ``body`` is used; ``ch``, ``method`` and ``properties`` are part of
    the callback signature but are ignored here.
    """
    pdf_process_function(body)
# Subscribe `callback` to the 'pdfprocess' queue.
# NOTE(review): with auto_ack=True the broker considers a message handled as
# soon as it is delivered, so a message in progress when this process dies is
# not redelivered. Switch to manual acks if that matters.
channel.basic_consume('pdfprocess',
                      callback,
                      auto_ack=True)

# start_consuming() blocks and dispatches deliveries to `callback`.
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the consumer; cancel the
    # subscription cleanly instead of exiting with a traceback.
    channel.stop_consuming()
finally:
    # Always release the connection, including after an interrupt or an
    # error raised while consuming. Skip the close if it is already gone.
    if connection.is_open:
        connection.close()