python 连接操作rabbitMQ 主要是使用pika库
安装:
pip install pika==1.0.1
注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本使用,避免踩坑
Pika是用于Python的RabbitMQ(AMQP 0-9-1)客户端库
注: 官方对于pika有如下介绍:
Since threads aren’t appropriate to every situation, it doesn’t require threads.
Pika core takes care not to forbid them, either.
The same goes for greenlets, callbacks, continuations, and generators.
An instance of Pika’s built-in connection adapters isn’t thread-safe, however.
线程并不适用于每种场景, 因此并不要求使用线程。 但是pika并不禁用线程, 对于
greenlets, callbacks也一样。 一个pika建立的连接并不是线程安全的
因此在多线程中共享一个pika连接不是线程安全的, 当然也有一种使用:
with one exception: you may call the connection method add_callback_threadsafe from
another thread to schedule a callback within an active pika connection.
使用add_callback_threadsafe方法callback 一个pika连接从另外一个线程中
pika提供建立连接方式:
pika.adapters.asyncio_connection.AsyncioConnection - 用于python 3 AsyncIO的I/O异步模式
pika.BlockingConnection - 同步模式, 简单易用
pika.SelectConnection - 没有第三方依赖包的异步模式
pika.adapters.tornado_connection.TornadoConnection - 基于Tornado 的异步IO请求模式
pika.adapters.twisted_connection.TwistedProtocolConnection - 基于Twisted’的异步IO请求模式
例子:
#!/usr/bin/env python
# **********生产数据************
import pika
auth = pika.PlainCredentials('guid','guid')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='*',port=5672,virtual_host='/',credentials=auth))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
channel.basic_publish(exchange='',
routing_key='TEST01333',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
#!/usr/bin/env python
# **********消费数据************
import pika
auth = pika.PlainCredentials('wangying','Ja21s11o07n')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.56.203.32',port=5672,virtual_host='/',credentials=auth))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(on_message_callback=callback,
queue='TEST01',
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
备注:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='*',port=5672))需要改为connection = pika.BlockingConnection(pika.ConnectionParameters(host='*',port=5672,virtual_host='/',credentials=auth))就是网上大部分都是没有传虚拟主机和用户名的问题