1.依赖类
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.application.yml 配置文件
mqtt:
hostUrl: tcp://****:1883
username: admin
password: public
clientId: ***
cleanSession: true
reconnect: true
timeout: 100
keepAlive: 100
defaultTopic: client:report:1
isOpen: true
qos: 2
3. MqttProperties 配置文件类
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 默认连接主题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
*/
private int keepAlive;
/**
* 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
*/
private Boolean cleanSession;
/**
* 客户端Id,同一台服务器下,不允许出现重复的客户端id
*/
private String clientId;
/**
* 是否断线重连
*/
private Boolean reconnect;
/**
* 启动的时候是否关闭mqtt
*/
private Boolean isOpen;
/**
* 连接方式
*/
private Integer qos;
}
4. MqttConfig 根据环境是否启动 mqtt
@Configuration
public class MqttConfig {
@Autowired
private MqttAcceptClient mqttAcceptClient;
/**
* 订阅mqtt
* @Conditional 按照一定的条件进行判断,满足条件给容器注册bean。
* @return
*/
@Conditional(MqttCondition.class)
@Bean
public MqttAcceptClient getMqttPushClient() {
mqttAcceptClient.connect();
return mqttAcceptClient;
}
}
5. springboot 启动配置 mqtt类
public class MqttCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
//获取当前环境信息
Environment environment = context.getEnvironment();
String isOpen = environment.getProperty("mqtt.isOpen");
return Boolean.parseBoolean(isOpen);
}
}
6.mqtt客户端类
@Component
public class MqttAcceptClient {
private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);
public static MqttClient client;
@Autowired
private MqttAcceptCallback mqttAcceptCallback;
@Autowired
private MqttProperties mqttProperties;
private static void setClient(MqttClient client) {
MqttAcceptClient.client = client;
}
/**
* 客户端连接
*/
public void connect() {
MqttClient client;
try {
client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setAutomaticReconnect(mqttProperties.getReconnect());
options.setCleanSession(mqttProperties.getCleanSession());
MqttAcceptClient.setClient(client);
// 设置回调
client.setCallback(mqttAcceptCallback);
client.connect(options);
} catch (Exception e) {
logger.error("[客户端连接初始化异常:{}]", e.toString());
}
}
/**
* 重新连接
*/
public void reconnection() {
//
try {
while (true) {
client.close();
this.connect();
if (client.isConnected()) {
logger.info("MQTT重新连接成功:" + client);
break;
}
Thread.sleep(10000);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
logger.info("==============开始订阅主题==============" + topic);
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
logger.error("");
}
}
/**
* 取消订阅某个主题
*
* @param topic 主题
*/
public void unsubscribe(String topic) {
logger.info("==============开始取消订阅主题==============" + topic);
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
7. mqtt消费端回调 类
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);
@Autowired
private MqttAcceptClient mqttAcceptClient;
@Autowired
private AlertVehicleService alertVehicleService;
/**
* 客户端断开后触发
*
* @param throwable 异常信息
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("连接断开,可以做重连");
if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
logger.info("emqx重新连接....................................................");
mqttAcceptClient.reconnection();
}
}
/**
* 客户端收到消息触发
*
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
logger.info("接收消息主题 : " + topic);
String payLoad = new String(mqttMessage.getPayload());
logger.info("接收消息 : " + payLoad );
}
/**
* 发布消息成功
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
for (String topic : topics) {
logger.info("向主题:" + topic + "发送消息成功!");
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, StandardCharsets.UTF_8);
logger.info("消息的内容是:" + s);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 连接emq服务器后触发
*
* @param reconnect If true, the connection was the result of automatic reconnect
* @param serverUri the server uri that the connection was made to
*/
@Override
public void connectComplete(boolean reconnect, String serverUri) {
logger.info("--------------------ClientId:"
+ MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");
// 以/#结尾表示订阅所有以test开头的主题
//需要填写你的主题
//需要填写你的主题
//需要填写你的主题
//需要填写你的主题
//需要填写你的主题
mqttAcceptClient.subscribe(主题名称, 1);
}
}