当前位置: 首页 > 工具软件 > Kombu > 使用案例 >

kombu------python的消息库

轩辕亮
2023-12-01

kombu是一个python的消息库。

Kombu的目标是通过为AMQP协议提供一个地道的高层次接口,来使python中的消息编程更为简单。同时也为通用的消息问题提供试验和测试的解决方案。


AMQP是先进的消息队列协议,一个提供消息定位,队列,路由,高靠性和安全性的开放的标准协议,其中RabbitMQ消息服务是一个流行的实现。


特性

通过可扩展的通道允许应用程序支持多种消息服务方案。

a. AMQP通道使用py-amqp,librabbitmq,或者qpid-python客户端库。

b.如果你安装了librabbitmq,当使用librabbitmq时,C实现的高性能的AMQP通道会自动使能。

$ pip install librabbitmq
c.虚拟的通道使增加对非AMQP通道的支持变得十分容易。现在已经有对   Redis Beanstalk Amazon SQS CouchDB MongoDB ZeroMQ ZooKeeper SoftLayer MQ  和  Pyro的内置支持。

d.你也可以通过使用SQLAlchemy和Django ORM通道来使用数据库作为中间人。

e.用于单元测试的内存通道。


支持自动编码,序列化和对消息负载进行压缩。

跨通道的统一异常处理。

有能力确保操作能够优雅地处理连接和通过错误。

amqplib的一些讨厌错误已经修正,比如支持超时和等待多个通道的事件的能力。

已经使用了carrot的程序可以通过一个兼容层很容易地进行移植。


要了解AMQP的相关介绍,可以参考以下链接:

Rabbits and warrens

Wikipedia article about AMQP.


不同通道之间的比较:

ClientTypeDirectTopicFanout
amqpNativeYesYesYes
qpidNativeYesYesYes
redisVirtualYesYesYes (PUB/SUB)
mongodbVirtualYesYesYes
beanstalkVirtualYesYes [1]No
SQSVirtualYesYes [1]Yes [2]
couchdbVirtualYesYes [1]No
zookeeperVirtualYesYes [1]No
in-memoryVirtualYesYes [1]No
djangoVirtualYesYes [1]No
sqlalchemyVirtualYesYes [1]No
SLMQVirtualYesYes [1]No

[1]仅仅在内存中支持,所以exchanges/queues必须在所有需要它们的客户端中声明。

[2]通过在SimpleDB中存储路由表来提供对扇出的支持。默认是不使能的,通过通道的supports_fanout选项来使能。


文档

Kombu使用了Sphinx,最新的文档可以在这里找到:

http://kombu.readthedocs.org/


快速浏览

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print body
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue.  You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()
或者手动处理channel:

with connection.channel() as channel:
    producer = Producer(channel, ...)
    consumer = Producer(channel)
所有的对象都可以在with表达式之外使用,记得在使用完之后关闭对象:

from kombu import Connection, Consumer, Producer

connection = Connection()
    # ...
connection.release()

consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
    # ....
consumer.cancel()
交换器和队列可以简单地声明然后被pickled和在配置文件中使用。

它们也支持操作,但是要对它们操作需要绑定一个channel。

将交换器和队列绑定到连接将使它们使用这个连接的默认通道。

>>> exchange = Exchange('tasks', 'direct')

>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()

# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
    a channel.

安装

你可以通过Python Package Index(PyPI)安装或者通过源码安装。

通过pip安装:

$ pip install kombu
通过easy_install安装:

$ easy_install kombu
如果你下载了源码,也可以向下面这样安装:

$ python setup.py build
# python setup.py install # as root

术语

在开始进一步学习之前,你需要了解一些术语:

  • 生产者

    生产者发送消息到交换器。

  • 交换器

    消息被发送到交换器. 交换器是有名称的,而且可以被配置使用多个路由算法。交换器通过匹配消息中的路由键值来将消息路由给消费者。消费者绑定到交换器时需要提供关心的路由键值。

  • 消费者

    消费者声明一个队列,将它绑定到交换器并从中接收消息。

  • 队列

    队列接收发送到交换器的消息。队列是被消费者声明的。

  • 路由键

    每个消息都有一个路由键。对路由键的解释依懒于交换器的类型。标准AMQP默认有4种交换器类型, 不同的实现可以自定义类型 (需要查看相应手册获取相关信息).

    下面是 AMQP/0.8的定义的默认交换器类型:

    • Direct 交换器

      如果一个消息的路由键属性和消费者的路由键值一致则匹配。

    • Fan-out交换器

      总是匹配, 尽管绑定不有一个路由键值。

    • Topic 交换器

      通过一个原生的类型匹配模式来匹配消息的路由键。消息的路由键值由多个"."分隔的单词组成。(”.”,就像域名一样), 有2个特殊的符号可以使用; 星号 (“*”)和 (“#”). "*"匹配任何单词 ,"#"匹配0个或多个单词. 比如“*.stock.#” 匹配“usd.stock” 和“eur.stock.db” ,但是不匹配“stock.nasdaq”.

获得帮助

邮件列表

carrot-users 


Bug跟踪

 http://github.com/celery/kombu/issues/


贡献代码

Github: http://github.com/celery/kombu


开源协议:

BSD

 类似资料: