文章中的API使用流程都是以samples中的实例程序作为参考
本文基于开源的mqtt c实现:
- Github链接: eclipse/paho.mqtt.c
Mqtt C中的访问方式分为同步访问和异步访问两种模式
- MQTTAsync 是一个void*类型的指针,表示一个 Mqtt client(context) 句柄, 其实际指向的是内部的 MQTTAsyncs 结构体
/**
* A handle representing an MQTT client. A valid client handle is available
* following a successful call to MQTTAsync_create().
* The handle is declared as an untyped pointer, so its contents are opaque
* to application code.
*/
typedef void* MQTTAsync;
MQTTAsyncs 结构体用于封装请求的信息,包括URL,端口,回调函数等访问参数
/* State held for one MQTT client: server address, user callbacks with their
contexts, stored connect/disconnect commands and reconnect settings. */
typedef struct MQTTAsync_struct
{
char* serverURI; // server URL
int ssl;
int websocket;
Clients* c;
/* "Global", to the client, callback definitions */
// callback invoked when the connection is lost
// prototype: typedef void MQTTAsync_connectionLost(void* context, char* cause);
MQTTAsync_connectionLost* cl;
// callback invoked when a message arrives
// prototype: typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen,
// MQTTAsync_message* message);
MQTTAsync_messageArrived* ma;
// callback notified when a message has been sent successfully
// prototype: typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
MQTTAsync_deliveryComplete* dc;
void* clContext; /* the context to be associated with the conn lost callback*/
void* maContext; /* the context to be associated with the msg arrived callback*/
void* dcContext; /* the context to be associated with the deliv complete callback*/
// callback invoked when a connection has been established
// prototype: typedef void MQTTAsync_connected(void* context, char* cause);
MQTTAsync_connected* connected;
void* connected_context; /* the context to be associated with the connected callback*/
// callback invoked when the connection is disconnected
// prototype: typedef void MQTTAsync_disconnected(void* context, MQTTProperties* properties,
// enum MQTTReasonCodes reasonCode);
MQTTAsync_disconnected* disconnected;
void* disconnected_context; /* the context to be associated with the disconnected callback*/
/* Each time connect is called, we store the options that were used. These are reused in
any call to reconnect, or an automatic reconnect attempt */
MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */
MQTTAsync_command* pending_write; /* Is there a socket write pending? */
List* responses;
unsigned int command_seqno;
MQTTPacket* pack;
/* added for offline buffering */
MQTTAsync_createOptions* createOptions;
int shouldBeConnected;
/* added for automatic reconnect */
int automaticReconnect;
int minRetryInterval;
int maxRetryInterval;
int serverURIcount;
char** serverURIs;
int connectTimeout;
int currentInterval;
START_TIME_TYPE lastConnectionFailedTime;
int retrying;
int reconnectNow;
/* MQTT V5 properties */
MQTTProperties* connectProps;
MQTTProperties* willProps;
} MQTTAsyncs;
Mqtt的Message结构体
/**
* A structure describing one MQTT message: its payload and the attributes
* (QoS, retained flag, duplicate flag, message id, V5 properties) carried with it.
*/
typedef struct
{
/** The eyecatcher for this structure. Must be MQTM. */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1.
* 0 indicates no message properties */
int struct_version;
/** The length of the MQTT message payload in bytes. */
int payloadlen;
/** A pointer to the payload of the MQTT message. */
void* payload;
/**
* The quality of service (QoS) assigned to the message.
* There are three levels of QoS:
* <DL>
* <DT><B>QoS0</B></DT>
* <DD>Fire and forget - the message may not be delivered</DD>
* <DT><B>QoS1</B></DT>
* <DD>At least once - the message will be delivered, but may be
* delivered more than once in some circumstances.</DD>
* <DT><B>QoS2</B></DT>
* <DD>Once and one only - the message will be delivered exactly once.</DD>
* </DL>
*/
int qos;
/**
* The retained flag serves two purposes depending on whether the message
* it is associated with is being published or received.
*
* <b>retained = true</b><br>
* For messages being published, a true setting indicates that the MQTT
* server should retain a copy of the message. The message will then be
* transmitted to new subscribers to a topic that matches the message topic.
* For subscribers registering a new subscription, the flag being true
* indicates that the received message is not a new one, but one that has
* been retained by the MQTT server.
*
* <b>retained = false</b> <br>
* For publishers, this indicates that this message should not be retained
* by the MQTT server. For subscribers, a false setting indicates this is
* a normal message, received as a result of it being published to the
* server.
*/
int retained;
/**
* The dup flag indicates whether or not this message is a duplicate.
* It is only meaningful when receiving QoS1 messages. When true, the
* client application should take appropriate action to deal with the
* duplicate message.
*/
int dup;
/** The message identifier is normally reserved for internal use by the
* MQTT client and server.
*/
int msgid;
/**
* The MQTT V5 properties associated with the message.
*/
MQTTProperties properties;
} MQTTAsync_message;
- MQTTAsync_responseOptions表示每次操作的响应配置,包括操作成功、操作失败时的回调等;
- 同类的还有:
- MQTTAsync_disconnectOptions : 断开连接选项
- MQTTAsync_connectOptions: 连接选项
- MQTTAsync_init_options:初始化选项 等等
/**
* Per-request response settings: success/failure callbacks, the context
* handed to them, the request token, and the MQTT V5 properties and
* subscribe options.
*/
typedef struct MQTTAsync_responseOptions
{
/** The eyecatcher for this structure. Must be MQTR */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1
* if 0, no MQTTV5 options */
int struct_version;
/**
* A pointer to a callback function to be called if the API call successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the API call fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
/**
* A token is returned from the call. It can be used to track
* the state of this request, both in the callbacks and in future calls
* such as ::MQTTAsync_waitForCompletion.
*/
MQTTAsync_token token;
/**
* A pointer to a callback function to be called if the API call successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess5* onSuccess5;
/**
* A pointer to a callback function to be called if the API call fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure5* onFailure5;
/**
* MQTT V5 input properties
*/
MQTTProperties properties;
/**
* MQTT V5 subscribe options, when used with subscribe only.
*/
MQTTSubscribe_options subscribeOptions;
/**
* MQTT V5 subscribe option count, when used with subscribeMany only.
* The number of entries in the subscribeOptionsList array.
*/
int subscribeOptionsCount;
/**
* MQTT V5 subscribe option array, when used with subscribeMany only.
*/
MQTTSubscribe_options* subscribeOptionsList;
} MQTTAsync_responseOptions;
List是一个链表, Mqttc中的commands信令队列就是用该链表存储/获取的
- static List* commands = NULL; //Commands信令队列;
- static List* handles = NULL; //clients队列,记录所有的client(MQTTAsync)对象;
/**
 * Structure to hold all data for one list: pointers to the first, last and
 * current elements, plus the item count and the heap storage accounted to it.
 */
