Mosquitto 客户端的 socket 网络程序与 MQTT 协议紧密集成,实现了 MQTT 协议的各种控制报文的发送和接收,以及 QoS 等级、会话状态等功能。
Mosquitto 客户端在进行网络程序设计时有许多值得借鉴的技巧,比如发送队列、socketpair 等。以下是一些值得借鉴的关键技巧:
struct mosquitto__packet
结构体链表struct mosquitto__packet
的主要字段包括:
next:指向链表中下一个 mosquitto__packet 的指针。
command:表示 MQTT 控制报文的类型,如 CONNECT、PUBLISH、SUBSCRIBE 等。
remaining_count 和 remaining_mult:表示剩余长度字段的解码相关信息。
remaining_length:表示 MQTT 控制报文的剩余长度。
packet_length:表示 MQTT 控制报文的总长度。
payload:表示 MQTT 控制报文的有效载荷(这里是需要发送的数据报文)。
to_process 和 pos:表示已处理和待处理的字节数。
发送队列是由 struct mosquitto__packet 结构组成的链表。每个 mosquitto__packet 结构代表一个待发送的 MQTT 控制报文。
mosquitto
在完成组包后,通常会先将数据包添加到发送队列(使用struct mosquitto__packet
结构体链表)。然后,在适当的时机(如主线程从select或pselect的阻塞中被唤醒时),主线程会从发送队列中取出数据包,并将其发送到服务器。这种机制可以有效地管理和优化数据包的发送,以提高性能和效率。
发送队列由 packet_queue()
函数管理,该函数将待发送的 MQTT 控制报文插入到链表中。
在mosquitto
客户端源码中,出队操作是由mosquitto_loop()
函数执行的。在mosquitto_loop()
函数中,会调用mosquitto__packet_write()
函数来处理发送队列中的数据包。这个函数会从发送队列中取出数据包并尝试将它们发送到服务器。因此,可以说mosquitto__packet_write()是执行出队操作的函数。
mosq->sockpairR
和 mosq->sockpairW
在Mosquitto源码中,mosq->sockpairR
和mosq->sockpairW
是两个特殊的文件描述符,它们用于在主线程和子线程之间进行通信。它们通过socketpair()
函数创建,形成一个全双工的socket对。子线程可以通过写入mosq->sockpairW
向主线程发送信号,而主线程通过监听mosq->sockpairR
来检测是否有新的数据需要处理。
这种设计允许子线程在有新数据需要发送时唤醒主线程,从而确保及时的通信。当子线程需要发送数据时,它会向mosq->sockpairW
写入一个字节的数据。而主线程在select()
或poll()
系统调用中监听mosq->sockpairR
,当它检测到可读事件时,意味着子线程有新的数据需要处理。这时,主线程会处理发送队列中的数据包并将其发送给服务器。
通过这种机制,Mosquitto避免了潜在的阻塞和资源竞争问题,提高了通信的实时性。
在mosquitto_reinitialise()
函数中,会调用net__socketpair()函数来创建一对全双工的socket
,分别对应文件描述符mosq->sockpairR 和 mosq->sockpairW。mosquitto_reinitialise()是用于初始化或重新初始化mosquitto客户端实例的函数。
int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_start, void *userdata)
{
...
/* This must be after pthread_mutex_init(), otherwise the log mutex may be
* used before being initialised. */
if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
log__printf(mosq, MOSQ_LOG_WARNING,
"Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
}
return MOSQ_ERR_SUCCESS;
}
int net__socketpair(mosq_sock_t *pairR, mosq_sock_t *pairW)
{
...
}
当消息包加入输出队列(使用struct mosquitto__packet结构体链表),packet__queue()函数会向mosq->sockpairW写入一个字节的数据。这将可能处于阻塞状态的主线程唤醒,以便主线程处理输出队列中的消息包。
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
{
...
/* Write a single byte to sockpairW (connected to sockpairR) to break out
* of select() if in threaded mode. */
if(mosq->sockpairW != INVALID_SOCKET){
#ifndef WIN32
if(write(mosq->sockpairW, &sockpair_data, 1)){
}
#else
send(mosq->sockpairW, &sockpair_data, 1, 0);
#endif
}
...
}
客户端主线程使用mosquitto_loop函数在mosq->sockpairR上监听,当mosq->sockpairR接收到数据(子线程向mosq->sockpairW写入数据),这意味着有新的消息包需要发送,主线程会从select或pselect的阻塞中被唤醒。随后,主线程会处理输出队列中的消息包并将其发送到服务器。
int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
{
...
if(mosq->sockpairR != INVALID_SOCKET){
/* sockpairR is used to break out of select() before the timeout, on a
* call to publish() etc. */
FD_SET(mosq->sockpairR, &readfds);
if((int)mosq->sockpairR > maxfd){
maxfd = mosq->sockpairR;
}
}
...
#ifdef HAVE_PSELECT
fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
#else
fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
#endif
...
if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
...
recv(mosq->sockpairR, &pairbuf, 1, 0);
...
}
}
mosquitto_loop() 函数是 Mosquitto 客户端 socket 通信代码部分的核心函数。它负责处理客户端与 MQTT 服务器之间的网络通信,包括发送和接收 MQTT 控制报文。此函数在客户端的主循环中被调用,确保了 Mosquitto 客户端与服务器之间的持续交互。
函数参数说明:
struct mosquitto *mosq
: mosquitto客户端实例指针。int timeout
: 超时时间(ms),如果没有读取到任何消息,则等待超时后返回。int max_packets
: 可以一次读取并处理的最大MQTT消息包数。我们采取逐段加注释的方式,分析mosquitto_loop()函数
int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
{
#ifdef HAVE_PSELECT
struct timespec local_timeout;
#else
struct timeval local_timeout;
#endif
fd_set readfds, writefds;
fd_set readfds, writefds;定义了两个文件描述符集合,用于监听文件描述符上的事件;
在mosquitto_loop函数中,readfds用于监听可读事件,而writefds用于监听可写事件。后面会将mosq->sock和mosq->sockpairR加入到readfds集合,而mosq->sock加入到writefds。
int fdcount;
int rc;
char pairbuf;
int maxfd = 0;
time_t now;
time_t timeout_ms;
if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
#ifndef WIN32
if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){
return MOSQ_ERR_INVAL;
}
#endif
上述代码定义了需要的临时变量,然后判断输入参数有效性。
FD_ZERO(&readfds);
FD_ZERO(&writefds);
if(mosq->sock != INVALID_SOCKET){
maxfd = mosq->sock;
FD_SET(mosq->sock, &readfds);
pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);
if(mosq->out_packet || mosq->current_out_packet){
FD_SET(mosq->sock, &writefds);
}
上述代码功能如下:
FD_ZERO(&readfds);
和 FD_ZERO(&writefds);
初始化了用于读操作和写操作的文件描述符集合。mosq->sock
有效,则加入读文件描述符集合readfds中, 以便后续使用select或pselect。mosq->sock
加入写文件描述符集合 writefds。#ifdef WITH_TLS
if(mosq->ssl){
if(mosq->want_write){
FD_SET(mosq->sock, &writefds);
}
}
#endif
pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
}
如果mosquitto 使用了TLS加密通信,则系统有可能需要发送一些额外的与加密和解密相关的通信包,这些通信包并不在发送消息缓冲队列里,当需要发送这些特殊通信包时,系统会将mosq->want_write赋值true。
当发生这种需求时,将mosq->sock
加入写文件描述符集合 writefds。
另外需要说明的是,为了防止不同的线程同时访问输出缓冲器,需要加互斥锁:
-pthread_mutex_lock(&mosq->current_out_packet_mutex);
加锁对current_out_packet进行互斥访问。
-pthread_mutex_lock(&mosq->out_packet_mutex);
加锁对out_packet的互斥访问。
-pthread_mutex_unlock(&mosq->out_packet_mutex);
解锁对out_packet的互斥访问。
-pthread_mutex_unlock(&mosq->current_out_packet_mutex);
解锁对current_out_packet的互斥访问。
else{
#ifdef WITH_SRV
if(mosq->achan){
if(mosquitto__get_state(mosq) == mosq_cs_connect_srv){
rc = ares_fds(mosq->achan, &readfds, &writefds);
if(rc > maxfd){
maxfd = rc;
}
}else{
return MOSQ_ERR_NO_CONN;
}
}
#else
return MOSQ_ERR_NO_CONN;
#endif
}
由于我们不启用 WITH_SRV 选项,所以上面的代码不予讨论。
if(mosq->sockpairR != INVALID_SOCKET){
/* sockpairR is used to break out of select() before the timeout, on a
* call to publish() etc. */
FD_SET(mosq->sockpairR, &readfds);
if((int)mosq->sockpairR > maxfd){
maxfd = mosq->sockpairR;
}
}
如果mosq->sockpairR
有效,则加入到读文件描述符集合readfds中,
timeout_ms = timeout;
if(timeout_ms < 0){
timeout_ms = 1000;
}
now = mosquitto_time();
pthread_mutex_lock(&mosq->msgtime_mutex);
if(mosq->next_msg_out && now + timeout_ms/1000 > mosq->next_msg_out){
timeout_ms = (mosq->next_msg_out - now)*1000;
}
pthread_mutex_unlock(&mosq->msgtime_mutex);
if(timeout_ms < 0){
/* There has been a delay somewhere which means we should have already
* sent a message. */
timeout_ms = 0;
}
local_timeout.tv_sec = timeout_ms/1000;
#ifdef HAVE_PSELECT
local_timeout.tv_nsec = (timeout_ms-local_timeout.tv_sec*1000)*1000000;
#else
local_timeout.tv_usec = (timeout_ms-local_timeout.tv_sec*1000)*1000;
#endif
上面的代码是对超时时间的设定,不加赘述。
#ifdef HAVE_PSELECT
fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
#else
fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
#endif
上面的代码使用pselect函数或select函数监控文件描述符集合。
根据前面对代码的分析,我们知道一共监视了2个文件描述符mosq->sock、 mosq->sockpairR,此时监视了两个事件:
注意:mosq->sock是按需加入&writefds集合的。如果没有发送包入队,mosq->sock不会加入&writefds集合,也就不会有mosq->sock可写事件。
if(fdcount == -1){
#ifdef WIN32
errno = WSAGetLastError();
#endif
if(errno == EINTR){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_ERRNO;
}
}else{
if(mosq->sock != INVALID_SOCKET){
if(FD_ISSET(mosq->sock, &readfds)){
rc = mosquitto_loop_read(mosq, max_packets);
if(rc || mosq->sock == INVALID_SOCKET){
return rc;
}
}
发生了mosq->sockpairR可读事件可读事件,调用mosquitto_loop_read()函数接收数据。
if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
#ifndef WIN32
if(read(mosq->sockpairR, &pairbuf, 1) == 0){
}
#else
recv(mosq->sockpairR, &pairbuf, 1, 0);
#endif
/* Fake write possible, to stimulate output write even though
* we didn't ask for it, because at that point the publish or
* other command wasn't present. */
if(mosq->sock != INVALID_SOCKET)
FD_SET(mosq->sock, &writefds);
}
发生了mosq->sockpairR可读事件,将mosq->sock加入&writefds集合。
if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){
rc = mosquitto_loop_write(mosq, max_packets);
if(rc || mosq->sock == INVALID_SOCKET){
return rc;
}
}
}
发生了mosq->sock可写事件,表示TCP套接字的发送缓冲区有足够的空间可以发送数据。调用mosquitto_loop_write()函数,向服务器发送数据。
#ifdef WITH_SRV
if(mosq->achan){
ares_process(mosq->achan, &readfds, &writefds);
}
#endif
}
return mosquitto_loop_misc(mosq);
}