mqtt服务端: emqx
mqtt客户端模拟器: mqtt-spy
springBoot配置:
mqtt:
# 是否启用
enable: true
# mqtt服务地址
broker: tcp://localhost:1883
# mqtt服务用户名
username: admin
# mqtt服务密码
password: public
maven依赖:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.4</version>
</dependency>
对于软件的安装使用及其他依赖(lombok、springBoot等)网上有很多,这里就不做说明了。
参数配置类:
@Getter
@Component
public class IoVariable {
/** 是否启用MQTT */
@Value("${mqtt.enable:true}")
private boolean mqttEnable;
/** mqtt服务ip和端口 */
@Value("${mqtt.broker:tcp://localhost:1883}")
private String broker;
/** mqtt服务用户名 */
@Value("${mqtt.host.userName:admin}")
private String userName;
/** mqtt服务密码 */
@Value("${mqtt.host.passWord:public}")
private String passWord;
}
mqtt客户端:
对外只提供两个方法。
因为创建客户端耗费资源,所以订阅发布主题用同一个客户端(后续进行连接池改造)
定义一个
private static MqttClient client;
这种方式不符合面向对象的编程思想,所以弃用
这里用实现接口并依赖的方式
@FunctionalInterface
public interface IMqttPublish {
/**
* mqtt发布主题
* @param topic 主题
* @param content 主题内容
*/
void publish(String topic, String content);
}
public class ClientMqtt implements IMqttPublish{
/** mqtt连接 */
private MqttClient client;
/** mqtt服务ip和端口 */
private final String broker;
/** mqtt客户端id */
private final String clientId;
/** mqtt其他客户端发布的主题 */
private final String terminalTopic;
/** mqtt系统主题 */
private final String systemTopic;
/** mqtt服务用户名 */
private final String userName;
/** mqtt服务密码 */
private final String passWord;
/**
* 构造函数
*/
public ClientMqtt(String broker, String clientId, String terminalTopic, String systemTopic,
String userName, String passWord, TerminalManager manager) {
this.broker = broker;
this.clientId = clientId;
this.terminalTopic = terminalTopic;
this.systemTopic = systemTopic;
this.userName = userName;
this.passWord = passWord;
this.manager = manager;
}
/**
* 创建客户端连接服务端并订阅主题(这里可以拆成两个函数)
* @return mqtt客户端
*/
public void connect(){
try {
client = new MqttClient(broker, clientId, new MemoryPersistence());
client.connect(getOptions());
//将自身注入回调就可以调用发布主题了
client.setCallback(new PushCallback(this));
//qos级别
int[] qos = {1, 1};
String[] topics = {terminalTopic, systemTopic};
client.subscribe(topics, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 客户端连接服务器options参数设置
* @return options
*/
private MqttConnectOptions getOptions(){
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(5);
//自动重连
options.setAutomaticReconnect(true);
return options;
}
@Override
public void publish(String topic, String content){
try {
//mqttMessage配置
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(1);
mqttMessage.setRetained(true);
mqttMessage.setPayload(content.getBytes());
//发布主题
MqttDeliveryToken token = client.getTopic(topic).publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
回调函数:接受到消息在这里处理
@Slf4j
public class PushCallback implements MqttCallbackExtended {
/** mqtt发布主题 */
private final IMqttPublish mqttPublish;
public PushCallback(IMqttPublish mqttPublish) {
this.mqttPublish = mqttPublish;
}
@Override
public void connectionLost(Throwable cause) {
log.error("mqtt连接断开,正在重新连接:" + cause);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.debug("通信完成");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
String systemTopicFlag = "$SYS";
// 处理系统主题(客户端上下线)
if (topic.startsWith(systemTopicFlag)) {
systemTopicHandler(topic);
return;
}
// 处理其他客户端发布的主题
terminalTopicHandler(mqttMessage);
}
@Override
public void connectComplete(boolean reconnect, String serverUrl) {
log.debug("已重新连接");
}
/**
* 处理系统主题
* @param topic 上下线主题
*/
private void systemTopicHandler(String topic) {
String[] topicSplit = topic.split("/");
// 系统主题拆分后长度:$SYS/brokers/emqx@127.0.0.1/clients/{clientId}/{clientState}
int systemTopicLength = 6;
if (topicSplit.length < systemTopicLength) {
return;
}
// 客户端id
String clientIdInTopic = topicSplit[4];
// 主题上下线类型
String clientState = topicSplit[5];
// 处理上下线消息
switch (ClientStateEnum.valueOf(clientState)) {
// 客户端上线
case connected:
log.debug("客户端上线:" + clientIdInTopic);
break;
// 客户端下线
case disconnected:
log.debug("客户端下线:" + clientIdInTopic);
break;
default:
break;
}
}
/**
* 处理其他客户端发布的主题
* @param mqttMessage 消息内容
*/
private void terminalTopicHandler(MqttMessage mqttMessage) {
PublishTest publishTest = new PublishTest(mqttPublish);
}
/**
* 客户端上下线状态
*/
private enum ClientStateEnum {
/** 上线 */
connected,
/** 下线 */
disconnected
}
}
启动项目时创建mqtt客户端并订阅主题
@Slf4j
@Component
@Order(value = 1)
public class IoAppReadyListener implements ApplicationListener<ApplicationReadyEvent> {
@Autowired
private IoVariable variable;
/** mqtt其他客户端发布的主题 */
private static final String TERMINAL_TOPIC = "自定义主题";
/** mqtt系统主题 */
private static final String SYSTEM_TOPIC = "$SYS/brokers/+/clients/#";
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (variable.isMqttEnable()){
startMqtt();
}
}
/**
* mqtt连接
*/
private void startMqtt(){
String clientId = IdUtil.fastSimpleUUID();
MqttClientSingle mqttClientSingle = new MqttClientSingle(variable.getBroker(), clientId,
TERMINAL_TOPIC, SYSTEM_TOPIC, variable.getUserName(),
variable.getPassWord(), manager);
mqttClientSingle.connect();
}
}
发布主题:
public class PublishTest{
/** mqtt发布主题 */
private final IMqttPublish mqttPublish;
/**
* 构造函数
*/
public MqttTerminal(IMqttPublish mqttPublish) {
this.mqttPublish = mqttPublish;
}
/**
* 发布主题
*/
public void publishTest(){
String content = "主题内容";
String topic = "自定义主题";
mqttPublish.publish(topic, content);
}
}