typedef struct
{
ListElement *first, /**< first element in the list */
*last, /**< last element in the list */
*current; /**< current element in the list, for iteration */
int count; /**< number of items in the list */
size_t size; /**< heap storage used */
} List;
- Mqtt c的同步没什么可说的,类似于调用一个RPC接口一样的,主要看异步请求的流程
- Mqtt C实现的src/samples文件夹下有异步的subscribe和publish的例子,可以以此作为切入点来看Mqtt C的请求流程
// MQTTAsync_subscribe.c
// main()
/*
 * Entry point of the asynchronous subscribe sample.
 *
 * Creates the client, registers the callbacks, starts an asynchronous connect
 * and waits for the subscription to be confirmed. It then blocks until the
 * user types 'Q'/'q', starts a disconnect and finally destroys the client.
 *
 * Returns the last return code obtained from the MQTTAsync calls; terminates
 * the process with EXIT_FAILURE when the client cannot be created or a
 * connect/disconnect request cannot be started.
 */
int main(int argc, char* argv[])
{
    // [*1] handle of the MQTTAsync client; stays NULL until MQTTAsync_create() fills it in
    MQTTAsync client = NULL;
    // connect options, initialised with the defaults provided by the library [*2]
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    // disconnect options, initialised with the defaults provided by the library [*3s]
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    int rc;
    int ch;

    // Actually create the client object behind the handle. Without this check a
    // failed create would leave `client` unusable for every call below.
    if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // register the callbacks
    if ((rc = MQTTAsync_setCallbacks(
            client   /* MQTTAsync handle */,
            client   /* context */,
            connlost /* MQTTAsync_connectionLost */,
            msgarrvd /* MQTTAsync_messageArrived */,
            NULL     /* MQTTAsync_deliveryComplete */)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        goto exit;
    }

    conn_opts.keepAliveInterval = 20;       // keep-alive (heartbeat) interval
    // Whether to discard the cached session state of the previous connection on
    // connect/disconnect (i.e. start a fresh session). The session state is what
    // backs the delivery guarantees (at least once / exactly once).
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;        // called when the connect succeeds
    conn_opts.onFailure = onConnectFailure; // called when the connect fails
    conn_opts.context = client;             // context passed to the callbacks: the MQTTAsync handle

    // start the connect request
    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // Poll until the subscription is confirmed. `finished` is tested as well so
    // that the check right after the loop can actually trigger instead of the
    // loop spinning forever (assumes the failure callbacks, which are outside
    // this excerpt, set `finished` - TODO confirm).
    while (!subscribed && !finished)
    {
#if defined(WIN32)
        Sleep(100);
#else
        // sleep 10 ms between polls
        usleep(10000L);
#endif
    }

    if (finished)
        goto exit;

    // block until the user asks to quit
    do
    {
        ch = getchar();
    } while (ch != 'Q' && ch != 'q');

    disc_opts.onSuccess = onDisconnect;
    if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start disconnect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // wait for the disconnect to complete
    while (!disc_finished)
    {
#if defined(WIN32)
        Sleep(100);
#else
        usleep(10000L);
#endif
    }

exit:
    MQTTAsync_destroy(&client);
    return rc;
}
//--------------------------------------------------------------------------------------
// Callback invoked once the connection has been established.
// `context` is used as the MQTTAsync handle; `response` is not used.
// Starts the subscription to TOPIC and terminates the process if that
// request cannot be started.
void onConnect(void* context, MQTTAsync_successData* response)
{
    int rc;
    // start from the library's default response options
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync handle = (MQTTAsync)context;

    printf("Successful connection\n");
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
        "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);

    sub_opts.context = handle;
    // called when the subscribe succeeds
    // prototype: typedef void MQTTAsync_onSuccess(void* context,
    //                                             MQTTAsync_successData* response);
    sub_opts.onSuccess = onSubscribe;
    // called when the subscribe fails
    // prototype: typedef void MQTTAsync_onFailure(void* context,
    //                                             MQTTAsync_failureData* response);
    sub_opts.onFailure = onSubscribeFailure;
    deliveredtoken = 0;

    // start the subscribe request
    rc = MQTTAsync_subscribe(handle, TOPIC, QOS, &sub_opts);
    if (rc == MQTTASYNC_SUCCESS)
        return;

    printf("Failed to start subscribe, return code %d\n", rc);
    exit(EXIT_FAILURE);
}
//--------------------------------------------------------------------------------------
// Callback invoked when the subscribe request has succeeded.
// Neither argument is used; the function only reports success and raises
// the `subscribed` flag.
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Subscribe succeeded\n");
    // mark the subscription as done
    subscribed = 1;
}
/*
 * Convenience form of MQTTAsync_createWithOptions(): forwards every argument
 * unchanged and passes NULL for the create options.
 * Returns whatever MQTTAsync_createWithOptions() returns.
 */
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
        int persistence_type, void* persistence_context)
{
    int result;

    result = MQTTAsync_createWithOptions(handle, serverURI, clientId,
            persistence_type, persistence_context, NULL);
    return result;
}
//--------------------------------------------------------------------------------------
/*
* Creates the client object behind an MQTTAsync handle (abridged excerpt:
* each `...` line stands for code left out here).
* On first use the global state is initialised; then an MQTTAsyncs and its
* Clients record are allocated, zeroed, filled in and appended to the
* `handles` and `bstate->clients` lists. The new object is returned to the
* caller through *handle.
*/
int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
{
int rc = 0;
MQTTAsyncs *m = NULL;
...
// one-time initialisation of the global objects
if (!global_initialized)
{
#if defined(HEAP_H)
Heap_initialize();
#endif
Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
bstate->clients = ListInitialize(); // initialise the list of client states
Socket_outInitialize();
Socket_setWriteCompleteCallback(MQTTAsync_writeComplete); // callback for completed socket writes
handles = ListInitialize(); // create the handles list
commands = ListInitialize(); // command list: the queue of pending MQTT requests
...
m = malloc(sizeof(MQTTAsyncs));
*handle = m;// the caller's handle now refers to the freshly allocated MQTTAsyncs
memset(m, '\0', sizeof(MQTTAsyncs));
// populate m
m->serverURI = MQTTStrdup(serverURI);
m->responses = ListInitialize();
// append this handle to the handles list
ListAppend(handles, m, sizeof(MQTTAsyncs));
m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize();
m->c->clientID = MQTTStrdup(clientId);
m->c->MQTTVersion = MQTTVERSION_DEFAULT;
m->shouldBeConnected = 0;
if (options)
{
// keep a private copy of the caller's create options
m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
m->c->MQTTVersion = options->MQTTVersion;
}
...
// add the newly created client to the list of client states
ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
// m now stands in for the handle with all request parameters set; the MQTTAsync object (an MQTTAsyncs) is fully created
}
Subscribe的过程: 创建连接的参数 -> 然后连接 -> 连接成功 -> 订阅 -> 订阅成功 -> 标记订阅状态(成功);
// MQTTAsync_publish.c
// main()
/*
 * Entry point of the asynchronous publish sample.
 *
 * Same flow as the subscribe sample: main mainly performs the connect
 * operation (requesting a connection to the MQTT server) and then polls the
 * `finished` flag before destroying the client.
 *
 * Returns the last return code obtained from the MQTTAsync calls; terminates
 * the process with EXIT_FAILURE when the client cannot be created or the
 * connect request cannot be started.
 */
int main(int argc, char* argv[])
{
    // stays NULL until MQTTAsync_create() fills it in
    MQTTAsync client = NULL;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    // Without this check a failed create would leave `client` unusable for
    // every call below.
    if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // only the connection-lost callback is registered in this sample
    if ((rc = MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        MQTTAsync_destroy(&client);
        return rc;
    }

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;

    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Waiting for publication of %s\n"
        "on topic %s for client with ClientID: %s\n",
        PAYLOAD, TOPIC, CLIENTID);

    // poll until the `finished` flag is raised
    while (!finished)
    {
#if defined(WIN32)
        Sleep(100);
#else
        usleep(10000L);
#endif
    }

    MQTTAsync_destroy(&client);
    return rc;
}
//--------------------------------------------------------------------------------------
// Callback invoked once the connection has been established.
// `context` is used as the MQTTAsync handle; `response` is not used.
// Builds one message from PAYLOAD and starts sending it to TOPIC; terminates
// the process if the send cannot be started.
void onConnect(void* context, MQTTAsync_successData* response)
{
    int rc;
    MQTTAsync handle = (MQTTAsync)context;
    // the message to send
    MQTTAsync_message msg = MQTTAsync_message_initializer;
    MQTTAsync_responseOptions send_opts = MQTTAsync_responseOptions_initializer;

    printf("Successful connection\n");

    // payload and delivery attributes of the message
    msg.payload = PAYLOAD;
    msg.payloadlen = (int)strlen(PAYLOAD);
    msg.qos = QOS; // requested quality-of-service level
    msg.retained = 0;

    // callback for a successful send, with the handle as its context
    send_opts.context = handle;
    send_opts.onSuccess = onSend;
    deliveredtoken = 0;

    // send
    rc = MQTTAsync_sendMessage(handle, TOPIC, &msg, &send_opts);
    if (rc == MQTTASYNC_SUCCESS)
        return;

    printf("Failed to start sendMessage, return code %d\n", rc);
    exit(EXIT_FAILURE);
}
//--------------------------------------------------------------------------------------
// Callback invoked when the message has been sent successfully.
// In this sample the connection is closed right after the first successful send.
// `context` is used as the MQTTAsync handle; `response->token` is printed as
// the token of the confirmed message. Terminates the process if the
// disconnect request cannot be started.
void onSend(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (MQTTAsync)context;
    // disconnect options, starting from the library defaults
    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    int rc;

    printf("Message with token value %d delivery confirmed\n", response->token);

    opts.onSuccess = onDisconnect; // called when the disconnect succeeds
    opts.context = client;

    // start the disconnect request
    if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
    {
        // the failing call is the disconnect, so the message names it
        printf("Failed to start disconnect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
}
// MQTTAsync.c
/*
 * Starts an asynchronous connect (abridged excerpt: each `...` line stands
 * for code left out here).
 *
 * Stores the connect options in the client, starts the send and receive
 * threads if they are not already starting or running, then builds a CONNECT
 * command and queues it with MQTTAsync_addCommand().
 */
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
{
    MQTTAsyncs* m = handle; // view the opaque handle as MQTTAsyncs* so it can be filled in
    int rc = MQTTASYNC_SUCCESS;
    MQTTAsync_queuedCommand* conn; // unit of work; here it represents the connect
    ...
    // copy the contents of options into m->connect
    m->connect.onSuccess = options->onSuccess;
    m->connect.onFailure = options->onFailure;
    ...
    if (sendThread_state != STARTING && sendThread_state != RUNNING)
    {
        MQTTAsync_lock_mutex(mqttasync_mutex);
        sendThread_state = STARTING;
        // start the thread that sends commands
        Thread_start(MQTTAsync_sendThread, NULL);
        MQTTAsync_unlock_mutex(mqttasync_mutex);
    }
    if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
    {
        MQTTAsync_lock_mutex(mqttasync_mutex);
        receiveThread_state = STARTING;
        // start the thread that receives responses
        Thread_start(MQTTAsync_receiveThread, handle);
        MQTTAsync_unlock_mutex(mqttasync_mutex);
    }
    ...
    // copy the contents of options into m->c (Clients)
    m->c->keepAliveInterval = options->keepAliveInterval;
    ...
    // fill in the remaining fields of m
    ...
    // allocate and zero an MQTTAsync_queuedCommand and attach m as its client
    conn = malloc(sizeof(MQTTAsync_queuedCommand));
    memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
    conn->client = m;
    // copy the callbacks and context from options into conn->command
    if (options)
    {
        conn->command.onSuccess = options->onSuccess;
        conn->command.onFailure = options->onFailure;
        conn->command.onSuccess5 = options->onSuccess5;
        conn->command.onFailure5 = options->onFailure5;
        conn->command.context = options->context;
    }
    conn->command.type = CONNECT;
    conn->command.details.conn.currentURI = 0;
    // Queue conn on the global commands list. The size argument is forwarded to
    // the list functions, so it must be the size of the command object itself;
    // sizeof(conn) would only be the size of the pointer.
    rc = MQTTAsync_addCommand(conn, sizeof(*conn));
    ...
}
//--------------------------------------------------------------------------------------
/*
* Queues a command on the global `commands` list and signals `send_cond`
* (abridged excerpt: each `...` line stands for code left out here).
*
* CONNECT commands and internal DISCONNECT commands are inserted at the head
* of the list, unless the current head is already a command of the same type
* for the same client, in which case the new command is freed. All other
* commands are appended to the tail.
*
* command       the command to queue
* command_size  size value passed on to ListInsert()/ListAppend()
*/
static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
{
int rc = 0;
...
// stamp the start time, except for a CONNECT while a connect is already in progress
if ((command->command.type != CONNECT) || (command->client->c->connect_state == NOT_IN_PROGRESS))
command->command.start_time = MQTTAsync_start_clock();
if (command->command.type == CONNECT ||
(command->command.type == DISCONNECT && command->command.details.dis.internal))
{// CONNECT, or an internal DISCONNECT
MQTTAsync_queuedCommand* head = NULL;
if (commands->first)
head = (MQTTAsync_queuedCommand*)(commands->first->content);
if (head != NULL && head->client == command->client && head->command.type == command->command.type)
MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
else
// connect/disconnect take priority: insert the command before the current first element
ListInsert(commands, command, command_size, commands->first); /* add to the head of the list */
}
else
{
// every other command is appended to the tail of the queue
ListAppend(commands, command, command_size);
...
}
...
// Signal the send condition so that the waiting sender leaves its wait and
// starts sending the queued commands to the MQTT server.
// The thread woken up here is MQTTAsync_sendThread.
rc = Thread_signal_cond(send_cond);
...
}
在入队完毕之后,会唤醒MQTTAsync_sendThread,使其解除等待状态
// MQTTAsync.c
/*
* Body of the sending thread (abridged excerpt: each `...` line stands for
* code left out here).
* Until `tostop` is set it alternates between processing queued commands and
* waiting on `send_cond`.
*/
static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
{
...
while (!tostop)
{
int rc;
while (commands->count > 0)
{
// Keep processing commands from the queue; once a pass processes nothing,
// leave the loop and wait until signalled that there is work again.
if (MQTTAsync_processCommand() == 0)
break; /* no commands were processed, so go into a wait */
}
...
// Nothing more to process right now: wait on send_cond. ETIMEDOUT is not
// treated as an error (the second argument is presumably a timeout - confirm).
if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
...
}
...
return 0;
}
MQTTAsync_processCommand 函数每次从队列中取出一个command信令发送给MQTT服务器后返回,发送线程随即循环处理下一个;当没有command被处理时(例如队列为空)函数返回0,发送线程进入等待;
接收流程在MQTTAsync_receiveThread 线程中: 不断轮询, 以linux的select()作为监听手段, 查询是否有已经就绪(有数据可读)的socket, 然后解析并分发接收到的信息.