当前位置: 首页 > 工具软件 > HiveMQ > 使用案例 >

HiveMQ API 示例

杨曜瑞
2023-12-01

项目中用到了 HiveMQ 来连接 IoT 设备,一开始使用的是 spring-integration-mqtt,后来在 HiveMQ 官网上发现他们提供了自己的客户端 SDK(hivemq-mqtt-client),所以在这里记录一下用法。

Maven坐标:

        <dependency>
            <groupId>com.hivemq</groupId>
            <artifactId>hivemq-mqtt-client</artifactId>
            <version>1.2.1</version>
        </dependency>

异步代码示例: 

package com.hive.mqtt.example;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;

import java.util.UUID;
import java.util.function.Consumer;

import static javax.xml.bind.DatatypeConverter.printHexBinary;

/**
 * Minimal asynchronous MQTT 3 example built on the HiveMQ MQTT client.
 *
 * <p>The program connects to a broker, subscribes to two topics, unsubscribes from one of them
 * again and then publishes a message every ten seconds until the main thread is interrupted.
 *
 * @author qingtaiJiang
 * @date 2020/10/10 15:06
 */
public class MqttExample {

    /**
     * Runs the example.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(final String[] args) {

        // Callback invoked for every message received on a subscribed topic.
        final Consumer<Mqtt3Publish> consumer = mqtt3Publish -> {
            System.out.println("getPayload:" + mqtt3Publish.getPayload());
            System.out.println("qos:" + mqtt3Publish.getQos());
            System.out.println("extend:" + mqtt3Publish.extend());
            System.out.println("AsBytes:" + printHexBinary(mqtt3Publish.getPayloadAsBytes()));
            System.out.println("getTopic:" + mqtt3Publish.getTopic());
            System.out.println("getType:" + mqtt3Publish.getType());
            System.out.println("isRetain:" + mqtt3Publish.isRetain());
        };

        // Build the client for the MQTT broker.
        final Mqtt3AsyncClient client = Mqtt3Client.builder()
                .identifier(UUID.randomUUID().toString())
                .serverHost("zrcentos03")
                .buildAsync();

        // Wait for the connect attempt to finish before issuing any further request.
        // join() rethrows a connect failure, so the example aborts instead of silently
        // looping on a client that never got connected.
        client.connect()
                .thenAccept(connAck -> System.out.println("connected:" + connAck))
                .join();

        // Subscribe to the first topic.
        client.subscribeWith()
                .topicFilter("/test/aa") // topic to subscribe to
                .qos(MqttQos.AT_LEAST_ONCE) // requested quality of service
                .callback(consumer)  // handler for messages received on this subscription
                .send()
                .whenComplete((subAck, error) -> reportFailure("subscribe /test/aa", error));

        // Subscribe to the second topic.
        client.subscribeWith()
                .topicFilter("/test/bb")
                .qos(MqttQos.AT_LEAST_ONCE)
                .callback(consumer)
                .send()
                .whenComplete((subAck, error) -> reportFailure("subscribe /test/bb", error));

        // Unsubscribe from the second topic again.
        client.unsubscribeWith()
                .topicFilter("/test/bb")
                .send()
                .whenComplete((ignored, error) -> reportFailure("unsubscribe /test/bb", error));

        // Publish periodically until the main thread is interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("");
            // Publish a message.
            client.publishWith()
                    .topic("/test/aa")
                    .payload("1".getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .qos(MqttQos.EXACTLY_ONCE)
                    .send()
                    .whenComplete((publish, error) -> reportFailure("publish /test/aa", error));
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the program.
                Thread.currentThread().interrupt();
            }
        }

        client.disconnect()
                .whenComplete((ignored, error) -> reportFailure("disconnect", error));
    }

    /** Prints a diagnostic to stderr when an asynchronous MQTT operation failed. */
    private static void reportFailure(final String action, final Throwable error) {
        if (error != null) {
            System.err.println(action + " failed: " + error);
        }
    }
}

 

 类似资料: