当前位置: 首页 > 工具软件 > HiveMQ > 使用案例 >

mqtt HiveMQ hivemq-mqtt-client

昝欣可
2023-12-01

一。MQTT官网

二。HiveMQ官网
二。HiveMQ文档

三。hivemq-mqtt-client 源码地址
三。hivemq-mqtt-client 文档说明

一。概念说明

1.什么是MQTT?  一个协议
	mqtt是一种协议,客户端服务器发布/订阅消息传递协议
	它重量轻,开放,简单并且设计得易于实施
	适用于机器对机器(M2M)和物联网(IoT)上下文中的通信

2.什么是HiveMQ?   一个消息传递平台
	HiveMQ是基于MQTT协议的消息传递平台,
	HiveMQ完全支持所有标准MQTT功能,并提供扩展功能

3.什么是hivemq-mqtt-client? 
	连接HiveMq平台
	

二。下载安装并启动hivemq服务器

发布者——hivemq服务器——订阅者

//docker方式

1.下载并运行hivemq
	 docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4

2.浏览器输入http://localhost:8080
    用户:admin         密码:hivemq

三。通信案例

1.通信项目pom导入hivemq-mqtt-client依赖

  <dependency>
            <groupId>com.hivemq</groupId>
            <artifactId>hivemq-mqtt-client</artifactId>
            <version>1.2.1</version>
        </dependency>

1,服务端


package com.test.platform.mqtt;

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * @author LuHuanCheng
 * 类说明:服务端
 */
@Component
public class HiveMQServer {
    private static volatile Mqtt5Client server;

     //获取服务端
    @PostConstruct
    public static Mqtt5Client getInstance() {
        if (server == null) {
            synchronized (Mqtt5Client.class) {
                if (server == null) {
                    //1.服务端
                    server = Mqtt5Client.builder()
                            .identifier("platform-10.10.35.85") // use a unique identifier
                            .serverHost("192.168.61.140") // use the public HiveMQ broker
                            .automaticReconnectWithDefaultConfig() // the client automatically reconnects
                            .build();

                    //2.连接
                    server.toBlocking().connectWith()
                            .cleanStart(false)
                            .sessionExpiryInterval(TimeUnit.HOURS.toSeconds(1)) // buffer messages
                            .send();

                    // 3. 订阅消息
                    server.toAsync().subscribeWith()
                            .topicFilter("home/client/#")
                            .callback(publish -> {
                                System.out.println("接收到topic消息,topic名称:" + publish.getTopic() + "  topic值:" +
                                        new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
                            })
                            .send();
                }
            }
        }
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return server;
    }

  	//发送消息
    public void send() throws InterruptedException {
        //发送亮度topic给设备
        server.toBlocking().publishWith()
                .topic("home/server/info")
                .payload("你好客户端,我是服务端".getBytes())
                .send();

        TimeUnit.MILLISECONDS.sleep(500);
    }
}


//控制器调用发送信息
@RestController
@RequestMapping("/test")
public class TestController {
    @Autowired
    HiveMQServer hiveMQServer;

    @ResponseBody
    @GetMapping("/send")
    public void test() throws InterruptedException {
        hiveMQServer.send();
    }
}

2.客户端

package com.test.device.mqtt;

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;


@Component
public class HiveMQClient {
    private static volatile Mqtt5Client client;

    @PostConstruct
    public static Mqtt5Client getInstance() {
        if (client == null) {
            synchronized (Mqtt5Client.class) {
                if (client == null) {
                    //1.客户端
                    client = Mqtt5Client.builder()
                            .identifier("device-10.10.35.85") // use a unique identifier
                            .serverHost("192.168.61.140") // use the public HiveMQ broker
                            .automaticReconnectWithDefaultConfig() // the client automatically reconnects
                            .build();

                    //2.连接
                    client.toBlocking().connectWith()
                            .cleanStart(false)
                            .sessionExpiryInterval(TimeUnit.HOURS.toSeconds(1)) // buffer messages
                            .send();


                    // 3. 订阅消息
                    client.toAsync().subscribeWith()
                            .topicFilter("home/server/#")
                            .callback(publish -> {
                                System.out.println("接收到topic消息,topic名称:" + publish.getTopic() + "  topic值:" +
                                        new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
                            })
                            .send();
                }
            }
        }
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return client;
    }

	//发送消息给服务端
    public void send() throws InterruptedException {
        client.toBlocking().publishWith()
                .topic("home/client/info")
                .payload("你好服务端,我是客户端".getBytes())
                .send();


        TimeUnit.MILLISECONDS.sleep(500);
    }
}

//控制器调用发送信息
@RestController
@RequestMapping("/test")
public class TestController {
    @Autowired
    HiveMQServer hiveMQServer;

    @ResponseBody
    @GetMapping("/send")
    public void test() throws InterruptedException {
        hiveMQServer.send();
    }
}
 类似资料: