springboot mqtt emqx

滕夜洛
2023-12-01

1.依赖类

  <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2.application.yml 配置文件

mqtt:
  hostUrl: tcp://****:1883
  username: admin
  password: public
  clientId: ***
  cleanSession: true
  reconnect: true
  timeout: 100
  keepAlive: 100
  defaultTopic: client:report:1
  isOpen: true
  qos: 2

3. MqttProperties 配置文件类

@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {
    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 连接地址
     */
    private String hostUrl;


    /**
     * 默认连接主题
     */
    private String defaultTopic;

    /**
     * 超时时间
     */
    private int timeout;

    /**
     * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
     * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
     */
    private int keepAlive;

    /**
     * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
     * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
     */
    private Boolean cleanSession;


    /**
     * 客户端Id,同一台服务器下,不允许出现重复的客户端id
     */
    private String clientId;

    /**
     * 是否断线重连
     */
    private Boolean reconnect;

    /**
     * 启动的时候是否关闭mqtt
     */
    private Boolean isOpen;

    /**
     * 连接方式
     */
    private Integer qos;
}

4.  MqttConfig  根据环境是否启动 mqtt

@Configuration
public class MqttConfig {
    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 订阅mqtt
     * @Conditional 按照一定的条件进行判断,满足条件给容器注册bean。
     * @return
     */
    @Conditional(MqttCondition.class)
    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }

}

5. springboot 启动配置 mqtt类

public class MqttCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {

        //获取当前环境信息
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        return Boolean.parseBoolean(isOpen);
    }
}

6.mqtt客户端类

@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    public static MqttClient client;
    @Autowired
    private MqttAcceptCallback mqttAcceptCallback;
    @Autowired
    private MqttProperties mqttProperties;

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客户端连接
     */
    public void connect() {
        MqttClient client;
        try {
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            // 设置回调
            client.setCallback(mqttAcceptCallback);
            client.connect(options);
        } catch (Exception e) {
            logger.error("[客户端连接初始化异常:{}]", e.toString());
        }
    }

    /**
     * 重新连接
     */
    public void reconnection() {
        //
        try {
            while (true) {
                client.close();
                this.connect();
                if (client.isConnected()) {
                    logger.info("MQTT重新连接成功:" + client);
                    break;
                }
                Thread.sleep(10000);
            }
        } catch (MqttException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("==============开始订阅主题==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("");
        }
    }

    /**
     * 取消订阅某个主题
     *
     * @param topic 主题
     */
    public void unsubscribe(String topic) {
        logger.info("==============开始取消订阅主题==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

7. mqtt消费端回调  类

@Component
public class MqttAcceptCallback implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);

  
    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    @Autowired
    private AlertVehicleService alertVehicleService;

    /**
     * 客户端断开后触发
     *
     * @param throwable 异常信息
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("连接断开,可以做重连");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            logger.info("emqx重新连接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客户端收到消息触发
     *
     * @param topic       主题
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        logger.info("接收消息主题 : " + topic);
        String payLoad = new String(mqttMessage.getPayload());
        logger.info("接收消息 : " + payLoad );
    }

    /**
     * 发布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主题:" + topic + "发送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, StandardCharsets.UTF_8);
            logger.info("消息的内容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接emq服务器后触发
     *
     * @param reconnect If true, the connection was the result of automatic reconnect
     * @param serverUri the server uri that the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        logger.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");
        // 以/#结尾表示订阅所有以test开头的主题
         //需要填写你的主题
        //需要填写你的主题
        //需要填写你的主题
        //需要填写你的主题
        //需要填写你的主题
        mqttAcceptClient.subscribe(主题名称, 1);
    }
}

 类似资料:

相关阅读

相关文章

相关问答