当前位置: 首页 > 工具软件 > mqttc > 使用案例 >

Mqtt C实现记录,流程分析

向苗宣
2023-12-01

文章中的API使用流程都是以samples中的实例程序作为参考
本文基于开源的mqtt c实现:


1. 访问模式

Mqtt C中分为同步访问异步访问模式

  • 同步访问的时候,请求会阻塞到访问处,知道有结果返回;
  • 异步访问的时候,会将请求委托给Mqtt c client然后直接返回(零等待),最后结果返回之后,会回调对应的回调函数。
  • 文章主要讲异步模式

2. 主要数据结构

1. MQTTAsync

  • MQTTAsync 是一个void*类型的指针,表示一个 Mqtt client(context) 句柄, 即MQTTAsyncs
/**
 * A handle representing an MQTT client. A valid client handle is available
 * following a successful call to MQTTAsync_create().
 */
typedef void* MQTTAsync;

2. MQTTAsyncs

用于封装请求的信息,包括URL,端口,回调函数等访问参数

typedef struct MQTTAsync_struct
{
	char* serverURI; //服务器url
	int ssl;
	int websocket;
	Clients* c;

	/* "Global", to the client, callback definitions */
	//连接失效的回调函数
    //函数原型: typedef void MQTTAsync_connectionLost(void* context, char* cause);
	MQTTAsync_connectionLost* cl; 

    //消息到来的回调函数
    //原型:typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen,
    // MQTTAsync_message* message);
	MQTTAsync_messageArrived* ma; 

    //消息发送成功的回调通知函数
    //原型:typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
	MQTTAsync_deliveryComplete* dc; 

	void* clContext; /* the context to be associated with the conn lost callback*/
	void* maContext; /* the context to be associated with the msg arrived callback*/
	void* dcContext; /* the context to be associated with the deliv complete callback*/

    //连接成功的回调函数
    //原型:typedef void MQTTAsync_connected(void* context, char* cause);
	MQTTAsync_connected* connected;
	void* connected_context; /* the context to be associated with the connected callback*/

    //连接断开的回调函数
	//原型:typedef void MQTTAsync_disconnected(void* context, MQTTProperties* properties,
		//enum MQTTReasonCodes reasonCode);
	MQTTAsync_disconnected* disconnected; 
	void* disconnected_context; /* the context to be associated with the disconnected callback*/

	/* Each time connect is called, we store the options that were used.  These are reused in
	   any call to reconnect, or an automatic reconnect attempt */
	MQTTAsync_command  connect;		/* Connect operation properties */
	MQTTAsync_command disconnect;		/* Disconnect operation properties */
	MQTTAsync_command* pending_write;       /* Is there a socket write pending? */

	List* responses;
	unsigned int command_seqno;

	MQTTPacket* pack;

	/* added for offline buffering */
	MQTTAsync_createOptions* createOptions;
	int shouldBeConnected;

	/* added for automatic reconnect */
	int automaticReconnect;
	int minRetryInterval;
	int maxRetryInterval;
	int serverURIcount;
	char** serverURIs;
	int connectTimeout;

	int currentInterval;
	START_TIME_TYPE lastConnectionFailedTime;
	int retrying;
	int reconnectNow;

	/* MQTT V5 properties */
	MQTTProperties* connectProps;
	MQTTProperties* willProps;

} MQTTAsyncs;

3. MQTTAsync_message

Mqtt的Message结构体

typedef struct
{
	/** The eyecatcher for this structure.  must be MQTM. */
	char struct_id[4];
	/** The version number of this structure.  Must be 0 or 1.
	 *  0 indicates no message properties */
	int struct_version;
	/** The length of the MQTT message payload in bytes. */
	int payloadlen;
	/** A pointer to the payload of the MQTT message. */
	void* payload;
	/**
     * The quality of service (QoS) assigned to the message.
     * There are three levels of QoS:
     * <DL>
     * <DT><B>QoS0</B></DT>
     * <DD>Fire and forget - the message may not be delivered</DD>
     * <DT><B>QoS1</B></DT>
     * <DD>At least once - the message will be delivered, but may be
     * delivered more than once in some circumstances.</DD>
     * <DT><B>QoS2</B></DT>
     * <DD>Once and one only - the message will be delivered exactly once.</DD>
     * </DL>
     */
	int qos;
	/**
     * The retained flag serves two purposes depending on whether the message
     * it is associated with is being published or received.
     *
     * <b>retained = true</b><br>
     * For messages being published, a true setting indicates that the MQTT
     * server should retain a copy of the message. The message will then be
     * transmitted to new subscribers to a topic that matches the message topic.
     * For subscribers registering a new subscription, the flag being true
     * indicates that the received message is not a new one, but one that has
     * been retained by the MQTT server.
     *
     * <b>retained = false</b> <br>
     * For publishers, this indicates that this message should not be retained
     * by the MQTT server. For subscribers, a false setting indicates this is
     * a normal message, received as a result of it being published to the
     * server.
     */
	int retained;
	/**
      * The dup flag indicates whether or not this message is a duplicate.
      * It is only meaningful when receiving QoS1 messages. When true, the
      * client application should take appropriate action to deal with the
      * duplicate message.
      */
	int dup;
	/** The message identifier is normally reserved for internal use by the
      * MQTT client and server.
      */
	int msgid;
	/**
	 * The MQTT V5 properties associated with the message.
	 */
	MQTTProperties properties;
} MQTTAsync_message;

4. MQTTAsync_responseOptions

  • MQTTAsync_responseOptions表示每次操作的响应配置,包括 操作成功,操作失败 等操作;
  • 同类的还有:
  • MQTTAsync_disconnectOptions : 断开连接选项
  • MQTTAsync_connectOptions: 连接选项
  • MQTTAsync_init_options:初始化选项 等等
typedef struct MQTTAsync_responseOptions
{
	/** The eyecatcher for this structure.  Must be MQTR */
	char struct_id[4];
	/** The version number of this structure.  Must be 0 or 1
	 *   if 0, no MQTTV5 options */
	int struct_version;
	/**
    * A pointer to a callback function to be called if the API call successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
	MQTTAsync_onSuccess* onSuccess;
	/**
    * A pointer to a callback function to be called if the API call fails.
    * Can be set to NULL, in which case no indication of unsuccessful
    * completion will be received.
    */
	MQTTAsync_onFailure* onFailure;
	/**
    * A pointer to any application-specific context. The
    * the <i>context</i> pointer is passed to success or failure callback functions to
    * provide access to the context information in the callback.
    */
	void* context;
	/**
    * A token is returned from the call.  It can be used to track
    * the state of this request, both in the callbacks and in future calls
    * such as ::MQTTAsync_waitForCompletion.
    */
	MQTTAsync_token token;
	/**
    * A pointer to a callback function to be called if the API call successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
	MQTTAsync_onSuccess5* onSuccess5;
	/**
    * A pointer to a callback function to be called if the API call successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
	MQTTAsync_onFailure5* onFailure5;
	/**
	 * MQTT V5 input properties
	 */
	MQTTProperties properties;
	/*
	 * MQTT V5 subscribe options, when used with subscribe only.
	 */
	MQTTSubscribe_options subscribeOptions;
	/*
	 * MQTT V5 subscribe option count, when used with subscribeMany only.
	 * The number of entries in the subscribe_options_list array.
	 */
	int subscribeOptionsCount;
	/*
	 * MQTT V5 subscribe option array, when used with subscribeMany only.
	 */
	MQTTSubscribe_options* subscribeOptionsList;
} MQTTAsync_responseOptions;

5. List

List是一个链表, Mqttc中的commands信令队列就是用该链表存储/获取的

  • static List* commands = NULL; //Commands信令队列;
  • static List* handles = NULL; //clients队列,记录所有的client(MQTTAsync)对象;
/**
 * Structure to hold all data for one list
 */
typedef struct
{
	ListElement *first,	/**< first element in the list */
				*last,	/**< last element in the list */
				*current;	/**< current element in the list, for iteration */
	int count;  /**< no of items */
	size_t size;  /**< heap storage used */
} List;

2. 流程

  • Mqtt c的同步没什么可说的,类似于调用一个RPC接口一样的,主要看异步请求的流程
  • Mqtt C实现的src/samples文件夹下有异步的subscribe和publish的例子,从这里可以作为切入点来看MQTTC
    的请求流程

1. 订阅

  1. MQTTAsync_subscribe.c
//MQTTAsync_subscribe.c

//main()
int main(int argc, char* argv[])
{
    //[*1] 创建一个MQTTAsync的handle
	MQTTAsync client; 
	//设置连接选项参数,这里默认给了初始化的参数,mqtt-c自己提供的
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;//[*2]
    //设置断开连接选项参数,这里默认给了初始化的参数,mqtt-c自己提供的
	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;//[*3s]
	int rc;
	int ch;

    //真正创建一个MQTTAsync 的结构体(这里其实是MQTTAsyncs结构体,下面解释)
	MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

    //设置回调函数
	MQTTAsync_setCallbacks(
	client/*MQTTAsync handle*/,
	client/*context*/,
	connlost/*MQTTAsync_connectionLost*/,
	msgarrvd/*MQTTAsync_messageArrived*/,
	NULL/*MQTTAsync_deliveryComplete*/);

	conn_opts.keepAliveInterval = 20; //心跳间隔
	conn_opts.cleansession = 1; //在connect/disconnect的时候,是否清除上一个连接的seesion状态信息缓存
	                           //(重启一个新的)。这个session状态用来确认消息质量保证(至少一次或者只有一次)
	conn_opts.onSuccess = onConnect; //连接成功的回调函数
	conn_opts.onFailure = onConnectFailure; //连接失败的回调函数
	conn_opts.context = client; //上下文对象,即MQTTAsync
	//开始请求连接
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
   
	while	(!subscribed)
		#if defined(WIN32)
			Sleep(100);
		#else
		    //等待10ms, 一定时间内确保subscribe动作完成
			usleep(10000L);
		#endif

	if (finished)
		goto exit;

	do 
	{
		ch = getchar();
	} while (ch!='Q' && ch != 'q');

	disc_opts.onSuccess = onDisconnect;
	if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start disconnect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
 	while	(!disc_finished)
		#if defined(WIN32)
			Sleep(100);
		#else
			usleep(10000L);
		#endif

exit:
	MQTTAsync_destroy(&client);
 	return rc;
}
//--------------------------------------------------------------------------------------

//连接成功回调
void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;//获取默认的Option
	int rc;

	printf("Successful connection\n");

	printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
	opts.onSuccess = onSubscribe; //订阅成功时回调
	                             //原型:typedef void MQTTAsync_onSuccess(void* context, 
	                                  //MQTTAsync_successData* response);
	opts.onFailure = onSubscribeFailure; //订阅失败时回调
	                                     //原型:typedef void MQTTAsync_onFailure(void* context, 
	                                         // MQTTAsync_failureData* response);
	opts.context = client;

	deliveredtoken = 0;

    //开始订阅
	if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start subscribe, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
}
//--------------------------------------------------------------------------------------

//订阅成功
void onSubscribe(void* context, MQTTAsync_successData* response)
{
	printf("Subscribe succeeded\n");
	subscribed = 1; //标记订阅成功
}
  1. MQTTAsync_create
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context)
{
	return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
		persistence_context, NULL);
}
//--------------------------------------------------------------------------------------

int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context,  MQTTAsync_createOptions* options)
{
    int rc = 0;
	MQTTAsyncs *m = NULL;
	...
	//初始化全局对象
	if (!global_initialized)
	{
		#if defined(HEAP_H)
			Heap_initialize();
		#endif
		Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
		bstate->clients = ListInitialize(); // 初始化ClientStates列表
		Socket_outInitialize();
		Socket_setWriteCompleteCallback(MQTTAsync_writeComplete); //Socket写入完成回调
		handles = ListInitialize(); // 创建handles队列
		commands = ListInitialize(); //command列表,代表mqtt信令请求的队列
	...
	m = malloc(sizeof(MQTTAsyncs));
	*handle = m;//为handle申请内存,解释类型为MQTTAsyncs
    memset(m, '\0', sizeof(MQTTAsyncs));

    //为m设置内容
    m->serverURI = MQTTStrdup(serverURI);
	m->responses = ListInitialize();
	//添加该handle到handles链表队列中
	ListAppend(handles, m, sizeof(MQTTAsyncs));
	m->c = malloc(sizeof(Clients));
	memset(m->c, '\0', sizeof(Clients));
	m->c->context = m;
	m->c->outboundMsgs = ListInitialize();
	m->c->inboundMsgs = ListInitialize();
	m->c->messageQueue = ListInitialize();
	m->c->clientID = MQTTStrdup(clientId);
	m->c->MQTTVersion = MQTTVERSION_DEFAULT;

	m->shouldBeConnected = 0;
	if (options)
	{
		m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
		memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
		m->c->MQTTVersion = options->MQTTVersion;
	}
	...
	//将新建的client,添加到ClientStates列表中
	ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
   //m代替handle,设置完成了所有请求的参数,MqttAsync对象创建完毕(MqttAsyncs)
}

Subscribe的过程: 创建连接的参数 -> 然后连接 -> 连接成功 -> 订阅 -> 订阅成功 -> 标记订阅状态(成功);

2. 发布

//MQTTAsync_publish.c

//main()
//流程和Subscribe一样,在main中主要是connect操作,请求mqtt服务器连接
int main(int argc, char* argv[])
{
	MQTTAsync client;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
	MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);

	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	conn_opts.context = client;
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}

	printf("Waiting for publication of %s\n"
         "on topic %s for client with ClientID: %s\n",
         PAYLOAD, TOPIC, CLIENTID);
	while (!finished)
		#if defined(WIN32)
			Sleep(100);
		#else
			usleep(10000L);
		#endif

	MQTTAsync_destroy(&client);
 	return rc;
}
//--------------------------------------------------------------------------------------

//连接成功
void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    
    //创建要发送的Message MQTTAsync_message
	MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
	int rc;

	printf("Successful connection\n");
	
	//发送成功的回调
	opts.onSuccess = onSend;
	opts.context = client;

    //设置发送内容(payload)
	pubmsg.payload = PAYLOAD;
	pubmsg.payloadlen = (int)strlen(PAYLOAD);
	pubmsg.qos = QOS;//设置质量等级保证
	pubmsg.retained = 0;
	deliveredtoken = 0;

    //发送
	if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start sendMessage, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
}
//--------------------------------------------------------------------------------------

//发送成功(该用例中发送一次成功后直接断开了连接)
void onSend(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	//创建断开连接选项
	MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
	int rc;

	printf("Message with token value %d delivery confirmed\n", response->token);

	opts.onSuccess = onDisconnect; //断开连接成功
	opts.context = client;

    //断开连接
	if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start sendMessage, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
}

3. Mqttc信令发送流程(以Connect为例)

1. 信令入队

//MQTTAsync.c
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
{
	MQTTAsyncs* m = handle; //将void* 转换为MQTTAsyncs* 准备填充数据
	int rc = MQTTASYNC_SUCCESS;
	MQTTAsync_queuedCommand* conn; //执行单位, 这里代表连接
	...
	//将options的内容填充给m->connect
	m->connect.onSuccess = options->onSuccess;
	m->connect.onFailure = options->onFailure;
	...
	if (sendThread_state != STARTING && sendThread_state != RUNNING)
	{
		MQTTAsync_lock_mutex(mqttasync_mutex);
		sendThread_state = STARTING;
		//启动信令发送线程
		Thread_start(MQTTAsync_sendThread, NULL);
		MQTTAsync_unlock_mutex(mqttasync_mutex);
	}
	if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
	{
		MQTTAsync_lock_mutex(mqttasync_mutex);
		receiveThread_state = STARTING;
		//启动信令接收线程
		Thread_start(MQTTAsync_receiveThread, handle);
		MQTTAsync_unlock_mutex(mqttasync_mutex);
	}
	...
	//将options的内容填充给m->c(Clients)
	m->c->keepAliveInterval = options->keepAliveInterval;
	...
	//填充m的其他参数
	...
	//为MQTTAsync_queuedCommand申请内存,初始化并将m赋值给conn->client;
	conn = malloc(sizeof(MQTTAsync_queuedCommand));
	memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
	conn->client = m;
	//将options的内容(回调等)赋值给conn->command;
    if (options)
	{
		conn->command.onSuccess = options->onSuccess;
		conn->command.onFailure = options->onFailure;
		conn->command.onSuccess5 = options->onSuccess5;
		conn->command.onFailure5 = options->onFailure5;
		conn->command.context = options->context;
	}
	conn->command.type = CONNECT;
	conn->command.details.conn.currentURI = 0;
    
    //将conn入队到commands列表(MQTTAsync_createWithOptions中初始化的全局列表)
	rc = MQTTAsync_addCommand(conn, sizeof(conn));
	...
}
//--------------------------------------------------------------------------------------

static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
{
    int rc = 0;
    ...
    if ((command->command.type != CONNECT) || (command->client->c->connect_state == NOT_IN_PROGRESS))
		command->command.start_time = MQTTAsync_start_clock();
	if (command->command.type == CONNECT ||
		(command->command.type == DISCONNECT && command->command.details.dis.internal))
	{//CONNECT  / DISCONNECT 的情况下
		MQTTAsync_queuedCommand* head = NULL;
		if (commands->first)
			head = (MQTTAsync_queuedCommand*)(commands->first->content);
		if (head != NULL && head->client == command->client && head->command.type == command->command.type)
			MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
		else
		    //对于Connect或者Disconnect优先处理, 将command插入到指定的index上,这里是插入到队头;
			ListInsert(commands, command, command_size, commands->first); /* add to the head of the list */
	}
	else
	{
	    //非Connect的情况下,将command续接到队尾;
		ListAppend(commands, command, command_size);
		...
	}
	...
	//通知发送线程接触等待状态,开始处理发送队列中的信令给Mqtt服务器
	// 这里对应唤醒的等待线程是MQTTAsync_sendThread
	rc = Thread_signal_cond(send_cond);
	...
}

2. 信令出队

在入队完毕之后,会唤醒MQTTAsync_sendThread并解除等待状态

//MQTTAsync.c
static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
{
	...
	while (!tostop)
	{
		int rc;

		while (commands->count > 0)
		{
		    //循环处理commands队列中的信令,处理完成后,跳出循环等待,一直到队列有内容,会被唤醒并继续处理
			if (MQTTAsync_processCommand() == 0)
				break;  /* no commands were processed, so go into a wait */
		}
...
        //队列为空之后,等待...
		if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
			Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
...
	}
	...
	return 0;
}

MQTTAsync_processCommand 函数中,就是从队列获取一个command信令发送给MQTT服务器,然后返回,并继续循环处理下一个,直到队列为空,返回0;

4. Mqtt C消息接受流程

接受流程在MQTTAsync_receiveThread 线程中, 不断的轮询, 以linux的selector 作为监听, 查询是否有已经准备好的消息的socket(有内容写入),然后解析, 分发接收到的信息.

 类似资料: