当前位置: 首页 > 知识库问答 >
问题:

在Python中是否可以将RabbitMQ直接回复功能与Pika生成器使用者一起使用?

段超
2023-03-14

我想在Python中的Pika客户端库中使用RabbitMQ的直接回复功能。它适用于基本消费者。但它引发了发电机使用者的以下例外情况:

鼠兔。例外情况。ChannelClosedByBroker:(406,“前提条件_失败-快速回复使用者不存在”)

有没有一种方法可以将直接回复功能用于生成器使用者?

使用基本消费者的示例客户端代码(它的工作原理):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.start_consuming()

使用生成器消费者的示例客户端代码(它引发了异常):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)

环境Windows 10、RabbitMQ 3.7.13、CPython 3.7.3、Pika 1.0.1。

注-在使用基本使用者的示例客户端代码中的basic\u publish方法之后调用basic\u consume方法会引发与使用生成器使用者相同的异常:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.start_consuming()

共有2个答案

杨曜瑞
2023-03-14

我推荐pika文档中给出的示例。

这对我也很管用。

这就是地点

孔和畅
2023-03-14

正如Luke Bakken在这里所建议的,这就是诀窍:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
                         inactivity_timeout=0.1))
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)
 类似资料:
  • 我想让flume代理位于hadoop集群之外,并想知道是否有可能使用flume通过WebHDFS向hadoop集群发送消息。 如果没有,是否有使用WebHDFS的替代方案?使用多层水槽层仍然需要我在hadoop集群中运行水槽代理。

  • 是否可以在AWS Lambda中构建一个函数来创建websocket并将数据发送到订阅的应用程序? 类似这样: John在他的手机中打开了应用程序SuperPhotoApp,但决定使用桌面浏览器将照片上传到SuperPhotoApp服务(S3 Bucket),此事件执行创建套接字的Lambda函数。io服务器并将更新推送到所有订户,他的手机打开了应用程序,因此应用程序会自动更新新照片。 这可以通过

  • 我一直使用ElasticsearchIntegrationTest在没有elasticsearch服务器的情况下测试我的代码。我的类中的私有成员很少,所以考虑使用Powermock访问私有成员。在运行测试时,我得到异常。 Powermock是否可以与ElasticSearchintegrationTest一起使用?? 代码: 例外情况: 原因:javassist.CanNotCompileExce

  • 问题内容: 我对正则表达式很糟糕,但是我想知道是否可以将ng-pattern与变量一起使用 例如, 其中validationCode是控制器中附加到$ scope的变量 如果 则ng-pattern将是 但这不起作用,似乎我需要创建一个我真的不想要的自定义指令 问题答案: 需要一个正则表达式。 从Angular的文档中有关: 如果该值与模式表达式不匹配,则设置模式验证错误键。期望值用于内联模式或定

  • 我对Spring webflux和protobuf都是新手。我一直在读一些东西,我发现它们之间有一些相似之处。喜欢 Spring webflow可以部署在netty上,gRPC也是如此。 两者都适用于流数据。 这两个框架都在某种程度上基于观察者设计模式,支持基于均匀的数据处理方法。 然而,我仍然找不到任何结合webflux(反应式编程)、gRPC(更快的数据编码和解码)和Spring(依赖注入)功

  • 我想通过代理服务器连接到Azure Service Bus消息队列。我在Spring应用程序中使用流绑定库 波姆。xml: 应用yml 我试图通过命令行参数提供HTTP和SOCKS代理设置,但这似乎不起作用。是否可以为“examplehost.servicebus.windows.net”的连接提供SOCKS或HTTP代理?