android集成阿里MQTT

魏书
2023-12-01
  • 在项目app build.gradle中,
dependencies {
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
    compile group: 'commons-codec', name: 'commons-codec', version: '1.5'
   }

MqttRecvClient.java类


public class MqttRecvClient {
    /**
     * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
     */
    final String broker ="tcp://xxx.mqtt.aliyuncs.com:1883";
    /**
     * 设置阿里云的AccessKey,用于鉴权
     */
    final String acessKey ="xxx";
    /**
    * 设置阿里云的SecretKey,用于鉴权
    */
    final String secretKey ="xxx";
    /**
     * 发消息使用的一级Topic,需要先在MQ控制台里申请
     */
   final String topic ="xxx";
    /**
     * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
     * 其中GroupID在MQ控制台里申请
     * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
     */
    final String consumerClientId ="GID_abc@@@"+System.currentTimeMillis();

    String sign;
    MemoryPersistence persistence = new MemoryPersistence();
    MqttClient sampleClient;
    MqttConnectOptions connOpts;
    MqttThread mqttThread;
    private static MqttRecvClient mqttRecvClient;
    MessageListener messageListener;
    private static boolean openLog=true;
    private MqttRecvClient(@NonNull MessageListener messageListener){
        if (mqttThread==null){
            mqttThread=new MqttThread();
        }
        this.messageListener=messageListener;
        mqttThread.start();
    }
    public static void init(Context context){
        Intent intent=new Intent(context,MQTTService.class);
        context.startService(intent);
    }

    public static void create(@NonNull MessageListener messageListener){
        if (mqttRecvClient==null){
            mqttRecvClient=new MqttRecvClient(messageListener);
        }
    }



    class MqttThread extends Thread{
        @Override
        public void run() {
            super.run();
            if (sampleClient==null){
                execut();
            }
        }
    }

    private   void execut() {

        try {
              sampleClient = new MqttClient(broker, consumerClientId, persistence);
              connOpts = new MqttConnectOptions();
            log(tag,"Connecting to broker: " + broker);
            /**
             * 计算签名,将签名作为MQTT的password。
             * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
             * 第二个参数阿里云的SecretKey
             */
            sign = MacSignature.macSignature(consumerClientId.split("@@@")[0], secretKey);
//            /**
//             * 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息不需要显式订阅,
//             */
//            final String[] topicFilters=new String[]{topic};
//            final int[]qos={0};
            /**
             * 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息请订阅/p2p
             */
            final String[] topicFilters=new String[]{topic+"/notice/",topic+"/p2p"};
            final int[]qos={0,0};
            connOpts.setUserName(acessKey);
            connOpts.setServerURIs(new String[] { broker });
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(100);
            sampleClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable throwable) {
                    log(tag,"mqtt connection lost");
                    throwable.printStackTrace();
                    while(!sampleClient.isConnected()){
                        try {
                            sampleClient.connect(connOpts);
                            sampleClient.subscribe(topicFilters,qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                public void messageArrived(String topic, MqttMessage mqttMessage)  {
                    log(tag,"messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                    if (messageListener!=null){
                        messageListener.onMessageArrived(topic, mqttMessage);
                    }
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    log(tag,"deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                }
            });
            sampleClient.connect(connOpts);
            sampleClient.subscribe(topicFilters,qos);
            //Thread.sleep(Integer.MAX_VALUE);
            log(tag,"end method");

        } catch (Exception me) {
            me.printStackTrace();
        }
    }

    public interface MessageListener{
        void onMessageArrived(String topic,MqttMessage mqttMessage);
    }

    public static void openLog(boolean bool){
        openLog=bool;
    }

    private void log(String tag,String msg){
        if (openLog) {
            Log.d(tag, msg);
        }
    }
    private String tag="MqttRecvClient--";
}

加密工具类:MacSignature.java


import org.apache.commons.codec.binary.Base64;

import java.nio.charset.Charset;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class MacSignature {
    /**
     * @param text      要签名的文本
     * @param secretKey 阿里云MQ secretKey
     * @return 加密后的字符串
     * @throws InvalidKeyException
     * @throws NoSuchAlgorithmException
     */
    public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
        Charset charset = Charset.forName("UTF-8");
        String algorithm = "HmacSHA1";
        Mac mac = Mac.getInstance(algorithm);
        mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
        byte[] bytes = mac.doFinal(text.getBytes(charset));
        return new String(Base64.encodeBase64(bytes), charset);
    }
    /**
     * 发送方签名方法
     *
     * @param clientId  Mqtt ClientId
     * @param secretKey 阿里云MQ secretKey
     * @return 加密后的字符串
     * @throws NoSuchAlgorithmException
     * @throws InvalidKeyException
     */
    public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
        return macSignature(clientId, secretKey);
    }
    /**
     * 订阅方签名方法
     *
     * @param topics    要订阅的Topic集合
     * @param clientId  Mqtt ClientId
     * @param secretKey 阿里云MQ secretKey
     * @return 加密后的字符串
     * @throws NoSuchAlgorithmException
     * @throws InvalidKeyException
     */
    public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
        Collections.sort(topics); //以字典顺序排序
        String topicText = "";
        for (String topic : topics) {
            topicText += topic + "\n";
        }
        String text = topicText + clientId;
        return macSignature(text, secretKey);
    }
    /**
     * 订阅方签名方法
     *
     * @param topic     要订阅的Topic
     * @param clientId  Mqtt ClientId
     * @param secretKey 阿里云MQ secretKey
     * @return 加密后的字符串
     * @throws NoSuchAlgorithmException
     * @throws InvalidKeyException
     */
    public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
        List<String> topics = new ArrayList<String>();
        topics.add(topic);
        return subSignature(topics, clientId, secretKey);
    }
}

MQTTService.java类


import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MQTTService extends Service {
    public MQTTService() {
    }

    @Override
    public void onCreate() {
        super.onCreate();
        MqttRecvClient.create(new MqttRecvClient.MessageListener() {
            @Override
            public void onMessageArrived(String topic, MqttMessage mqttMessage) {
                if(BuildConfig.DEBUG) {
                    Log.d("MQTTService ", "收到消息,topic=" + topic + ",message=" + new String(mqttMessage.getPayload()));
                }
            }
        });
    }

    @Override
    public IBinder onBind(Intent intent) {
       return new MyBinder();
    }

    class MyBinder extends Binder{
        public MQTTService getService(){
            return MQTTService.this;
        }
    }
}

在AndroidManifest.xml中添加一个service:

 <application
        android:name=".App"
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">

        <!--mqttservice-->  
        <service android:name=".mqtt.MQTTService">
            android:enabled="true"
            android:exported="true"
        </service>
    </application>

使用方法:在mainActivity.java中调用:

MqttRecvClient.init(context);
 类似资料: