Question:

Uncleared MQTT subscription queues

那绪
2023-03-14

My findings

From my research, each subscription queue is used to store the messages sent to the topic that the corresponding client has subscribed to. For example, when the same topic (e.g. 1/2/3) is subscribed to by 3 different clients (e.g. A, B, and C), there will be 3 subscription queues (i.e. A_1/2/3, B_1/2/3, and C_1/2/3). So when a message is sent to topic 1/2/3, Artemis will place that message in the subscription queues A_1/2/3, B_1/2/3, and C_1/2/3.
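To make the scenario concrete, here is a minimal sketch using the Eclipse Paho MQTT v3 Java client; the broker URL tcp://localhost:1883 and the client IDs A, B and C are assumptions for illustration only, not taken from our deployment:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ThreeSubscribers {
    public static void main(String[] args) throws MqttException {
        // Assumed broker URL; in practice this would point at the MQTT acceptor from broker.xml.
        String brokerUrl = "tcp://localhost:1883";

        for (String clientId : new String[] { "A", "B", "C" }) {
            MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());

            MqttConnectOptions options = new MqttConnectOptions();
            // cleanSession=false asks for a persistent session, so the broker keeps
            // one subscription queue per client/topic pair (the A_1/2/3, B_1/2/3, C_1/2/3 queues above).
            options.setCleanSession(false);
            client.connect(options);

            // All three clients subscribe to the same topic at QoS 1.
            client.subscribe("1/2/3", 1);
        }
    }
}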

The actual problem

Our broker.xml:

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <security-enabled>true</security-enabled>


      <persistence-enabled>false</persistence-enabled>

      <!-- this could be ASYNCIO, MAPPED, NIO
           ASYNCIO: Linux Libaio
           MAPPED: mmap files
           NIO: Plain Java Files
       -->
      <journal-type>ASYNCIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-file-size>10M</journal-file-size>
      
      <journal-buffer-timeout>64000</journal-buffer-timeout>


      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
       -->
      <journal-max-io>4096</journal-max-io>
      <!--
        You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
         <network-check-NIC>theNicName</network-check-NIC>
        -->

      <!--
        Use this to use an HTTP server to validate the network
         <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

      <!-- <network-check-period>10000</network-check-period> -->
      <!-- <network-check-timeout>1000</network-check-timeout> -->

      <!-- this is a comma separated list, no spaces, just DNS or IPs
           it should accept IPV6

           Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
                    You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
      <!-- <network-check-list>10.0.0.1</network-check-list> -->

      <!-- use this to customize the ping used for ipv4 addresses -->
      <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

      <!-- use this to customize the ping used for ipv6 addresses -->
      <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->




      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>99</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>
      

      <acceptors>

         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:${activeMQ-mqtt-port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,MQTT;useEpoll=true</acceptor>
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:${activeMQ-mqtts-port}?sslEnabled=true;keyStorePath=${activeMQ-keystore-path};keyStorePassword=${activeMQ-keyStore-password};protocols=CORE,MQTT</acceptor>

      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>

      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="true"/>
            <property key="LOG_CONSUMER_EVENTS" value="true"/>
            <property key="LOG_DELIVERING_EVENTS" value="true"/>
            <property key="LOG_SENDING_EVENTS" value="true"/>
            <property key="LOG_INTERNAL_EVENTS" value="true"/>
         </broker-plugin>
      </broker-plugins>
   </core>
</configuration>

1 answer in total

斜成济
2023-03-14

It is really up to the subscriber to remove its subscription when it is done with it. In other words, the MQTT client should unsubscribe from its existing subscription before creating a new one. The broker has no way of knowing whether the subscriber plans to reconnect later to fetch the messages from its subscription.
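As a rough client-side sketch (assuming the Eclipse Paho MQTT v3 Java client and the example topic 1/2/3 from the question; this is an illustration, not part of the original answer):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class CleanUnsubscribe {
    public static void main(String[] args) throws MqttException {
        MqttClient client = new MqttClient("tcp://localhost:1883", "A", new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false); // persistent session: the broker keeps a subscription queue
        client.connect(options);
        client.subscribe("1/2/3", 1);

        // ... receive and process messages ...

        // Remove the subscription explicitly when done with it, so the broker
        // does not have to keep the corresponding subscription queue around.
        client.unsubscribe("1/2/3");
        client.disconnect();
    }
}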

If you really do want the broker to remove the subscription queues, you can use the following address settings; a broker.xml sketch follows the list:

  • auto-delete-queues: must be true.
  • auto-delete-queues-delay: the delay, in milliseconds, from the last consumer disconnecting until the broker deletes the subscription queue.
  • auto-delete-queues-message-count: must be -1 so that any messages remaining in the subscription queue are ignored.
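For example, a minimal broker.xml sketch; the catch-all match="#" and the 300000 ms delay are illustrative assumptions and should be adjusted to the addresses actually used by the MQTT subscriptions:

      <address-settings>
         <address-setting match="#">
            <auto-delete-queues>true</auto-delete-queues>
            <!-- delete the subscription queue 5 minutes after its last consumer disconnects -->
            <auto-delete-queues-delay>300000</auto-delete-queues-delay>
            <!-- -1 means any messages still sitting in the subscription queue are ignored -->
            <auto-delete-queues-message-count>-1</auto-delete-queues-message-count>
         </address-setting>
      </address-settings>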