三。hivemq-mqtt-client 源码地址
三。hivemq-mqtt-client 文档说明
1.什么是MQTT? 一个协议
mqtt是一种协议,客户端服务器发布/订阅消息传递协议
它重量轻,开放,简单并且设计得易于实施
适用于机器对机器(M2M)和物联网(IoT)上下文中的通信
2.什么是HiveMQ? 一个消息传递平台
HiveMQ是基于MQTT协议的消息传递平台,
HiveMQ完全支持所有标准MQTT功能,并提供扩展功能
3.什么是hivemq-mqtt-client?
连接HiveMq平台
发布者——hivemq服务器——订阅者
//docker方式
1.下载并运行hivemq
docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4
2.浏览器输入http://localhost:8080
用户:admin 密码:hivemq
1.通信项目pom导入hivemq-mqtt-client依赖
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.2.1</version>
</dependency>
1,服务端
package com.test.platform.mqtt;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @author LuHuanCheng
* 类说明:服务端
*/
@Component
public class HiveMQServer {
private static volatile Mqtt5Client server;
//获取服务端
@PostConstruct
public static Mqtt5Client getInstance() {
if (server == null) {
synchronized (Mqtt5Client.class) {
if (server == null) {
//1.服务端
server = Mqtt5Client.builder()
.identifier("platform-10.10.35.85") // use a unique identifier
.serverHost("192.168.61.140") // use the public HiveMQ broker
.automaticReconnectWithDefaultConfig() // the client automatically reconnects
.build();
//2.连接
server.toBlocking().connectWith()
.cleanStart(false)
.sessionExpiryInterval(TimeUnit.HOURS.toSeconds(1)) // buffer messages
.send();
// 3. 订阅消息
server.toAsync().subscribeWith()
.topicFilter("home/client/#")
.callback(publish -> {
System.out.println("接收到topic消息,topic名称:" + publish.getTopic() + " topic值:" +
new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
})
.send();
}
}
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return server;
}
//发送消息
public void send() throws InterruptedException {
//发送亮度topic给设备
server.toBlocking().publishWith()
.topic("home/server/info")
.payload("你好客户端,我是服务端".getBytes())
.send();
TimeUnit.MILLISECONDS.sleep(500);
}
}
//控制器调用发送信息
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
HiveMQServer hiveMQServer;
@ResponseBody
@GetMapping("/send")
public void test() throws InterruptedException {
hiveMQServer.send();
}
}
2.客户端
package com.test.device.mqtt;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@Component
public class HiveMQClient {
private static volatile Mqtt5Client client;
@PostConstruct
public static Mqtt5Client getInstance() {
if (client == null) {
synchronized (Mqtt5Client.class) {
if (client == null) {
//1.客户端
client = Mqtt5Client.builder()
.identifier("device-10.10.35.85") // use a unique identifier
.serverHost("192.168.61.140") // use the public HiveMQ broker
.automaticReconnectWithDefaultConfig() // the client automatically reconnects
.build();
//2.连接
client.toBlocking().connectWith()
.cleanStart(false)
.sessionExpiryInterval(TimeUnit.HOURS.toSeconds(1)) // buffer messages
.send();
// 3. 订阅消息
client.toAsync().subscribeWith()
.topicFilter("home/server/#")
.callback(publish -> {
System.out.println("接收到topic消息,topic名称:" + publish.getTopic() + " topic值:" +
new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
})
.send();
}
}
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return client;
}
//发送消息给服务端
public void send() throws InterruptedException {
client.toBlocking().publishWith()
.topic("home/client/info")
.payload("你好服务端,我是客户端".getBytes())
.send();
TimeUnit.MILLISECONDS.sleep(500);
}
}
//控制器调用发送信息
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
HiveMQServer hiveMQServer;
@ResponseBody
@GetMapping("/send")
public void test() throws InterruptedException {
hiveMQServer.send();
}
}