我想在Python中的Pika客户端库中使用RabbitMQ的直接回复功能。它适用于基本消费者。但它引发了发电机使用者的以下例外情况:
鼠兔。例外情况。ChannelClosedByBroker:(406,“前提条件_失败-快速回复使用者不存在”)
有没有一种方法可以将直接回复功能用于生成器使用者?
使用基本消费者的示例客户端代码(它的工作原理):
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
channel.basic_consume(queue="amq.rabbitmq.reply-to",
on_message_callback=handle, auto_ack=True)
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
channel.start_consuming()
使用生成器消费者的示例客户端代码(它引发了异常):
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
for (method, properties, body) in channel.consume(
queue="amq.rabbitmq.reply-to", auto_ack=True):
handle(channel, method, properties, body)
环境Windows 10、RabbitMQ 3.7.13、CPython 3.7.3、Pika 1.0.1。
注-在使用基本使用者的示例客户端代码中的basic\u publish
方法之后调用basic\u consume
方法会引发与使用生成器使用者相同的异常:
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
channel.basic_consume(queue="amq.rabbitmq.reply-to",
on_message_callback=handle, auto_ack=True)
channel.start_consuming()
我推荐pika文档中给出的示例。
这对我也很管用。
这就是地点
正如Luke Bakken在这里所建议的,这就是诀窍:
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
inactivity_timeout=0.1))
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
for (method, properties, body) in channel.consume(
queue="amq.rabbitmq.reply-to", auto_ack=True):
handle(channel, method, properties, body)
我想让flume代理位于hadoop集群之外,并想知道是否有可能使用flume通过WebHDFS向hadoop集群发送消息。 如果没有,是否有使用WebHDFS的替代方案?使用多层水槽层仍然需要我在hadoop集群中运行水槽代理。
是否可以在AWS Lambda中构建一个函数来创建websocket并将数据发送到订阅的应用程序? 类似这样: John在他的手机中打开了应用程序SuperPhotoApp,但决定使用桌面浏览器将照片上传到SuperPhotoApp服务(S3 Bucket),此事件执行创建套接字的Lambda函数。io服务器并将更新推送到所有订户,他的手机打开了应用程序,因此应用程序会自动更新新照片。 这可以通过
我一直使用ElasticsearchIntegrationTest在没有elasticsearch服务器的情况下测试我的代码。我的类中的私有成员很少,所以考虑使用Powermock访问私有成员。在运行测试时,我得到异常。 Powermock是否可以与ElasticSearchintegrationTest一起使用?? 代码: 例外情况: 原因:javassist.CanNotCompileExce
问题内容: 我对正则表达式很糟糕,但是我想知道是否可以将ng-pattern与变量一起使用 例如, 其中validationCode是控制器中附加到$ scope的变量 如果 则ng-pattern将是 但这不起作用,似乎我需要创建一个我真的不想要的自定义指令 问题答案: 需要一个正则表达式。 从Angular的文档中有关: 如果该值与模式表达式不匹配,则设置模式验证错误键。期望值用于内联模式或定
我对Spring webflux和protobuf都是新手。我一直在读一些东西,我发现它们之间有一些相似之处。喜欢 Spring webflow可以部署在netty上,gRPC也是如此。 两者都适用于流数据。 这两个框架都在某种程度上基于观察者设计模式,支持基于均匀的数据处理方法。 然而,我仍然找不到任何结合webflux(反应式编程)、gRPC(更快的数据编码和解码)和Spring(依赖注入)功
我想通过代理服务器连接到Azure Service Bus消息队列。我在Spring应用程序中使用流绑定库 波姆。xml: 应用yml 我试图通过命令行参数提供HTTP和SOCKS代理设置,但这似乎不起作用。是否可以为“examplehost.servicebus.windows.net”的连接提供SOCKS或HTTP代理?