MQTT-Android订阅和发布

狄冠宇
2023-12-01

订阅和接收

//    final String serverUri = "tcp://iot.eclipse.org:1883";
    final String serverUri = "tcp://ip:port";
    String clientId = "ExampleAndroidClient";
    final String subscriptionTopic = "subscribe_topic";
    final String publishTopic = "publish_topic";
    final String publishMessage = "Hello World!";
    private static String userName = "admin";
    private static String passWord = "password";
private void initConnect() {
        //封装好的MQTTClient供操作,发送和接手等设置都用它
        mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
        //设置回调
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {

            /**
             * 连接完成回调
             * @param reconnect true 断开重连,false 首次连接
             * @param serverURI 服务器URI
             */
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {
                    addToHistory("Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
                    subscribeToTopic();
                } else {
                    addToHistory("Connected to: " + serverURI);
                }
            }

            /**
             * @desc 连接断开回调
             * 可在这里做一些重连等操作
             */
            @Override
            public void connectionLost(Throwable cause) {
                addToHistory("The Connection was lost.");
            }

            /**
             * 消息接收,如果在订阅的时候没有设置IMqttMessageListener,那么收到消息则会在这里回调。
             * 如果设置了IMqttMessageListener,则消息回调在IMqttMessageListener中
             * @param topic 该消息来自的订阅主题
             * @param message 消息内容
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) {
                addToHistory("Incoming message: " + new String(message.getPayload()));
            }

            /**
             * 交付完成回调。在publish消息的时候会收到此回调.
             * qos:
             * 0 发送完则回调
             * 1 或 2 会在对方收到时候回调
             * @param token
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                addToHistory("deliveryComplete,token:" + token);
            }
        });

        //mqtt连接参数设置
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        //设置自动重连
        mqttConnectOptions.setAutomaticReconnect(true);
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(false);
        //设置连接的用户名
        mqttConnectOptions.setUserName(userName);
        //设置连接的密码
        mqttConnectOptions.setPassword(passWord.toCharArray());
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(20);
        try {
            //设置好相关参数后,开始连接
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "onSuccess,asyncActionToken");
                    /*连接成功之后设置连接断开的缓冲配置*/
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    //开启
                    disconnectedBufferOptions.setBufferEnabled(true);
                    //离线后最多缓存100调
                    disconnectedBufferOptions.setBufferSize(100);
                    //不一直持续留存
                    disconnectedBufferOptions.setPersistBuffer(false);
                    //删除旧消息
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    //订阅主题
                    subscribeToTopic();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    addToHistory("Failed to connect to: " + serverUri);
                }
            });


        } catch (MqttException ex) {
            ex.printStackTrace();
        }
    }


    /**
     * 订阅主题
     */
    public void subscribeToTopic() {
        try {
            //主题、QOS、context,订阅监听,消息监听
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    addToHistory("Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    addToHistory("Failed to subscribe");
                }
            }, new IMqttMessageListener() {
                /**
                 * @desc 消息到达回调
                 * @param topic
                 * @param message
                 */
                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    // message Arrived!
                    final String messageRecive = new String(message.getPayload());
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            addToHistory("recive message:" + messageRecive);
                        }
                    });
                    System.out.println("Message: " + topic + " : " + messageRecive);

                }
            });

        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

发送

使用Apollo作为服务器来担当消息代理,又有mqttv3封装库,接收只需要订阅主题,发送只需往某个主题发送消息,在mqttv3中也已经封装好了。

/**
     * 发送消息¬
     */
    private static void publish() {
        //主题
        String topic = "主题";
        //消息内容
        String content = "来嘛,又给你推送消息";
        //设置qos
        int qos = 0;
        //服务器URI
        String broker = "tcp://ip:port"
        //客户端id
        String clientId = "JavaSample";

        MemoryPersistence persistence = new MemoryPersistence();
        try {
            //发送依旧使用MqttClient
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            sampleClient.setCallback(new MqttCallback() {
                /**
                 * 连接断开回调
                 * @param throwable
                 */
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("connectionLost");
                }

                /**
                 * 消息接收回调,如果只发送,这里没关系
                 * @param s
                 * @param mqttMessage
                 * @throws Exception
                 */
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    System.out.println("messageArrived:" + s);
                }

                /**
                 * 交付完成回调。在publish消息的时候会收到此回调.
                 * qos:
                 * 0 发送完则回调
                 * 1 或 2 会在对方收到时候回调
                 * @param iMqttDeliveryToken id,可通过id来获取发送的消息内容
                 */
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    try {
                        System.out.println("deliveryComplete token:" + iMqttDeliveryToken.getMessage().toString());
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            //连接操作设置
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(passWord.toCharArray());
            connOpts.setCleanSession(false);
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            /*发送,使用MqttMessage封装类来进行发送*/
            MqttMessage message = new MqttMessage(content.getBytes());
            //设置优先级
            message.setQos(qos);
            //设置是否被服务器保留
            message.setRetained(true);
            //使用mqttClient进行发送到对应主题
            sampleClient.publish(topic, message);
            if (!sampleClient.isConnected()) {
                System.out.println(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
            System.out.println("Message published");
            //发送完则断开
//            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
  • 离线后,可以看到离线后发送消息是存在了缓存里,在设置了离线缓存配置后,缓存大小这些都由自己控制啦,待上线之后再自动进行发送
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 2 messages in buffer.
 paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 3 messages in buffer.
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 4 messages in buffer.
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 5 messages in buffer.
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 6 messages in buffer.

设置了最大缓存条数,那么达到条数之后,就不会再存再里面咯。

  • QOS

qos代表服务质量,指的是交通优先级和资源预留控制机制,而不是接收的服务质量。 服务质量是为不同应用程序,用户或数据流提供的不同优先级的能力,或者也可以说是为数据流保证一定的性能水平的能力。
以下是每一个服务质量级别的具体描述
0 :最多一次传送 (只负责传送,发送过后就不管数据的传送情况)
1 : 至少一次传送 (确认数据交付)
2 :正好一次传送 (保证数据交付成功)

demo项目地址:https://github.com/nxSin/paho.mqtt.android。该项目frok自eclipese下项目,如果是Android使用,建议参考这个库来进行二次封装使用,内部对其封装的MqttAndroidClient使用了Service来进行处理连接和收发,完全适用Android场景。

 类似资料: