当前位置: 首页 > 工具软件 > python pika > 使用案例 >

python使用pika操作rabbitmq总结

谯乐池
2023-12-01

python 连接操作rabbitMQ 主要是使用pika库

安装:

pip install pika==1.0.1


注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本使用,避免踩坑

Pika是用于Python的RabbitMQ(AMQP 0-9-1)客户端库

注: 官方对于pika有如下介绍:

Since threads aren’t appropriate to every situation, it doesn’t require threads. 
Pika core takes care not to forbid them, either. 
The same goes for greenlets, callbacks, continuations, and generators.
 An instance of Pika’s built-in  connection adapters isn’t thread-safe, however.


线程并不适用于每种场景, 因此并不要求使用线程。 但是pika并不禁用线程, 对于

greenlets, callbacks也一样。 一个pika建立的连接并不是线程安全的

因此在多线程中共享一个pika连接不是线程安全的, 当然也有一种使用:

with one exception: you may call the connection method add_callback_threadsafe from
 another thread to schedule a callback within an active pika connection.


使用add_callback_threadsafe方法callback 一个pika连接从另外一个线程中

pika提供建立连接方式:

pika.adapters.asyncio_connection.AsyncioConnection - 用于python 3 AsyncIO的I/O异步模式
pika.BlockingConnection - 同步模式, 简单易用
pika.SelectConnection - 没有第三方依赖包的异步模式
pika.adapters.tornado_connection.TornadoConnection - 基于Tornado 的异步IO请求模式
pika.adapters.twisted_connection.TwistedProtocolConnection - 基于Twisted’的异步IO请求模式

例子:

  1. 生产者
     
    
    #!/usr/bin/env python
    # **********生产数据************
    import pika
    
    auth = pika.PlainCredentials('guid','guid')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='*',port=5672,virtual_host='/',credentials=auth))
    channel = connection.channel()
    
    channel.queue_declare(queue='TEST01')
    
    channel.basic_publish(exchange='',
                          routing_key='TEST01333',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()

     
  2. 消费者
    #!/usr/bin/env python
    # **********消费数据************
    import pika
    
    auth = pika.PlainCredentials('wangying','Ja21s11o07n')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.56.203.32',port=5672,virtual_host='/',credentials=auth))
    channel = connection.channel()
    
    channel.queue_declare(queue='TEST01')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    channel.basic_consume(on_message_callback=callback,
                          queue='TEST01',
                          auto_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    备注:

  • 错误:pika.exceptions.ProbableAuthenticationError: ConnectionClosedByBroker: (403) 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.'
    这个错误的原因就是在连接时没有传主机和账户用户名,
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='*',port=5672))需要改为connection = pika.BlockingConnection(pika.ConnectionParameters(host='*',port=5672,virtual_host='/',credentials=auth))就是网上大部分都是没有传虚拟主机和用户名的问题
  • 错误:connection_configs does not support iteration: TypeError("'Blocki login was refused using authentication mechanism plain
    这个问题找了半天,最后发现是由于防火墙的问题,我把防火墙给关闭后还不行,最后重启了下服务器可以了,可能是关闭防火墙后由于cdn问题导致的没有生效。
    解决方法:出入站添加端口5672的出入站规则和15672的出入站规则,这样就可以远程访问和远程连接了

 类似资料: