Spring Boot 整合 MQTT(EMQX)Demo

穆彬郁
2023-12-01

1.下载Emqx安装包,配置Emqx环境

下载地址:免费试用 EMQ 产品

下载压缩包解压,打开cmd,进入emqx/bin目录,输入emqx start,启动服务。

2.创建SpringBoot项目Demo,在pom.xml中添加以下依赖

<!-- mqtt -->
<!-- No <version> elements are given; presumably they are managed by the Spring Boot parent/BOM — confirm. -->
<!-- NOTE(review): the Java code in this article imports org.eclipse.paho.client.mqttv3 directly;
     presumably that library arrives transitively through spring-integration-mqtt — confirm. -->
<!-- NOTE(review): nothing in the code shown in this article references spring-integration-stream;
     verify whether this dependency is actually needed. -->
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

3.配置application.yml文件,设置Emqx参数

#MQTT Config
#NOTE(review): MqttConfiguration binds with @ConfigurationProperties("mqtt"), so `mqtt` has to be a
#top-level key; the two-space indent below suggests it was pasted from under another key — confirm.
  mqtt:
  #MQTT broker address. The original note says several addresses may be given, comma separated,
  #e.g. tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
  #NOTE(review): the value is passed unchanged to `new MqttClient(host, ...)`; verify that a
  #comma-separated list is really accepted there.
  #NOTE(review): port 11883 is not the usual MQTT TCP port 1883 — confirm it matches the broker listener.
    host: tcp://127.0.0.1:11883
    #Default client ID used when connecting to the broker
    clientid: mqtt_id
    #MQTT username
    username: admin
    #MQTT password
    password: admin
    #Default topic for publishing messages; a different topic can be given on each call
    topic: test
    #Connection timeout
    #NOTE(review): forwarded to MqttConnectOptions.setConnectionTimeout, which Paho documents in
    #seconds; 1000 looks like it was meant as milliseconds — confirm.
    timeout: 1000
    #Session keep-alive (heartbeat) interval, forwarded to MqttConnectOptions.setKeepAliveInterval
    keepalive: 100

4.代码

(1)消费监听

package com.example.demo.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

/**
 * Paho {@link MqttCallback} that prints connection-loss, inbound-message and
 * delivery-complete events to standard output.
 *
 * <p>The callback only reports events. It does not hold a client reference and
 * does not attempt to reconnect.
 *
 * @author yangkai
 * @date 2022/2/10 9:40
 */
@Component
public class PushCallback implements MqttCallback {

    /**
     * Reports that the connection to the broker was lost.
     *
     * <p>The previous version guarded the message with a private static client
     * field that was never assigned, so the guard was always true, and it
     * announced a reconnect that nothing performed. The cause is now printed
     * instead so the failure can be diagnosed.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("连接断开 : " + throwable);
    }

    /**
     * Prints the topic, QoS and payload of a received message.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown by this implementation
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        // Decode explicitly as UTF-8 rather than with the platform default charset,
        // so non-ASCII payloads print the same on every machine.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收消息内容 : " + payload);
    }

    /**
     * Prints whether delivery of an outbound message has completed.
     *
     * @param token delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

(2)mqtt客户端

package com.example.demo.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author yangkai
 * @description mqtt客户端
 * @date 2022/2/10 9:39
 */
@Slf4j
@Component
public class MqttCustomerClient {
    @Autowired
    private PushCallback pushCallback;


    private static MqttClient client;

    public  static MqttClient getClient(){
        return  client;
    }

    public static void setClient(MqttClient client){
        MqttCustomerClient.client=client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keeplive 保留数
     */
    public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
        MqttClient client;

        try {
            client=new MqttClient(host,clientID,new MemoryPersistence());
            MqttConnectOptions options=new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keeplive);
            MqttCustomerClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            }catch (Exception e){
                e.printStackTrace();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 发布,默认qos为0,非持久化
     * @param topic
     * @param pushMessage
     */
    public void pushlish(String topic,String pushMessage){
        pushlish(0,false,topic,pushMessage);
    }

    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public void pushlish(int qos,boolean retained,String topic,String pushMessage){
        MqttMessage message=new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);
        if(null== mqttTopic){
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token=mqttTopic.publish(message);
            token.waitForCompletion();
        }catch (MqttPersistenceException e){
            e.printStackTrace();
        }catch (MqttException e){
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题,qos默认为0
     * @param topic
     */
    public void subscribe(String topic){
        log.error("开始订阅主题" + topic);
        subscribe(topic,0);
    }

    public void subscribe(String topic,int qos){
        try {
            MqttCustomerClient.getClient().subscribe(topic,qos);
        }catch (MqttException e){
            e.printStackTrace();
        }
    }
}

(3)mqtt订阅

package com.example.demo.mqtt;

import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Binds the {@code mqtt.*} properties and, when its bean method runs, connects
 * the MQTT client and subscribes it to {@code test/#}.
 *
 * <p>{@code @Configuration} is itself a component stereotype, so the separate
 * {@code @Component} annotation was redundant and has been dropped.
 *
 * <p>NOTE(review): {@code MqttCustomerClient} is already registered as a
 * {@code @Component}; the {@code @Bean} method below registers the same
 * instance a second time under the name {@code getMqttCustomerClient}.
 * Injection by type presumably only resolves when the injection point is named
 * {@code mqttCustomerClient} — confirm, and consider moving the connect logic
 * into an initialisation callback instead.
 *
 * <p>NOTE(review): the bound {@code topic} property is not read by this class.
 *
 * @author yangkai
 * @date 2022/2/10 9:36
 */
@Configuration
@Data
@ConfigurationProperties("mqtt")
public class MqttConfiguration {
    /** Topic filter subscribed at startup: {@code test} and every topic below {@code test/}. */
    private static final String SUBSCRIBE_FILTER = "test/#";

    @Autowired
    private MqttCustomerClient mqttCustomerClient;

    // Values bound from the mqtt.* configuration properties.
    private String host;
    private String clientid;
    private String username;
    private String password;
    private String topic;
    private int timeout;
    private int keepalive;

    /**
     * Connects the injected client with the bound properties, subscribes it to
     * {@link #SUBSCRIBE_FILTER} and exposes it as a bean.
     *
     * @return the injected, now connected, client
     */
    @Bean
    public MqttCustomerClient getMqttCustomerClient() {
        mqttCustomerClient.connect(host, clientid, username, password, timeout, keepalive);
        mqttCustomerClient.subscribe(SUBSCRIBE_FILTER);
        return mqttCustomerClient;
    }
}

5.测试类

package com.example.demo.mqtt;

import com.example.demo.EmqxDemoApplication;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Smoke test that publishes ten messages to {@code test/device1} through the
 * Spring-managed client, pausing three seconds after each one.
 */
@SpringBootTest(classes = EmqxDemoApplication.class)
public class MqttCustomerClientTest {
    @Autowired
    private MqttCustomerClient mqttCustomerClient;

    /**
     * Publishes the messages in sequence.
     *
     * <p>If the thread is interrupted while pausing, the interrupt flag is
     * restored and the test fails. Previously the interruption was printed and
     * the loop carried on.
     */
    @Test
    void pushlish() {
        for (int i = 0; i < 10; i++) {
            mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the flag so the test runner can observe the interruption.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while pacing publishes", e);
            }
        }
    }
}

 类似资料: