MQTT源码分析

屠建本
2023-12-01

MQTT介绍

MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做介绍,而是主要分析下一个Paho MQTT客户端的代码实现。

Paho MQTT开源项目基本使用

  • 发布端代码案例
/**
 *发布端
 */
public class PublishSample {
    public static void main(String[] args) {

        String topic = "mqtt/test";
        String content = "hello 哈哈";
        int qos = 1;
        String broker = "tcp://iot.eclipse.org:1883";
        String userName = "test";
        String password = "test";
        String clientId = "pubClient";
        // 内存存储
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            // 创建客户端
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            // 创建链接参数
            MqttConnectOptions connOpts = new MqttConnectOptions();
            // 在重新启动和重新连接时记住状态
            connOpts.setCleanSession(false);
            // 设置连接的用户名
            connOpts.setUserName(userName);
            connOpts.setPassword(password.toCharArray());
            // 建立连接
            sampleClient.connect(connOpts);
            // 创建消息
            MqttMessage message = new MqttMessage(content.getBytes());
            // 设置消息的服务质量
            message.setQos(qos);
            // 发布消息
            sampleClient.publish(topic, message);
            // 断开连接
            sampleClient.disconnect();
            // 关闭客户端
            sampleClient.close();
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
  • 订阅端代码案例
/**
 *订阅端
 */
public class SubscribeSample {

    public static void main(String[] args) throws MqttException {   
        String HOST = "tcp://iot.eclipse.org:1883";
        String TOPIC = "mqtt/test";
        int qos = 1;
        String clientid = "subClient";
        String userName = "test";
        String passWord = "test";
        try {
            // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调函数
            client.setCallback(new MqttCallback() {

                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost");
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("topic:"+topic);
                    System.out.println("Qos:"+message.getQos());
                    System.out.println("message content:"+new String(message.getPayload()));
                    
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------"+ token.isComplete());
                }

            });
            client.connect(options);
            //订阅消息
            client.subscribe(TOPIC, qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

应用层源码分析

Mqtt内部源码中提供了两种外部使用的Client,MqttClient和MqttAsyncClient,接口功能基本相同,一个实现了IMqttClient接口,一个实现了IMqttAsyncClient接口。IMqttAsyncClient相当于IMqttCLient方法参数里增加了操作回调接口,因为是异步的嘛,所以需要接口回调通知。这里可以理解MqttClient是MqttAsyncClient的代理类,IMClient接口和IMqttAsyncClient提供了基本的订阅、解除订阅、连接、断开连接、重新连接、设置回调接口等功能,MqttClient和MqttAsyncClient比较,从名字可以看出来一个是异步的一个是同步的。实际上真正负责干活的是MqttAsyncClient,MqttClient对其MqttAsyncClient的API做了一层阻塞式的包装,我们可以从源码中对MqttClient的注释看出,如下:

/**
 * Lightweight client for talking to an MQTT server using methods that block until an operation completes.
 * <p>This class implements the blocking {@link IMqttClient} client interface where all actions block until they have completed (or timed out).
 * </p>
 * @see IMqttClient
 */
public class MqttClient implements IMqttClient {

在看下源码对MqttAsyncClient的描述

/**
 * Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.
 * <p>
 * <p>This class implements the non-blocking {@link IMqttAsyncClient} client interface allowing applications to initiate MQTT actions and then carry on working while the MQTT action completes on a background thread.
 
 * @see IMqttAsyncClient
 */
public class MqttAsyncClient implements IMqttAsyncClient {

然后可以随便看一个MqttClient的subscribe方法

public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
		IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
		tok.waitForCompletion(getTimeToWait());
		int[] grantedQos = tok.getGrantedQos();
		for (int i = 0; i < grantedQos.length; ++i) {
			qos[i] = grantedQos[i];
		}
		if (grantedQos.length == 1 && qos[0] == 0x80) {
			throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
		}
	}

关键代码

tok.waitForCompletion(getTimeToWait());

可以看出MqttClient确实是对MqttAsyncClient的阻塞式封装。其它connect方法也是一样,就不看了。总的来说,使用MqttAsyncClient的相关api,都是异步的,必须依赖于回调的结果,才知道操作是否是成功的。而MqttClient是阻塞式,能顺序执行结束,不发生异常,则说明操作是成功的。

客户端操作接口

  • connect
  • disconnect
  • reconnect
  • subscribe
  • unsubscribe

分析客户端连接服务端做了哪些操作

MqttClient mqttClient=new MqttClient(host,clientId,persistence);
//MqttClient的构造函数
public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
		aClient = new MqttAsyncClient(serverURI, clientId, persistence);
	}
实际上内部其实构造了一个MqttAsyncClient对象,进一步可以说明,MqttClient是MqttAsyncClient的一个代理类
接下来看下MqttAsyncClient的构造函数
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
        ...
        this.comms = new ClientComms(this, this.persistence, pingSender);
        ...
    }
省略了很多,这里我们可以看到构建了一个ClientComms对象,这也是个核心类
接下来看ClientComms的构造函数
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
	    ...
		this.callback 	= new CommsCallback(this);
		this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);
		callback.setClientState(clientState);
	}
这里可以看到构造了一个CommsCallback和ClientState对象,然后CommsCallback持有ClientState的引用
到此MqttClient基本构造结束,可以说就是构造一些基本的配置
接下来分析MqttClient的connect方法,连接至服务器,MqttClient调用connect方法最终走到了MqttAsyncClient的connect方法,接着走到ClientComms的connect方法
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
                ...
				ConnectBG conbg = new ConnectBG(this, token, connect);
				conbg.start();
	}
省略相关代码,这里直接看到创建了一个ConnectBG的后台任务,走进去这个ConnectBG,看它的run方法
public void run() {
		        ...
				// start the background processing threads before sending the connect
				// packet.
				NetworkModule networkModule = networkModules[networkModuleIndex];
				networkModule.start();
				receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
				receiver.start("MQTT Rec: "+getClient().getClientId());
				sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
				sender.start("MQTT Snd: "+getClient().getClientId());
				callback.start("MQTT Call: "+getClient().getClientId());				
				internalSend(conPacket, conToken);
			} catch (MqttException ex) {
				//@TRACE 212=connect failed: unexpected exception
				mqttEx = ex;
			} catch (Exception ex) {
				//@TRACE 209=connect failed: unexpected exception
				mqttEx =  ExceptionHelper.createMqttException(ex);
			}

			if (mqttEx != null) {
				shutdownConnection(conToken, mqttEx);
			}
		}
	}
可以看到创建了NetworkModule、CommsReceiver、CommsSender,然后分别start了
NetworkModule主要处理底层TCP通信的,Socket啥的,暂且不说底层,CommsReceiver这个用来接受消息。CommsSender用来发送消息。CommsCallabck,这个之前分析过,它的所用主要是作为CommsReceiver接收到消息后和Client的一个转换,通过它将内部接收的消息以回调的形式反馈给客户端。继续分析CommsReceiver,分析它的run方法
public void run() {
        ...
        while (running && (in != null)) {
           MqttWireMessage message = in.readMqttWireMessage();
                    if (message != null) {
                        // A new message has arrived
                        try {
                            clientState.notifyReceivedMsg(message);
                        } catch (Exception e) {
                            MqttLog.printStackTrace(e);
                        }
                    }
                }
    }
省略相关代码,看核心代码,只要线程运行状态并且读消息的流不为空,这里将会发一个通知notifyReceivedMsg通知收到了消息,继续跟进 clientState.notifyReceivedMsg(message);方法
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
       ...
		if (callback != null) {
				callback.messageArrived(send);
		}
		...
		
	}
省略相关代码,注意这里的callback就是我们之前说的CommsCallback。如果不为空,这里直接调用CommsCallback的messageArrived()方法,继续跟进这个方法
public void messageArrived(MqttPublish sendMessage) {
		final String methodName = "messageArrived";
		if (mqttCallback != null || callbacks.size() > 0) {
			// If we already have enough messages queued up in memory, wait
			// until some more queue space becomes available. This helps 
			// the client protect itself from getting flooded by messages 
			// from the server.
			synchronized (spaceAvailable) {
				while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
					try {
						// @TRACE 709=wait for spaceAvailable
						spaceAvailable.wait(200);
					} catch (InterruptedException ex) {
					}
				}
			}
			if (!quiescing) {
				messageQueue.addElement(sendMessage);
				// Notify the CommsCallback thread that there's work to do...
				synchronized (workAvailable) {
					// @TRACE 710=new msg avail, notify workAvailable
					workAvailable.notifyAll();
				}
			}
		}
	}
仔细看下代码,其实是一个生产者和消费者模型,CommsCallback内部确实是这种,内部通过一个消息队列messageQueue来存储消息的,继续跟进核心代码messageQueue.addElement(sendMessage);说明直接插入消息到消息队列中了,整理下,CommsReceiver接收线程收到消息后,通知CommsCallabck,直接将消息插入到其内部的消息队列中,我们在看下,CommsCallabck内部如何读取这个队列消息的。
public void run() {
		final String methodName = "run";
		while (running) {
				    ...
					MqttPublish message = null;
					synchronized (messageQueue) {
					    if (!messageQueue.isEmpty()) {
						  
							message = (MqttPublish) messageQueue.elementAt(0);
							messageQueue.removeElementAt(0);
					    }
					}
					if (null != message) {
						handleMessage(message);
					}
					...
				}
		
		}
	}
省略了相关代码,可以看出来内部是一个死循环,不断的从消息队列中读取消息。消息读取之后,从队列删除,接着交给handleMessage方法处理,继续跟进这个handleMessage方法
private void handleMessage(MqttPublish publishMessage)
			throws MqttException, Exception {
		  ...
		// @TRACE 713=call messageArrived key={0} topic={1}
		deliverMessage(destName, publishMessage.getMessageId(),
				publishMessage.getMessage());
		...
	}
省略相关代码,可以看出调用了deliverMessage方法,继续跟进
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception{		
		
		if (mqttCallback != null && !delivered) {
			aMessage.setId(messageId);
			mqttCallback.messageArrived(topicName, aMessage);
			delivered = true;
		}
		return delivered;
	}
可以清晰的看到,最终回调了我们最熟悉的MqttCallback的messageArrived()方法,客户端开始进行消息处理,流程大概结束。

内部线程梳理

由以上流程分析,这个开源Mqtt客户端内部大概有以下几种线程

  • Connect mqtt与服务器连接线程,线程名称为 "MQTT Con: "+getClient().getClientId()
  • Sender 消息发送线程,线程名称为 "MQTT Snd: "+getClient().getClientId()
  • Receiver 消息接收线程,线程名称为 "MQTT Rec: "+getClient().getClientId()
  • Callback 消息回调线程,线程名称为 "MQTT Call: "+getClient().getClientId()

此外还有一个重要的底层角色,NetworkModule,他负责处理底层的Socket TCP通信相关,这个未分析

注意点

每个Mqtt客户端的clientId不能一样,要保证唯一性,否则连接时Broker代理服务器会将上一个相同的clientId踢掉断开!并且会出现EOFException!

 类似资料: