struct mosq_config
和 struct mosquitto
并列为mosquitto客户端源码中的核心数据结构,分别用于存储mosquitto客户端的配置信息和运行时状态和信息。在mosquitto客户端的运行过程中,struct mosq_config
存储mosquitto客户端的配置信息,其中的选项和参数决定了mosquitto客户端的行为和功能,而 struct mosquitto
则记录了客户端的运行时状态和相关信息,包括客户端的连接状态、订阅的主题、接收和发送的消息等等,以便客户端能够正确地处理和响应来自服务器的消息和事件。
与前文配置信息
struct mosq_config
结构体对象cfg定义为全局变量不同,mosquitto2.0.15采用在入口函数main的开始位置定义了一个指向struct mosquitto
的指针*mosq。本文将详细介绍这个struct mosquitto
定义的对象从动态分配内存到客户端向服务器发送第一包MQTT-CONNETC包结束的整个初始化过程。
我们可以简单地理解为,struct mosq_config
的cfg
存储mosquitto
客户端的配置信息,属于静态数据;而struct mosquitto
的mosq
指针对象指向mosquitto
客户端的运行时数据,属于动态数据。
我们在《mosquitto客户端配置参数管理(mosquitto2.0.15客户端源码分析之一)》一文中已经详细介绍了其中struct mosq_config
定义的对象cfg
的设置过程。
本文将详细介绍了mosquitto客户端对象“struct mosquitto *mosq”的初始化过程,分为上下两篇,上篇介绍对象的创建、初始化,下篇为网络参数设置和注册回调函数,为正在阅读mosquitto源码的读者提供一点参考。
struct mosquitto *mosquitto_new
(const char *id, bool clean_start, void *userdata)是 Mosquitto 客户端源码中的一个函数,用于创建一个新的 struct mosquitto
结构体并初始化。
int main(int argc, char *argv[])
{
struct mosquitto *mosq = NULL;
...
mosq = mosquitto_new(cfg.id, cfg.clean_session, NULL);
...
}
函数的参数包括:
id
:客户端的 ID,类型为 const char *
,调用时使用配置参数cfg.id。clean_start
:类型为 bool
,调用时使用配置参数cfg.clean_session,这个参数此时为true,即客户端是以清空会话的方式连接到服务器(当设置为 false 时,客户端会尝试恢复与服务器的上一个连接状态,这是后话,不在本文讨论范围)。
bool clean_start
用于确定客户端是否以“清理会话”的模式启动。在MQTT中,客户端和服务器之间建立的连接被称为会话。会话可以保留某些信息,例如订阅的主题和发布的消息。清理会话是指当客户端与服务器重新连接时,服务器是否保留先前的会话信息。如果使用清理会话,则每次重新连接时,客户端必须重新订阅主题并重新发布任何未发布的消息。
如果clean_start
为true
,则客户端会使用清理会话模式启动,并且任何现有的会话信息都将被删除。如果为false
,则客户端会使用非清理会话模式启动,保留现有的会话信息。默认值为true
,这意味着每次客户端启动时都会删除先前的会话信息。
总之,clean_start参数的值控制着客户端启动时是否保留上一次连接断开前的状态,影响着会话信息的保留与删除。
userdata
:客户端的用户数据,类型为 void *
,初始化调用时为NULL。函数的返回值为指向 struct mosquitto
结构体的指针,赋值给mosq,如果创建失败则返回 NULL。
函数的功能包括:
创建一个新的 struct mosquitto
结构体,并将其初始化为默认值。
根据参数设置客户端的 ID、清空会话标志和用户数据。客户端的 ID 用于标识客户端,需要在 MQTT 代理中保证唯一。清空会话标志用于指示客户端是否以清空会话的方式连接到 MQTT 代理。用户数据是客户端的自定义数据,可以是任意类型的指针。
初始化 mosq->sock
、mosq->last_msg_in
、mosq->last_msg_out
等字段。这些字段包含了客户端的网络连接、消息缓存和心跳检测等信息。
原型定义如下:
int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_start, void *userdata);
函数参数:
该函数被mosquitto_new()在内部调用,在对象创建后,对里面的成员变量进行初始化。
...
struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata)
{
struct mosquitto *mosq = NULL;
int rc;
...
mosq = (struct mosquitto *)mosquitto__calloc(1, sizeof(struct mosquitto));
if(mosq){
...
rc = mosquitto_reinitialise(mosq, id, clean_start, userdata);
...
}else{
errno = ENOMEM;
}
return mosq;
}
...
函数返回成功时,主要对下面的成员进行了初始化(这里只列举了一部分成员变量,实际完成的初始化内容,请查阅源代码):
cfg
数据到*mosq
– client_opts_set()函数client_opts_set函数的功能是将mosq_config
结构体中的各个选项和配置参数应用到mosquitto
对象中,以便进行后续的MQTT通信。
在函数中,mosq
参数是将要被设置选项的Mosquitto客户端对象指针,cfg
是Mosquitto配置对象结构体。通过对该结构体进行逐个成员变量的复制,函数将配置参数中的数据应用到指定的Mosquitto客户端对象上,从而初始化Mosquitto客户端对象的各种属性。当客户端启动后,这些属性将用于客户端的连接、认证和消息传送等操作的执行。
下面我们会对函数中的主要片段详细分析。
int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg)
{
#if defined(WITH_TLS) || defined(WITH_SOCKS)
int rc;
#endif
mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, cfg->protocol_version);
上面这句代码调用mosquitto_int_option函数,按照配置参数中cfg->protocol_version给变量mosq->protocol 赋值(cfg->protocol_version是MQTT版本号)。
if(cfg->will_topic && mosquitto_will_set_v5(mosq, cfg->will_topic,
cfg->will_payloadlen, cfg->will_payload, cfg->will_qos,
cfg->will_retain, cfg->will_props)){
err_printf(cfg, "Error: Problem setting will.\n");
mosquitto_lib_cleanup();
return 1;
}
按照mqtt协议,如果客户端提供了遗嘱消息相关的配置信息(例如遗嘱主题、遗嘱消息内容等),那么就会在 MQTT 连接建立之后,向服务端注册一个遗嘱消息。当客户端异常断开连接时,服务端就会收到这个遗嘱消息。
上面这段代码调用函数mosquitto_will_set_v5
实现了 MQTT 协议中的遗嘱消息初始化功能,即将配置参数cfg中的遗嘱部分(如遗嘱主题,遗嘱消息的内容,遗嘱消息的 QoS 级别等)赋值给mosq所指的对象的对应数据项。
cfg->will_props = NULL;
if((cfg->username || cfg->password) && mosquitto_username_pw_set(mosq, cfg->username, cfg->password)){
err_printf(cfg, "Error: Problem setting username and/or password.\n");
mosquitto_lib_cleanup();
return 1;
}
本段代码调用mosquitto_username_pw_set
函数,设置连接到服务器时使用的用户名和密码。
需要注意的是,mosquitto_username_pw_se
t函数的做法是,在设置新的用户名和密码前,首先释放已经设置过用户名和密码相关内存空间,然后再动态分配内存,设置新的参数(源码对于所有字符串类型的变量处理都是类似的,我们不再加以说明
)。
#ifdef WITH_TLS
if(cfg->cafile || cfg->capath){
rc = mosquitto_tls_set(mosq, cfg->cafile, cfg->capath, cfg->certfile, cfg->keyfile, NULL);
if(rc){
if(rc == MOSQ_ERR_INVAL){
err_printf(cfg, "Error: Problem setting TLS options: File not found.\n");
}else{
err_printf(cfg, "Error: Problem setting TLS options: %s.\n", mosquitto_strerror(rc));
}
mosquitto_lib_cleanup();
return 1;
}
# ifdef FINAL_WITH_TLS_PSK
}else if(cfg->psk){
if(mosquitto_tls_psk_set(mosq, cfg->psk, cfg->psk_identity, NULL)){
err_printf(cfg, "Error: Problem setting TLS-PSK options.\n");
mosquitto_lib_cleanup();
return 1;
}
# endif
}else if(cfg->port == 8883){
mosquitto_int_option(mosq, MOSQ_OPT_TLS_USE_OS_CERTS, 1);
}
这段代码主要是设置MQTT连接的TLS选项。当配置文件中指定了CA证书文件(cafile
)或者CA证书路径(capath
)时,使用mosquitto_tls_set()
函数设置TLS选项;否则,当psk
存在时,使用mosquitto_tls_psk_set()
函数设置TLS-PSK选项;最后,当MQTT连接的端口为8883时,使用操作系统默认证书,这里调用了mosquitto_int_option()
函数完成设置。
具体设置如下:
cfg->cafile
与 cfg->capath
有效时,代码调用mosquitto_tls_set()函数设置了4个参数给mosq对象的相应数据项,cfg->cafile
与 cfg->capath
分别指定了 CA 证书文件和路径,客户端和服务器都需要使用该文件或路径中的证书来校验对方的身份。cfg->certfile
和 cfg->keyfile
分别指定了客户端使用的证书和私钥,用于向服务器证明客户端的身份。cfg->cafile
与 cfg->capath
无效,如果FINAL_WITH_TLS_PSK有效,并且定义了cfg->psk,则调用mosquitto_tls_psk_set()
函数为MQTT客户端mosq对象设置cfg->psk
和cfg->psk_identity
,这两个参数是用于设置使用PSK认证的客户端的密钥和身份信息的,以便在TLS握手中进行身份验证。 if(cfg->tls_use_os_certs){
mosquitto_int_option(mosq, MOSQ_OPT_TLS_USE_OS_CERTS, 1);
}
该函数的作用是设置mosquitto客户端是否使用操作系统的证书库。具体来说,当参数 MOSQ_OPT_TLS_USE_OS_CERTS
为1时,mosquitto客户端将使用操作系统的证书库来验证TLS/SSL证书。
if(cfg->insecure && mosquitto_tls_insecure_set(mosq, true)){
err_printf(cfg, "Error: Problem setting TLS insecure option.\n");
mosquitto_lib_cleanup();
return 1;
}
上面的代码就是在WITH_TLS有效时,设置mosq->tls_insecure = true;
if(cfg->tls_engine && mosquitto_string_option(mosq, MOSQ_OPT_TLS_ENGINE, cfg->tls_engine)){
err_printf(cfg, "Error: Problem setting TLS engine, is %s a valid engine?\n", cfg->tls_engine);
mosquitto_lib_cleanup();
return 1;
}
上面的代码就是在WITH_TLS有效时,设置 mosq->tls_engine = strdup(cfg->tls_engine);
if(cfg->keyform && mosquitto_string_option(mosq, MOSQ_OPT_TLS_KEYFORM, cfg->keyform)){
err_printf(cfg, "Error: Problem setting key form, it must be one of 'pem' or 'engine'.\n");
mosquitto_lib_cleanup();
return 1;
}
上面的代码就是在WITH_TLS有效并且禁用 OpenSSL 引擎模块时,将引擎模块指定为 mosq->tls_engine = strdup(cfg->tls_engine);
if(cfg->tls_engine_kpass_sha1 && mosquitto_string_option(mosq, MOSQ_OPT_TLS_ENGINE_KPASS_SHA1, cfg->tls_engine_kpass_sha1)){
err_printf(cfg, "Error: Problem setting TLS engine key pass sha, is it a 40 character hex string?\n");
mosquitto_lib_cleanup();
return 1;
}
上面的代码就是在WITH_TLS有效时,设置mosq->tls_engine_kpass_sha1 = strdup(cfg->tls_engine_kpass_sha1);
if(cfg->tls_alpn && mosquitto_string_option(mosq, MOSQ_OPT_TLS_ALPN, cfg->tls_alpn)){
err_printf(cfg, "Error: Problem setting TLS ALPN protocol.\n");
mosquitto_lib_cleanup();
return 1;
}
上面的代码就是在WITH_TLS有效时,设置mosq->tls_alpn = strdup(value);
if((cfg->tls_version || cfg->ciphers) && mosquitto_tls_opts_set(mosq, 1, cfg->tls_version, cfg->ciphers)){
err_printf(cfg, "Error: Problem setting TLS options, check the options are valid.\n");
mosquitto_lib_cleanup();
return 1;
}
#endif
mosquitto_max_inflight_messages_set(mosq, cfg->max_inflight);
上述代码结果:
#ifdef WITH_SOCKS
if(cfg->socks5_host){
rc = mosquitto_socks5_set(mosq, cfg->socks5_host, cfg->socks5_port, cfg->socks5_username, cfg->socks5_password);
if(rc){
mosquitto_lib_cleanup();
return rc;
}
}
#endif
上面的代码在用socket通信的的情况下,会进行下面的赋值:
mosq->socks5_host = strdup(host);
mosq->socks5_port = (uint16_t)cfg->socks5_port;
mosq->socks5_username = strdup(cfg->socks5_username);
mosq->socks5_password = strdup(cfg->socks5_password);
if(cfg->tcp_nodelay){
mosquitto_int_option(mosq, MOSQ_OPT_TCP_NODELAY, 1);
}
上面的代码执行结果: mosq->tcp_nodelay =1;
if(cfg->msg_count > 0 && cfg->msg_count < 20){
/* 20 is the default "receive maximum"
* If we don't set this, then we can receive > msg_count messages
* before we quit.*/
mosquitto_int_option(mosq, MOSQ_OPT_RECEIVE_MAXIMUM, cfg->msg_count);
}
上面的代码的功能是,在cfg->msg_count 大于0,小于20的情况下,mosq->msgs_in.inflight_maximum = (uint16_t)value;
其中20是默认的“最大接收数”
return MOSQ_ERR_SUCCESS;
}
mosquitto_int_option()
函数是Mosquitto客户端库中的一个函数,它适用于设置下面的情形:配置参数为enum mosq_opt_t 枚举类型,设置值为整数,完成后会将这个整型数赋值给客户端对象的相应成员。
int mosquitto_int_option(struct mosquitto *mosq, int option, int value)
该函数有三个参数:
mosq
:Mosquitto客户端结构体指针。option
:要设置的选项的标识符,为enum mosq_opt_t
类型。value
:选项的值,通常为int类型。该函数与上面的函数mosquitto_int_option()是相似的,mosquitto库中用来设置字符串类型选项的函数。它适用于设置下面的情形:配置参数为enum mosq_opt_t 枚举类型,设置值为字符串,完成后会将这个字符串赋值给客户端对象的相应成员。
int mosquitto_string_option(struct mosquitto *mosq, enum mosq_opt_t option, const char *value)
函数参数如下:
enum mosq_opt_t
类型。举个例子,如果要设置mqtt用户名,则可以使用此函数进行设置,如下所示:
mosquitto_string_option(mosq, MOSQ_OPT_USERNAME, "testuser");
mosquitto_will_set_v5函数是设置遗嘱消息的函数,支持MQTTv5版本。这个函数可以设置遗嘱信息的内容、QoS、Retain等信息,并且支持属性列表。该函数会将这些参数加入到mosquitto客户端实例的遗嘱消息结构体中。如果客户端已经设定了遗嘱消息,该函数会删除原来的遗嘱消息并用新的遗嘱消息替换它。如果没有设定遗嘱消息,那么该函数会新建一条遗嘱消息。
int mosquitto_will_set_v5(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain, mosquitto_property *properties)
函数参数:
if(cfg->will_topic && mosquitto_will_set_v5(mosq, cfg->will_topic,
cfg->will_payloadlen, cfg->will_payload, cfg->will_qos,
cfg->will_retain, cfg->will_props)){
err_printf(cfg, "Error: Problem setting will.\n");
mosquitto_lib_cleanup();
return 1;
}
上面的代码片段是从client_opts_set()函数里面拷贝过来的。
如果服务器要求客户端使用TLS/SSL加密连接(WITH_TLS有效),则可通过 mosquitto_tls_set()
函数来设置TLS/SSL验证所需的证书和私钥文件,以及密码回调函数。如果你不需要进行双向认证,则只需要设置 cafile
参数即可。用于设置TLS/SSL加密连接所需要的证书和私钥等信息。
具体的参数解释如下:
mosq
:指向mosquitto实例结构体的指针。cafile
:指向PEM格式的CA证书文件的路径(可选)。capath
:指向CA证书目录的路径(可选)。certfile
:指向PEM格式的客户端证书文件的路径(可选)。keyfile
:指向PEM格式的客户端私钥文件的路径(可选)。pw_callback
:密码回调函数,用于为客户端私钥提供密码(目前实际调用NULL)。该函数用于设置客户端的用户名和密码,当连接到需要用户名和密码验证的MQTT服务器时使用。
int mosquitto_username_pw_set(struct mosquitto *mosq, const char *username, const char *password)
参数说明:
client_opts_set()函数里还调用了其他函数,都与上面的函数大同小异,这里不再一一列举。