TLDR;在主题交换和由使用者动态创建的队列的上下文中,当没有使用者使用消息时,如何重新传递消息/通知生产者?
我有以下组件:
routing_key=file_category
向exchange发送消息。现在--这工作正常,但它仍然有一个主要问题。当前,如果发布服务器发送的消息带有没有使用者绑定的路由密钥,则该消息将丢失。这是因为即使消费者创建的队列是持久的,一旦消费者断开连接,它就会被销毁,因为它对这个消费者来说是唯一的。
消费者代码(python):
channel.exchange_declare(exchange=exchange_name, type='topic', durable=True)
result = channel.queue_declare(exclusive = True, durable=True)
queue_name = result.method.queue
topics = [ "pictures.*", "videos.trending" ]
for topic in topics:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
channel.basic_consume(my_handler, queue=queue_name)
channel.start_consuming()
但是,如果生产者被告知没有消费者收到消息,“丢失”消息就可以接受(在这种情况下,它可以稍后重新发送消息)。我发现强制字段可能会有所帮助,因为AMQP的规范规定:
此标志告诉服务器,如果消息无法路由到队列,该如何响应。如果设置了此标志,服务器将使用返回方法返回不可路由的消息。
这确实有效--在生产者中,我可以注册一个ReturnListener
:
rabbitMq.confirmSelect();
rabbitMq.addReturnListener( (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) -> {
log.info("A message was returned by the broker");
});
rabbitMq.basicPublish(exchangeName, "pictures.profile", true /* mandatory */, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBytes);
rabbitMq.addConfirmListener(new ConfirmListener() {
void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("ACK message {}, multiple = ", deliveryTag, multiple);
}
void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("NACK message {}, multiple = ", deliveryTag, multiple);
}
});
这里的问题是ACK是由代理发送的,而不是由消费者本身发送的。因此,当生产者发送带有路由密钥k的消息时:
参考文档:
send a message
on receive ACK:
if no basic.return was received for this message
the message was correctly consumed
else
the message wasn't correctly consumed
on receive basic.return
the message wasn't correctly consumed
在生产者中有一个“响应超时”逻辑。制片人发出一条信息。它希望processing_results队列中的此消息有一个“答案”。一个解决方案是,如果消息在X秒后仍未得到答复,则重新发送该消息。我不喜欢它,但它会在生产者中创造一些额外的棘手逻辑。
生成TTL为0的消息,并让生产者监听死信交换。这是官方建议的解决方案,以取代在RabbitMQ3.0中删除的“立即”标志(参见删除“立即”标志的段落)。根据死信交换的文档,只能在每个队列中配置一个死信交换。所以在这里行不通
[编辑]我看到的最后一个解决方案是让每个消费者创建一个在断开连接时不会被破坏的持久队列,并让它监听。示例:consumer1
创建queue-consumer-1
,它绑定到具有路由键abcd
的myexchange
的消息。我预见到的问题是,它意味着为每个使用者应用程序实例找到唯一的标识符(例如,它所运行的机器的主机名)。
我很想对此有一些投入-谢谢!
与:有关的:
>
rabbitMQ:带有主题交换的持久消息(此处不适用,因为队列是“动态”创建的)
如前所述,我最终实现了使用basic.return的东西。实际上,实现起来并不困难,您只需确保生成消息的方法和处理基本返回的方法是同步的(如果不在同一个类中,则有一个共享锁),否则最终可能会出现交织的执行流,这将扰乱您的业务逻辑。
我相信,对于关于标识未路由消息的部分,备用交换将是最适合您的用例的。
当具有配置的AE的交换无法将消息路由到任何队列时,它会将消息发布到指定的AE。
基本上,在创建“主”交换时,您将为它配置一个备用交换。对于引用的备用交换,我倾向于使用扇出,然后创建绑定到它的队列(notroutedq)。这意味着任何未发布到至少一个绑定到“主”交换的队列的消息都将在notroutedq中结束
关于你的声明:
因为即使消费者创建的队列是持久的,一旦消费者断开连接,它就会被销毁,因为它对这个消费者是唯一的。
似乎您已将队列配置为自动删除设置为true。如果是这样,在断开连接的情况下,如您所述,队列将被破坏,队列中仍然存在的消息将丢失,而备用exchange配置不包括这种情况。
使用主题交换,我希望有一个具有以下特性的发布/订阅消息传递模式: 是否实现了“发布者确认”。 让使用者在处理完每条消息后也确认该消息。 使用路由密钥将邮件路由到一个或多个使用者。 具有持久的使用者队列,因此如果使用者应用程序暂时关闭,它可以在重新启动时从其队列中拾取消息。 所以我创建了2个控制台应用程序(发送和接收)来测试上面的内容。 } 接收 问题是我的Send程序中的OnBasicAcks只会
发布者创建reply_to队列并发布到路由密钥,其中包含一条消息,告诉消费者向队列发送响应(RPC协议),以及一个传回的相关id,以便所有未来的结果都与该唯一标识符相关联 Exchange向绑定到该路由密钥的所有队列发送消息。这里,有两个消费者的两个队列,每个都绑定到路由密钥“泵” 一段时间后,消费者回复回队列,然后确认消息,以便他们的唯一队列删除发送到其队列的消息。每个收到消息的消费者都会这样做
这里我的问题是,当一个消费者正在处理一条消息时,队列中的其他消息是否有可能被另一个消费者消费? 如果是,如何确保在给定的时间只有一个使用者在使用队列中的消息?
我是AMQP的新手,正在尝试为RabbitMQ系统制定一个通知架构。 我想要一个主题交换(通知交换,比方说),特别是因为我想灵活地使用主题交换附带的路由密钥和队列,以及将来扩展该主题的更多选项。不过,我可能是错的,因为... 我还想让两个或更多的消费者使用每个通知。作为基线,我希望发布的每个通知都在数据库中结束。此外,我希望每个通知都可以由客户端应用程序使用(例如,web应用程序使用并进一步通过套
RabbitMQ在下列情况下会循环分发消息吗? RabbitMQ配置: 交换类型-主题 路由密钥-通知# 制片人正在将消息推送到上面的交流中,并遵循以下不同的主题 - notify.log.# , notify.status.#, notify.priceChange.# 有4个消费者在不同的服务器上运行。 > 3个消费者在负载均衡器下执行相同的处理并在同一应用程序的不同实例上运行。(他们想消费生
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。