MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做介绍,而是主要分析下一个Paho MQTT客户端的代码实现。
/**
*发布端
*/
public class PublishSample {
public static void main(String[] args) {
String topic = "mqtt/test";
String content = "hello 哈哈";
int qos = 1;
String broker = "tcp://iot.eclipse.org:1883";
String userName = "test";
String password = "test";
String clientId = "pubClient";
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
try {
// 创建客户端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(false);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
sampleClient.connect(connOpts);
// 创建消息
MqttMessage message = new MqttMessage(content.getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
sampleClient.publish(topic, message);
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
/**
*订阅端
*/
public class SubscribeSample {
public static void main(String[] args) throws MqttException {
String HOST = "tcp://iot.eclipse.org:1883";
String TOPIC = "mqtt/test";
int qos = 1;
String clientid = "subClient";
String userName = "test";
String passWord = "test";
try {
// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调函数
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("topic:"+topic);
System.out.println("Qos:"+message.getQos());
System.out.println("message content:"+new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+ token.isComplete());
}
});
client.connect(options);
//订阅消息
client.subscribe(TOPIC, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Mqtt内部源码中提供了两种外部使用的Client,MqttClient和MqttAsyncClient,接口功能基本相同,一个实现了IMqttClient接口,一个实现了IMqttAsyncClient接口。IMqttAsyncClient相当于IMqttCLient方法参数里增加了操作回调接口,因为是异步的嘛,所以需要接口回调通知。这里可以理解MqttClient是MqttAsyncClient的代理类,IMClient接口和IMqttAsyncClient提供了基本的订阅、解除订阅、连接、断开连接、重新连接、设置回调接口等功能,MqttClient和MqttAsyncClient比较,从名字可以看出来一个是异步的一个是同步的。实际上真正负责干活的是MqttAsyncClient,MqttClient对其MqttAsyncClient的API做了一层阻塞式的包装,我们可以从源码中对MqttClient的注释看出,如下:
/**
* Lightweight client for talking to an MQTT server using methods that block until an operation completes.
* <p>This class implements the blocking {@link IMqttClient} client interface where all actions block until they have completed (or timed out).
* </p>
* @see IMqttClient
*/
public class MqttClient implements IMqttClient {
在看下源码对MqttAsyncClient的描述
/**
* Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.
* <p>
* <p>This class implements the non-blocking {@link IMqttAsyncClient} client interface allowing applications to initiate MQTT actions and then carry on working while the MQTT action completes on a background thread.
* @see IMqttAsyncClient
*/
public class MqttAsyncClient implements IMqttAsyncClient {
然后可以随便看一个MqttClient的subscribe方法
public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
tok.waitForCompletion(getTimeToWait());
int[] grantedQos = tok.getGrantedQos();
for (int i = 0; i < grantedQos.length; ++i) {
qos[i] = grantedQos[i];
}
if (grantedQos.length == 1 && qos[0] == 0x80) {
throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
}
}
关键代码
tok.waitForCompletion(getTimeToWait());
可以看出MqttClient确实是对MqttAsyncClient的阻塞式封装。其它connect方法也是一样,就不看了。总的来说,使用MqttAsyncClient的相关api,都是异步的,必须依赖于回调的结果,才知道操作是否是成功的。而MqttClient是阻塞式,能顺序执行结束,不发生异常,则说明操作是成功的。
…
MqttClient mqttClient=new MqttClient(host,clientId,persistence);
//MqttClient的构造函数
public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
aClient = new MqttAsyncClient(serverURI, clientId, persistence);
}
实际上内部其实构造了一个MqttAsyncClient对象,进一步可以说明,MqttClient是MqttAsyncClient的一个代理类
接下来看下MqttAsyncClient的构造函数
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
...
this.comms = new ClientComms(this, this.persistence, pingSender);
...
}
省略了很多,这里我们可以看到构建了一个ClientComms对象,这也是个核心类
接下来看ClientComms的构造函数
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
...
this.callback = new CommsCallback(this);
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);
callback.setClientState(clientState);
}
这里可以看到构造了一个CommsCallback和ClientState对象,然后CommsCallback持有ClientState的引用
到此MqttClient基本构造结束,可以说就是构造一些基本的配置
接下来分析MqttClient的connect方法,连接至服务器,MqttClient调用connect方法最终走到了MqttAsyncClient的connect方法,接着走到ClientComms的connect方法
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
...
ConnectBG conbg = new ConnectBG(this, token, connect);
conbg.start();
}
省略相关代码,这里直接看到创建了一个ConnectBG的后台任务,走进去这个ConnectBG,看它的run方法
public void run() {
...
// start the background processing threads before sending the connect
// packet.
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId());
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId());
callback.start("MQTT Call: "+getClient().getClientId());
internalSend(conPacket, conToken);
} catch (MqttException ex) {
//@TRACE 212=connect failed: unexpected exception
mqttEx = ex;
} catch (Exception ex) {
//@TRACE 209=connect failed: unexpected exception
mqttEx = ExceptionHelper.createMqttException(ex);
}
if (mqttEx != null) {
shutdownConnection(conToken, mqttEx);
}
}
}
可以看到创建了NetworkModule、CommsReceiver、CommsSender,然后分别start了
NetworkModule主要处理底层TCP通信的,Socket啥的,暂且不说底层,CommsReceiver这个用来接受消息。CommsSender用来发送消息。CommsCallabck,这个之前分析过,它的所用主要是作为CommsReceiver接收到消息后和Client的一个转换,通过它将内部接收的消息以回调的形式反馈给客户端。继续分析CommsReceiver,分析它的run方法
public void run() {
...
while (running && (in != null)) {
MqttWireMessage message = in.readMqttWireMessage();
if (message != null) {
// A new message has arrived
try {
clientState.notifyReceivedMsg(message);
} catch (Exception e) {
MqttLog.printStackTrace(e);
}
}
}
}
省略相关代码,看核心代码,只要线程运行状态并且读消息的流不为空,这里将会发一个通知notifyReceivedMsg通知收到了消息,继续跟进 clientState.notifyReceivedMsg(message);方法
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
...
if (callback != null) {
callback.messageArrived(send);
}
...
}
省略相关代码,注意这里的callback就是我们之前说的CommsCallback。如果不为空,这里直接调用CommsCallback的messageArrived()方法,继续跟进这个方法
public void messageArrived(MqttPublish sendMessage) {
final String methodName = "messageArrived";
if (mqttCallback != null || callbacks.size() > 0) {
// If we already have enough messages queued up in memory, wait
// until some more queue space becomes available. This helps
// the client protect itself from getting flooded by messages
// from the server.
synchronized (spaceAvailable) {
while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
try {
// @TRACE 709=wait for spaceAvailable
spaceAvailable.wait(200);
} catch (InterruptedException ex) {
}
}
}
if (!quiescing) {
messageQueue.addElement(sendMessage);
// Notify the CommsCallback thread that there's work to do...
synchronized (workAvailable) {
// @TRACE 710=new msg avail, notify workAvailable
workAvailable.notifyAll();
}
}
}
}
仔细看下代码,其实是一个生产者和消费者模型,CommsCallback内部确实是这种,内部通过一个消息队列messageQueue来存储消息的,继续跟进核心代码messageQueue.addElement(sendMessage);说明直接插入消息到消息队列中了,整理下,CommsReceiver接收线程收到消息后,通知CommsCallabck,直接将消息插入到其内部的消息队列中,我们在看下,CommsCallabck内部如何读取这个队列消息的。
public void run() {
final String methodName = "run";
while (running) {
...
MqttPublish message = null;
synchronized (messageQueue) {
if (!messageQueue.isEmpty()) {
message = (MqttPublish) messageQueue.elementAt(0);
messageQueue.removeElementAt(0);
}
}
if (null != message) {
handleMessage(message);
}
...
}
}
}
省略了相关代码,可以看出来内部是一个死循环,不断的从消息队列中读取消息。消息读取之后,从队列删除,接着交给handleMessage方法处理,继续跟进这个handleMessage方法
private void handleMessage(MqttPublish publishMessage)
throws MqttException, Exception {
...
// @TRACE 713=call messageArrived key={0} topic={1}
deliverMessage(destName, publishMessage.getMessageId(),
publishMessage.getMessage());
...
}
省略相关代码,可以看出调用了deliverMessage方法,继续跟进
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception{
if (mqttCallback != null && !delivered) {
aMessage.setId(messageId);
mqttCallback.messageArrived(topicName, aMessage);
delivered = true;
}
return delivered;
}
可以清晰的看到,最终回调了我们最熟悉的MqttCallback的messageArrived()方法,客户端开始进行消息处理,流程大概结束。
由以上流程分析,这个开源Mqtt客户端内部大概有以下几种线程
此外还有一个重要的底层角色,NetworkModule,他负责处理底层的Socket TCP通信相关,这个未分析
每个Mqtt客户端的clientId不能一样,要保证唯一性,否则连接时Broker代理服务器会将上一个相同的clientId踢掉断开!并且会出现EOFException!