当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ在使用者重新启动后停止向使用者传递MQTT消息

孟福
2023-03-14

我有一只兔子。OpenShift上的6.10实例在启用MQTT插件的情况下运行。我有多个Spring Boot应用程序,使用EclipsePAHOMQTT实现,使用RabbitMQ队列中的消息。所有使用者都使用MqttDefaultFilePersistence。持久性数据被写入具有100M配额的持久性卷上的目录/tmp/mqtt。这是我为连接RabbitMQ队列而编写的代码

    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    options.setCleanSession(false);
    options.setAutomaticReconnect(reconnect);
    MqttClient mqttClient = new MqttClient(serverUri, "MyConsumerApp", new MqttDefaultFilePersistence("/tmp/mqtt"));
    mqttClient.connect(options);

现在我意识到,由于某些原因,如果重新启动消费应用程序,它将不再使用任何消息。正如我在RabbitMQ管理控制台中看到的那样,队列中的消息正在被填满。RabbitMQ日志中没有错误消息。在这种情况下唯一有效的解决方案是删除RabbitMQ管理控制台中的订阅队列,然后重新启动使用者应用程序。也许有人遇到过类似的问题?有人能告诉我这是怎么回事吗?也许RabbitMQ使用的MQTT插件和保存消息状态的PAHO实现存在问题。看起来RabbitMQ在消费者断开连接后不知道在哪里恢复,所以它只是停止传递消息。有点悲观的锁定…:-)

共有1个答案

杨乐
2023-03-14

您必须使用持久队列。如果队列填满是问题,那么您可以使用自动删除队列。但这有一个缺点,即一次尝试传递消息,如果没有消费者,则队列将被删除,消息将丢失。

对于你的情况,你应该看看死信交换(https://www.rabbitmq.com/dlx.html)

如果消费者在一段时间后被切断并重新连接,未处理的消息可以从死信交换服务。这样消息的顺序也不会丢失。

 类似资料:
  • 我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。 我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常: 更重要的是,CachingConnectionFactory默认将reConnectOnEx

  • 我有一个PHP应用程序,使用RabbitMQ。为了实现冗余,我创建了一对RabbitMQ服务器,并将它们连接到一个集群中。我也有一个VyOS故障转移群集运行HAProxy负载平衡连接,并在故障转移的情况下提供重新路由。 昨天,我们的VyOS集群决定需要故障转移(可能是短暂的网络中断)。在一个VyOS上停止了HA代理,虚拟IP被移动,并在另一个节点上重新启动HA代理。 之后,我查看了Rabbit中的

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个

  • 我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队

  • 我正在使用RabbitMQ和Spring amqp,我希望它们不会丢失消息。通过对重试使用指数回退策略,我可能会阻止我的消费者,因为他们可能正在处理他们可以处理的消息。我想给失败的消息几天时间重试指数回退策略,但我不想让使用者阻塞几天,我想让它继续对其他消息工作。 我知道我们可以通过ActiveMQ(在将来的某个时刻重试消息(ActiveMQ))实现这种功能,但无法为RabbitMQ找到类似的解决