当前位置: 首页 > 知识库问答 >
问题:

如何使 ActiveMQ 代理删除脱机持久订阅者

宁浩博
2023-03-14

我们有一个ActiveMQ代理,它使用JMS、AMQP和MQTT从非常不同的客户端连接到。出于某种原因,我们还没有弄清楚一组特定的MQTT客户端经常(不总是)持久订阅。这是一个测试环境,客户端经常被添加和删除,后者有时通过拔掉插头或重新启动嵌入式设备,因此它们无法正确取消订阅。效果(IIUC)是代理为可能再也见不到的设备堆积“离线持久订阅”(我可以在超文本传输协议下看到这些),永远保留关于这些主题的消息,直到它最终因自己的内存占用而崩溃。

这里的问题是订户的订阅是持久的,我们需要找出为什么会这样。然而,我们也决定,客户这样做(无意中)不应该让代理陷入停滞,所以我们需要独立解决这个问题。

我发现有离线持久订阅超时的设置,并将它们放入我们的代理配置中(最后两行):

<broker
  xmlns="http://activemq.apache.org/schema/core" 
  brokerName="my_broker"
  dataDirectory="${activemq.data}" 
  useJmx="true"
  advisorySupport="false" 
  persistent="false"
  offlineDurableSubscriberTimeout="1800000"
  offlineDurableSubscriberTaskSchedule="60000">

如果我没理解错的话,上面应该是每分钟检查一次,把它半个小时没见的客户辞退。然而,与文档相反,这似乎不起作用:我几天前已经订阅然后拔掉插头的消费者仍然在离线持久订户列表中可见,代理的内存占用量不断增加,如果我在代理的web界面中手动删除订户,我可以看到内存占用量下降。

所以这是我的问题:

    < li >什么决定了ActiveMQ broker上对某个主题的MQTT订阅是否持久? < li >在ActiveMQ设置中设置删除脱机持久订阅的超时时,我做错了什么?

共有1个答案

娄建义
2023-03-14

我提取了删除超时持久订阅的相关代码(< code>doCleanup())。

在成功案例中,它执行:

    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);

在故障情况下,它执行:

    LOG.error("Failed to remove inactive durable subscriber", e);

在日志文件中查找上面的日志行,并将其与您使用admin/subscribers观察到的详细信息相匹配。jsp查看器。如果它不打印任何行,订阅可能由于某种原因仍然处于活动状态,或者您可能遇到了错误。

另外,如果可以的话,你能尝试删除经纪人名称中的下划线(_)吗?该手册讨论了代理名称中下划线的问题。

法典:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
   super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
   if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
      this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
      this.cleanupTask = new TimerTask() {
         @Override
         public void run() {
            doCleanup();
         }
      };
      this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
   }
}

public void doCleanup() {
   long now = System.currentTimeMillis();
   for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
      DurableTopicSubscription sub = entry.getValue();
      if (!sub.isActive()) {
         long offline = sub.getOfflineTimestamp();
         if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
            LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
            try {
               RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
               info.setClientId(entry.getKey().getClientId());
               info.setSubscriptionName(entry.getKey().getSubscriptionName());
               ConnectionContext context = new ConnectionContext();
               context.setBroker(broker);
               context.setClientId(entry.getKey().getClientId());
               removeSubscription(context, info);
            } catch (Exception e) {
               LOG.error("Failed to remove inactive durable subscriber", e);
            }
         }
      }
   }
}

// The toString method for DurableTopicSubscription class
@Override
public synchronized String toString() {
    return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}
 类似资料:
  • 我想配置一个持久主题,但我想配置Apache ActiveMQ Artemis将为该主题的非活动持久订阅服务器保留消息的时间。 例如,类似“为非活动的持久订阅服务器保存持久消息长达30秒”的内容。如果订阅服务器在30秒内没有变为活动状态,则当订阅服务器变为活动状态时,消息将不再可用。

  • 我有一个持久订阅ActiveMQ的JMS代理,在WSO2 ESB V4.5.1和V4.7中运行良好 我试图在WSO2 ESB V4.8.0(里程碑3或4)中使用它,但没有成功 /repository/conf/axis2/axis2.xml中的JMS配置: -->我一直使用tcp://localhost:61616和nio://localhost:61616来实现sames效果 在ESB控制台中看

  • 我正在尝试编写一个基本的 ActiveMQ 客户端来侦听一个主题。我正在使用Spring启动活动MQ。我有一个基于各种教程构建的实现,它使用默认Jms列表容器工厂,但是我在使其正常工作时遇到了一些问题。 一切都很好,直到我尝试获得一个持久的订阅。当我这样做时,我发现在容器工厂中设置了客户机id,我得到了一个关于如何在共享连接上设置客户机id的错误。 <code>原因:共享连接的代理不支持setCl

  • 在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的

  • 我需要为ActiveMQ创建一个主题和一个持久订阅者,我的问题是我不知道在哪里指定它。我可以创建主题并使用消息,但是当我关闭订阅者然后继续发送消息并再次打开订阅者时,它不会读取它们。 这是我目前掌握的情况: 发送消息: 接收消息: 我已经阅读了这篇文章,我明白我需要创建持久订阅者。 我也读过spring文档 我认为它与(我没有实现,我使用的是默认配置)有关,文档显示: 但是我似乎找不到在哪里创建持

  • 我有一个Red Hat AMQ(基于ActiveMQ Artemis)代理,我希望使用持久订阅(或等效)特性,这样我将有多个OpenWire JMS订阅者订阅我们的应用程序的事件,这些事件将可靠地交付给他们。 我想预先配置订户,这样就可以省去我在初始应用程序启动时的麻烦。我希望避免初始应用程序启动的情况,即在持久订阅服务器执行初始订阅之前,主应用程序开始运行并发布事件。 有什么方法可以预先配置持久