项目用到了HIVEMQ连接IOT设备,一开始使用了 spring-integration-mqtt,后来逛了逛HiveMq的官网发现他们有自己的SDK,所以记录一下。
Maven坐标:
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.2.1</version>
</dependency>
异步代码示例:
package com.hive.mqtt.example;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import java.util.UUID;
import java.util.function.Consumer;
import static javax.xml.bind.DatatypeConverter.printHexBinary;
/**
* @author qingtaiJiang
* @date 2020/10/10 15:06
*/
public class MqttExample {
public static void main(final String[] args) {
//回调函数
Consumer<Mqtt3Publish> consumer = new Consumer<Mqtt3Publish>() {
@Override
public void accept(Mqtt3Publish mqtt3Publish) {
System.out.println("getPayload:" + mqtt3Publish.getPayload());
System.out.println("qos:" + mqtt3Publish.getQos());
System.out.println("extend:" + mqtt3Publish.extend());
System.out.println("AsBytes:" + printHexBinary(mqtt3Publish.getPayloadAsBytes()));
System.out.println("getTopic:" + mqtt3Publish.getTopic());
System.out.println("getType:" + mqtt3Publish.getType());
System.out.println("isRetain:" + mqtt3Publish.isRetain());
}
};
//和Mqtt服务建立连接
final Mqtt3AsyncClient client = Mqtt3Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("zrcentos03")
.buildAsync();
client.connect().thenAccept(connAck -> System.out.println("connected:" + connAck));
//订阅主题
client.toAsync().subscribeWith()
.topicFilter("/test/aa") //指定订阅主题
.qos(MqttQos.AT_LEAST_ONCE) //指定订阅质量
.callback(consumer) //接收到订阅消息后的处理函数
.send();
//订阅主题
client.toAsync().subscribeWith()
.topicFilter("/test/bb")
.qos(MqttQos.AT_LEAST_ONCE)
.callback(consumer)
.send();
//取消订阅
client.toAsync().unsubscribeWith()
.topicFilter("/test/bb")
.send();
while (true) {
System.out.println("");
//发布消息
client.toAsync().publishWith()
.topic("/test/aa")
.payload("1".getBytes())
.qos(MqttQos.EXACTLY_ONCE)
.send();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}