dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
compile group: 'commons-codec', name: 'commons-codec', version: '1.5'
}
MqttRecvClient.java类
public class MqttRecvClient {
/**
* 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
*/
final String broker ="tcp://xxx.mqtt.aliyuncs.com:1883";
/**
* 设置阿里云的AccessKey,用于鉴权
*/
final String acessKey ="xxx";
/**
* 设置阿里云的SecretKey,用于鉴权
*/
final String secretKey ="xxx";
/**
* 发消息使用的一级Topic,需要先在MQ控制台里申请
*/
final String topic ="xxx";
/**
* MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
* 其中GroupID在MQ控制台里申请
* DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
*/
final String consumerClientId ="GID_abc@@@"+System.currentTimeMillis();
String sign;
MemoryPersistence persistence = new MemoryPersistence();
MqttClient sampleClient;
MqttConnectOptions connOpts;
MqttThread mqttThread;
private static MqttRecvClient mqttRecvClient;
MessageListener messageListener;
private static boolean openLog=true;
private MqttRecvClient(@NonNull MessageListener messageListener){
if (mqttThread==null){
mqttThread=new MqttThread();
}
this.messageListener=messageListener;
mqttThread.start();
}
public static void init(Context context){
Intent intent=new Intent(context,MQTTService.class);
context.startService(intent);
}
public static void create(@NonNull MessageListener messageListener){
if (mqttRecvClient==null){
mqttRecvClient=new MqttRecvClient(messageListener);
}
}
class MqttThread extends Thread{
@Override
public void run() {
super.run();
if (sampleClient==null){
execut();
}
}
}
private void execut() {
try {
sampleClient = new MqttClient(broker, consumerClientId, persistence);
connOpts = new MqttConnectOptions();
log(tag,"Connecting to broker: " + broker);
/**
* 计算签名,将签名作为MQTT的password。
* 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
* 第二个参数阿里云的SecretKey
*/
sign = MacSignature.macSignature(consumerClientId.split("@@@")[0], secretKey);
// /**
// * 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息不需要显式订阅,
// */
// final String[] topicFilters=new String[]{topic};
// final int[]qos={0};
/**
* 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息请订阅/p2p
*/
final String[] topicFilters=new String[]{topic+"/notice/",topic+"/p2p"};
final int[]qos={0,0};
connOpts.setUserName(acessKey);
connOpts.setServerURIs(new String[] { broker });
connOpts.setPassword(sign.toCharArray());
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(100);
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
log(tag,"mqtt connection lost");
throwable.printStackTrace();
while(!sampleClient.isConnected()){
try {
sampleClient.connect(connOpts);
sampleClient.subscribe(topicFilters,qos);
} catch (MqttException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void messageArrived(String topic, MqttMessage mqttMessage) {
log(tag,"messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
if (messageListener!=null){
messageListener.onMessageArrived(topic, mqttMessage);
}
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log(tag,"deliveryComplete:" + iMqttDeliveryToken.getMessageId());
}
});
sampleClient.connect(connOpts);
sampleClient.subscribe(topicFilters,qos);
//Thread.sleep(Integer.MAX_VALUE);
log(tag,"end method");
} catch (Exception me) {
me.printStackTrace();
}
}
public interface MessageListener{
void onMessageArrived(String topic,MqttMessage mqttMessage);
}
public static void openLog(boolean bool){
openLog=bool;
}
private void log(String tag,String msg){
if (openLog) {
Log.d(tag, msg);
}
}
private String tag="MqttRecvClient--";
}
加密工具类:MacSignature.java
import org.apache.commons.codec.binary.Base64;
import java.nio.charset.Charset;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
public class MacSignature {
/**
* @param text 要签名的文本
* @param secretKey 阿里云MQ secretKey
* @return 加密后的字符串
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm = "HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
/**
* 发送方签名方法
*
* @param clientId Mqtt ClientId
* @param secretKey 阿里云MQ secretKey
* @return 加密后的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
return macSignature(clientId, secretKey);
}
/**
* 订阅方签名方法
*
* @param topics 要订阅的Topic集合
* @param clientId Mqtt ClientId
* @param secretKey 阿里云MQ secretKey
* @return 加密后的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
Collections.sort(topics); //以字典顺序排序
String topicText = "";
for (String topic : topics) {
topicText += topic + "\n";
}
String text = topicText + clientId;
return macSignature(text, secretKey);
}
/**
* 订阅方签名方法
*
* @param topic 要订阅的Topic
* @param clientId Mqtt ClientId
* @param secretKey 阿里云MQ secretKey
* @return 加密后的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
List<String> topics = new ArrayList<String>();
topics.add(topic);
return subSignature(topics, clientId, secretKey);
}
}
MQTTService.java类
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MQTTService extends Service {
public MQTTService() {
}
@Override
public void onCreate() {
super.onCreate();
MqttRecvClient.create(new MqttRecvClient.MessageListener() {
@Override
public void onMessageArrived(String topic, MqttMessage mqttMessage) {
if(BuildConfig.DEBUG) {
Log.d("MQTTService ", "收到消息,topic=" + topic + ",message=" + new String(mqttMessage.getPayload()));
}
}
});
}
@Override
public IBinder onBind(Intent intent) {
return new MyBinder();
}
class MyBinder extends Binder{
public MQTTService getService(){
return MQTTService.this;
}
}
}
在AndroidManifest.xml中添加一个service:
<application
android:name=".App"
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<!--mqttservice-->
<service android:name=".mqtt.MQTTService">
android:enabled="true"
android:exported="true"
</service>
</application>
使用方法:在mainActivity.java中调用:
MqttRecvClient.init(context);