下载地址:免费试用 EMQ 产品
下载压缩包解压,打开cmd,进入emqx/bin目录,输入emqx start,启动服务。
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
#MQTT Config
mqtt:
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
host: tcp://127.0.0.1:11883
#MQTT-连接服务器默认客户端ID
clientid: mqtt_id
#MQTT-用户名
username: admin
#MQTT-密码
password: admin
#MQTT-默认的消息推送主题,实际可在调用接口时指定
topic: test
#连接超时
timeout: 1000
#设置会话心跳时间
keepalive: 100
(1)消费监听
package com.example.demo.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
/**
* @author yangkai
* @description 消费监听
* @date 2022/2/10 9:40
*/
@Component
public class PushCallback implements MqttCallback {
private static MqttClient client;
@Override
public void connectionLost(Throwable throwable) {
if (client == null || !client.isConnected()) {
System.out.println("连接断开,正在重连....");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
(2)mqtt客户端
package com.example.demo.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author yangkai
* @description mqtt客户端
* @date 2022/2/10 9:39
*/
@Slf4j
@Component
public class MqttCustomerClient {
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public static MqttClient getClient(){
return client;
}
public static void setClient(MqttClient client){
MqttCustomerClient.client=client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keeplive 保留数
*/
public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
MqttClient client;
try {
client=new MqttClient(host,clientID,new MemoryPersistence());
MqttConnectOptions options=new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keeplive);
MqttCustomerClient.setClient(client);
try {
client.setCallback(pushCallback);
client.connect(options);
}catch (Exception e){
e.printStackTrace();
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param pushMessage
*/
public void pushlish(String topic,String pushMessage){
pushlish(0,false,topic,pushMessage);
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void pushlish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token=mqttTopic.publish(message);
token.waitForCompletion();
}catch (MqttPersistenceException e){
e.printStackTrace();
}catch (MqttException e){
e.printStackTrace();
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MqttCustomerClient.getClient().subscribe(topic,qos);
}catch (MqttException e){
e.printStackTrace();
}
}
}
(3)mqtt订阅
package com.example.demo.mqtt;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* @author yangkai
* @description mqtt订阅
* @date 2022/2/10 9:36
*/
@Component
@Configuration
@Data
@ConfigurationProperties("mqtt")
public class MqttConfiguration {
@Autowired
private MqttCustomerClient mqttCustomerClient;
private String host;
private String clientid;
private String username;
private String password;
private String topic;
private int timeout;
private int keepalive;
@Bean
public MqttCustomerClient getMqttCustomerClient() {
mqttCustomerClient.connect(host, clientid, username, password, timeout,keepalive);
// 以/#结尾表示订阅所有以test开头的主题
mqttCustomerClient.subscribe("test/#");
return mqttCustomerClient;
}
}
package com.example.demo.mqtt;
import com.example.demo.EmqxDemoApplication;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = EmqxDemoApplication.class)
public class MqttCustomerClientTest {
@Autowired
private MqttCustomerClient mqttCustomerClient;
@Test
void pushlish() {
for (int i = 0; i < 10; i++) {
mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}