当前位置: 首页 > 工具软件 > mqtt-spy > 使用案例 >

MQTT订阅发布主题

咸承教
2023-12-01

MQTT订阅发布主题

前言:

  • 因为tcp协议正常会出现丢包、卡死等现象,所以最近需要在项目中添加mqtt协议,mqtt协议的qos机制保证在网络条件比较差的情况下也能保持良好通信,反正它的各种好处网上有很多,所以学习了一下。
  • 网上对于mqtt的订阅发布的例子大多将mqtt客户端分为client和server,一个发布主题,一个订阅主题,在我看来并没有什么client和server之分,一个客户端既可以发布主题也可以订阅主题,而服务端应该是类似于emqx这种的消息服务器,所以这里实现的是同一个客户端的订阅发布主题。

准备:

  • mqtt服务端: emqx

  • mqtt客户端模拟器: mqtt-spy

  • springBoot配置:

    mqtt:
        # 是否启用
        enable: true
        # mqtt服务地址
        broker: tcp://localhost:1883
        # mqtt服务用户名
        username: admin
        # mqtt服务密码
        password: public
  • maven依赖:

    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.4</version>
    </dependency>

对于软件的安装使用及其他依赖(lombok、springBoot等)网上有很多,这里就不做说明了。

实现:

  • 参数配置类:

    @Getter
    @Component
    public class IoVariable {
    
        /** 是否启用MQTT */
        @Value("${mqtt.enable:true}")
        private boolean mqttEnable;
        /** mqtt服务ip和端口 */
        @Value("${mqtt.broker:tcp://localhost:1883}")
        private String broker;
        /** mqtt服务用户名 */
        @Value("${mqtt.host.userName:admin}")
        private String userName;
        /** mqtt服务密码 */
        @Value("${mqtt.host.passWord:public}")
        private String passWord;
    
    }
  • mqtt客户端:

    • 对外只提供两个方法。

      • 创建客户端并订阅主题
      • 发布主题
    • 因为创建客户端耗费资源,所以订阅发布主题用同一个客户端(后续进行连接池改造)

      • 定义一个

        private static MqttClient client;

        这种方式不符合面向对象的编程思想,所以弃用

      • 这里用实现接口并依赖的方式

    @FunctionalInterface
    public interface IMqttPublish {
    
        /**
         * mqtt发布主题
         * @param topic 主题
         * @param content 主题内容
         */
        void publish(String topic, String content);
    }
    public class ClientMqtt implements IMqttPublish{
    
        /** mqtt连接 */
        private MqttClient client;
        /** mqtt服务ip和端口 */
        private final String broker;
        /** mqtt客户端id */
        private final String clientId;
        /** mqtt其他客户端发布的主题 */
        private final String terminalTopic;
        /** mqtt系统主题 */
        private final String systemTopic;
        /** mqtt服务用户名 */
        private final String userName;
        /** mqtt服务密码 */
        private final String passWord;
    
        /**
         * 构造函数
         */
        public ClientMqtt(String broker, String clientId, String terminalTopic, String systemTopic,
                          String userName, String passWord, TerminalManager manager) {
            this.broker = broker;
            this.clientId = clientId;
            this.terminalTopic = terminalTopic;
            this.systemTopic = systemTopic;
            this.userName = userName;
            this.passWord = passWord;
            this.manager = manager;
        }
    
    
        /**
         * 创建客户端连接服务端并订阅主题(这里可以拆成两个函数)
         * @return mqtt客户端
         */
        public void connect(){
            try {
                client = new MqttClient(broker, clientId, new MemoryPersistence());
                client.connect(getOptions());
                //将自身注入回调就可以调用发布主题了
                client.setCallback(new PushCallback(this));
                //qos级别
                int[] qos = {1, 1};
                String[] topics = {terminalTopic, systemTopic};
                client.subscribe(topics, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 客户端连接服务器options参数设置
         * @return options
         */
        private MqttConnectOptions getOptions(){
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            // 设置超时时间
            options.setConnectionTimeout(10);
            // 设置会话心跳时间
            options.setKeepAliveInterval(5);
            //自动重连
            options.setAutomaticReconnect(true);
            return options;
        }
    
        @Override
        public void publish(String topic, String content){
            try {
                //mqttMessage配置
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(1);
                mqttMessage.setRetained(true);
                mqttMessage.setPayload(content.getBytes());
                //发布主题
                MqttDeliveryToken token = client.getTopic(topic).publish(mqttMessage);
                token.waitForCompletion();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
  • 回调函数:接受到消息在这里处理

    • 这里就可以用同一个客户端发布主题了
    • 其他对象需要使用发布主题用相同的方式即可。这里在处理其他客户端发布的主题时创建发布主题的对象
    @Slf4j
    public class PushCallback implements MqttCallbackExtended {
    
        /** mqtt发布主题 */
        private final IMqttPublish mqttPublish;
        
        public PushCallback(IMqttPublish mqttPublish) {
            this.mqttPublish = mqttPublish;
        }
    
        @Override
        public void connectionLost(Throwable cause) {
            log.error("mqtt连接断开,正在重新连接:" + cause);
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            log.debug("通信完成");
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) {
            String systemTopicFlag = "$SYS";
            // 处理系统主题(客户端上下线)
            if (topic.startsWith(systemTopicFlag)) {
                systemTopicHandler(topic);
                return;
            }
            // 处理其他客户端发布的主题
            terminalTopicHandler(mqttMessage);
        }
    
        @Override
        public void connectComplete(boolean reconnect, String serverUrl) {
            log.debug("已重新连接");
        }
    
        /**
         * 处理系统主题
         * @param topic 上下线主题
         */
        private void systemTopicHandler(String topic) {
            String[] topicSplit = topic.split("/");
            // 系统主题拆分后长度:$SYS/brokers/emqx@127.0.0.1/clients/{clientId}/{clientState}
            int systemTopicLength = 6;
            if (topicSplit.length < systemTopicLength) {
                return;
            }
            // 客户端id
            String clientIdInTopic = topicSplit[4];
            // 主题上下线类型
            String clientState = topicSplit[5];
            // 处理上下线消息
            switch (ClientStateEnum.valueOf(clientState)) {
                // 客户端上线
                case connected:
                    log.debug("客户端上线:" + clientIdInTopic);
                    break;
                // 客户端下线
                case disconnected:
                    log.debug("客户端下线:" + clientIdInTopic);
                    break;
                default:
                    break;
            }
        }
    
        /**
         * 处理其他客户端发布的主题
         * @param mqttMessage 消息内容
         */
        private void terminalTopicHandler(MqttMessage mqttMessage) {
            PublishTest publishTest = new PublishTest(mqttPublish);
        }
    
        /**
         * 客户端上下线状态
         */
        private enum ClientStateEnum {
            /** 上线 */
            connected,
            /** 下线 */
            disconnected
        }
    }
  • 启动项目时创建mqtt客户端并订阅主题

    @Slf4j
    @Component
    @Order(value = 1)
    public class IoAppReadyListener implements ApplicationListener<ApplicationReadyEvent> {
    
        @Autowired
        private IoVariable variable;
    
        /** mqtt其他客户端发布的主题 */
        private static final String TERMINAL_TOPIC = "自定义主题";
    
        /** mqtt系统主题 */
        private static final String SYSTEM_TOPIC = "$SYS/brokers/+/clients/#";
    
        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            if (variable.isMqttEnable()){
                startMqtt();
            }
        }
        
        /**
         * mqtt连接
         */
        private void startMqtt(){
            String clientId = IdUtil.fastSimpleUUID();
            MqttClientSingle mqttClientSingle = new MqttClientSingle(variable.getBroker(), clientId,
                    TERMINAL_TOPIC, SYSTEM_TOPIC, variable.getUserName(),
                    variable.getPassWord(), manager);
            mqttClientSingle.connect();
        }
    }

发布主题:

public class PublishTest{

    /** mqtt发布主题 */
    private final IMqttPublish mqttPublish;

    /**
     * 构造函数
     */
    public MqttTerminal(IMqttPublish mqttPublish) {
        this.mqttPublish = mqttPublish;
    }

    /**
     * 发布主题
     */
    public void publishTest(){
        String content = "主题内容";
        String topic = "自定义主题";
        mqttPublish.publish(topic, content);
    }
}

另:

  1. 代码业务部分有删改,可能会有部分错误。
  2. 由于作者水平有限,若有不足之处,欢迎指正。

参考资料

emqx官方文档

 类似资料: