
SOHU-DBProxy Source Code Analysis -- proxy plugin configuration loading

董光霁
2023-12-01

1. Configuration loading

int network_mysqld_proxy_plugin_apply_config(chassis *chas, chassis_plugin_config *config);
This is the proxy plugin's configuration-loading function. The plugin's configuration keeps a pointer back to the main configuration:

config->chas = chas; // config is of type chassis_plugin_config; each plugin has its own implementation of it

The backend information lives in the main configuration:

chassis_private *g = chas->priv;
/**
 * @author sohu-inc.com
 * guards synchronized access to cons
 */
struct chassis_private {
    GMutex cons_mutex;
    GPtrArray *cons;                          /**< array(network_mysqld_con) */
    network_backends_t *backends;
    gint connection_id_sequence;
};


SOHU configures the load-balancing algorithm separately for read and for write requests. Two algorithms are available, and the choice is stored in the main configuration:

GString * (* lb_algo_func[2])( chassis *chas, proxy_rw conn_type);
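
Judging from this declaration, the configured algorithm is later invoked through the hook to obtain the chosen backend's address; a hedged usage sketch:

GString *backend_addr = chas->lb_algo_func[PROXY_TYPE_READ](chas, PROXY_TYPE_READ); // presumably returns the selected backend's "ip:port"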

The least-connections algorithm:

chas->lb_algo[PROXY_TYPE_WRITE] = "lc";
chas->lb_algo_func[PROXY_TYPE_WRITE] = loadbalance_lc_select; // for write requests

chas->lb_algo[PROXY_TYPE_READ] = "lc";
chas->lb_algo_func[PROXY_TYPE_READ] = loadbalance_lc_select; // for read requests


The weighted round-robin algorithm:

chas->lb_algo[PROXY_TYPE_WRITE] = "wrr";
chas->lb_algo_func[PROXY_TYPE_WRITE] = loadbalance_wrr_select;


chas->lb_algo[PROXY_TYPE_READ] = "wrr";
chas->lb_algo_func[PROXY_TYPE_READ] = loadbalance_wrr_select;


By default, least-connections is the load-balancing algorithm.
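
To make the choice concrete, here is a minimal least-connections sketch. It is not SOHU-DBProxy's actual loadbalance_lc_select, only an illustration; demo_backend_t is a stand-in for network_backend_t (shown in full later), which keeps per-type connection counts.

#include <glib.h>

typedef struct {
    GString *ip;
    guint port;
    guint connected_clients[2]; /* active connections, split read/write */
    gboolean up;                /* stand-in for state == BACKEND_STATE_UP */
} demo_backend_t;

/* pick the UP backend with the fewest active connections of the given type
 * and return its address as "ip:port" */
static GString *demo_lc_select(GPtrArray *backends, guint conn_type) {
    demo_backend_t *best = NULL;
    guint i;
    for (i = 0; i < backends->len; i++) {
        demo_backend_t *b = g_ptr_array_index(backends, i);
        if (!b->up)
            continue;
        if (best == NULL
                || b->connected_clients[conn_type] < best->connected_clients[conn_type])
            best = b;
    }
    if (best == NULL)
        return NULL;
    GString *addr = g_string_new(best->ip->str);
    g_string_append_printf(addr, ":%u", best->port);
    return addr;
}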


SOHU also provides separate listening ports for read and for write requests (with support for dynamically binding multiple virtual IPs). The default configuration is:

g_string_append(chas->listen_addresses[PROXY_TYPE_WRITE], ":4040");
g_string_append(chas->listen_addresses[PROXY_TYPE_READ], ":4242");

Each listening address has its own configuration:

config->listen_configs[0] = g_ptr_array_new(); // write listening addresses; each array element is a chassis_plugin_config
config->listen_configs[1] = g_ptr_array_new(); // read listening addresses; each array element is a chassis_plugin_config


A listening connection is created and its configuration loaded through the following function (most attributes are inherited from the parent configuration):

struct chassis_plugin_config *chassis_plugin_config_new_init_2(
        const struct chassis_plugin_config *config,
        const gchar *ip_port, proxy_rw type,
        chassis *chas)

After creating a listening connection and loading its configuration, the function registers the connection in the main configuration:

g_hash_table_insert(chas->listen_cons[type], listen_key, con); // the key is the ip and port


Once the listening connections and the load-balancing algorithms are configured, the backend information is loaded:

gboolean config_backends_load(chassis *chas, proxy_rw rw_type)
This function reads each backend's configuration and then adds it to the main configuration through:

int network_backends_add2(network_backends_t *bs, const gchar *address, backend_type_t type, backend_state_t state, const backend_config_t *backend_config)
Each backend is described by the following structures:

typedef struct backend_config_t {
    gchar *ip_port;
    guint rw_weight;
    guint ro_weight;
    backend_state_t state;
    health_check_t health_check;
} backend_config_t;

struct network_backend_t {
    network_address *addr;

    backend_state_t state;   /**< UP or DOWN */
    backend_type_t type;     /**< ReadWrite or ReadOnly */

    GTimeVal state_since;    /**< timestamp of the last state-change */

    /**
     * @author sohu-inc.com
     */

    // formerly a single pool; now two pools are needed, split by read/write
    //network_connection_pool *pool; /**< the pool of open connections */
    network_connection_pool *pool[2]; /**< the pools of open connections */

    // formerly a single counter; now read and write requests are counted separately
    //guint connected_clients; /**< number of open connections to this backend for SQF */
    guint connected_clients[2]; /**< number of active connections to this backend */

    GMutex mutex[2]; // guards synchronized access to the matching connected_clients[] entry
    guint connect_w[2]; // used for load balancing; reads and writes each keep their own weight

    GString *uuid;           /**< the UUID of the backend */
    guint port;
    GString *ip;

    /** health-check configuration */
    health_check_t health_check;
};
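
Because reads and writes keep separate counters, updates presumably go through the matching mutex; a hedged sketch of a safe increment (backend_client_connected is a made-up name):

/* bump the per-type counter under its own mutex, using the mutex[2] and
 * connected_clients[2] fields defined above */
static void backend_client_connected(network_backend_t *b, proxy_rw type) {
    g_mutex_lock(&b->mutex[type]);
    b->connected_clients[type]++;
    g_mutex_unlock(&b->mutex[type]);
}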


2. Backend state detection

SOHU creates one dedicated thread per backend for state detection; these threads are managed centrally through the main configuration:

chas->detect_threads = backend_detect_threads_new();
Each thread object has the following structure:

struct backend_detect_thread_t {
    GThread *thr;
    struct event_base *event_base;

    network_backend_t *backend; /**< the backend this thread is responsible for checking */
    chassis *chas; /**< the chassis base object */

    guint index; // thread index
    GString *name; // thread name, thread_index

    detection_task *task;
};
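
A hedged sketch of how such a thread might be created and started, with one libevent base per thread and the loop function shown at the end of this section (start_detect_thread is a made-up helper, not SOHU-DBProxy code):

#include <event2/event.h>
#include <glib.h>

static void start_detect_thread(backend_detect_thread_t *t) {
    t->event_base = event_base_new(); /* private event loop for this thread */
    t->thr = g_thread_new(t->name->str,
                          (GThreadFunc) backend_detect_thread_loop, t);
}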

The detection_task object is the key part of a detection thread; its structure is:
typedef struct detection_task {
    backend_detect_thread_t *detect_thread;
    zabbix_socket *sock;

    backend_state_t backend_check_state;
    backend_result *backend_check_result;

    /* detection configuration */
    gboolean on; /**< whether to keep detecting continuously; unused */

} detection_task;
SOHU performs the state detection through zabbix_agentd, so the detection_task structure has to be filled in with the appropriate username and password as well as the backend check command:

backend = task->detect_thread->backend;
sock = task->sock;
gchar *user = "test";
gchar *password = "test"; // hard-coded fallbacks...

extern char *zabbixuser;
extern char *zabbixuserpassword;

if (zabbixuser != NULL && zabbixuserpassword != NULL)
{
    user = (gchar *)zabbixuser;
    password = (gchar *)zabbixuserpassword;
}
// build the backend check command
zabbix_socket_set_agentd_cmd(sock, "pxc.checkStatus", backend->ip->str, backend->port, user, password); // pxc.checkStatus is actually a script
// set the socket address
zabbix_socket_set_agentd_address(sock, "127.0.0.1", 10050);

At this point all backend-related configuration has been loaded and the detection threads are started. Their entry function is:

void *backend_detect_thread_loop(backend_detect_thread_t *detect_thread) 

3. State detection

First, the states a backend can be in:

typedef enum {
    BACKEND_STATE_UNKNOWN,
    BACKEND_STATE_PENDING, // backends in this state are not checked
    BACKEND_STATE_UP,
    BACKEND_STATE_DOWN
} backend_state_t;


SOHU applies a different detection strategy depending on the backend's current state:

if (backend->state == BACKEND_STATE_PENDING) {
    adjust_backend(BACKEND_STATE_PENDING, chas, backend);
    goto SLEEP;
}

/* pick the interval according to the current state */
if (backend->state == BACKEND_STATE_DOWN) {
    interval_seconds = backend->health_check.fastdowninter;
} else {
    interval_seconds = backend->health_check.inter;
}
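
The SLEEP label presumably just waits out interval_seconds before the next round; a minimal equivalent would be:

g_usleep((gulong) interval_seconds * G_USEC_PER_SEC); /* pause between two checks */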
The state machine driving the zabbix communication has eight states:
typedef enum {
    ZABBIX_CON_STATE_INIT = 0,
    ZABBIX_CON_STATE_WRITE_HEAD = 1,
    ZABBIX_CON_STATE_WRITE_LENGTH = 2,
    ZABBIX_CON_STATE_WRITE_CMD = 3,
    ZABBIX_CON_STATE_READ_HEAD = 4,
    ZABBIX_CON_STATE_READ_LENGTH = 5,
    ZABBIX_CON_STATE_READ_RESULT = 6,
    ZABBIX_CON_STATE_CLOSE_CONNECT = 7
} zabbix_con_state;
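
The WRITE_HEAD / WRITE_LENGTH / WRITE_CMD split matches the zabbix passive-check wire format: a 5-byte header "ZBXD\1", an 8-byte little-endian payload length, then the item key itself. A hedged sketch of building such a request (zbx_build_request is a made-up helper, not a SOHU-DBProxy function):

#include <glib.h>
#include <string.h>

static GByteArray *zbx_build_request(const gchar *cmd) {
    GByteArray *buf = g_byte_array_new();
    guint64 len = GUINT64_TO_LE((guint64) strlen(cmd));
    g_byte_array_append(buf, (const guint8 *) "ZBXD\1", 5);      /* head   */
    g_byte_array_append(buf, (const guint8 *) &len, 8);          /* length */
    g_byte_array_append(buf, (const guint8 *) cmd, strlen(cmd)); /* cmd    */
    return buf;
}

The states in detail: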

ZABBIX_CON_STATE_INIT:

If a connection is already established, check its status; if not, create one. On success the state changes to ZABBIX_CON_STATE_WRITE_HEAD; on failure it goes to ZABBIX_CON_STATE_CLOSE_CONNECT.


ZABBIX_CON_STATE_WRITE_HEAD:

Send the header. Success: ZABBIX_CON_STATE_WRITE_LENGTH; failure: ZABBIX_CON_STATE_CLOSE_CONNECT.


ZABBIX_CON_STATE_WRITE_LENGTH:

Send the length of the command data. Success: ZABBIX_CON_STATE_WRITE_CMD; failure: ZABBIX_CON_STATE_CLOSE_CONNECT. If nothing can be written for the moment, leave the state machine temporarily.


ZABBIX_CON_STATE_WRITE_CMD:

Send the command string. Success: ZABBIX_CON_STATE_READ_HEAD; failure: ZABBIX_CON_STATE_CLOSE_CONNECT.


ZABBIX_CON_STATE_READ_HEAD:

Read the header of the packet returned by the zabbix agent. Success: ZABBIX_CON_STATE_READ_LENGTH; failure: ZABBIX_CON_STATE_CLOSE_CONNECT.


ZABBIX_CON_STATE_READ_LENGTH:

Read the length of the result. Success: ZABBIX_CON_STATE_READ_RESULT; failure: ZABBIX_CON_STATE_CLOSE_CONNECT.


ZABBIX_CON_STATE_READ_RESULT:

Read the result. On success the state machine's exit status is set to success; this is the only place where that happens. On error the exit status is set to ZABBIX_STATUS_MACHINE_NETWORK_ERROR.

sock->exit_status = ZABBIX_STATUS_MACHINE_SUCCESS;
sock->state = ZABBIX_CON_STATE_CLOSE_CONNECT;

ZABBIX_CON_STATE_CLOSE_CONNECT:

Every other state ends up here on exit:

sock->is_over = TRUE;
zabbix_agent_close(sock);
break;
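
Put together, the transitions read like the condensed driver below. This is a hypothetical consolidation, not the actual SOHU-DBProxy code (which advances sock->state from libevent callbacks rather than a blocking loop), and the zbx_step_* helpers are stand-ins that each perform one I/O step and return TRUE on success.

extern gboolean zbx_step_connect(zabbix_socket *sock);
extern gboolean zbx_step_write_head(zabbix_socket *sock);
extern gboolean zbx_step_write_length(zabbix_socket *sock);
extern gboolean zbx_step_write_cmd(zabbix_socket *sock);
extern gboolean zbx_step_read_head(zabbix_socket *sock);
extern gboolean zbx_step_read_length(zabbix_socket *sock);
extern gboolean zbx_step_read_result(zabbix_socket *sock);

static void zbx_run_state_machine(zabbix_socket *sock) {
    while (!sock->is_over) {
        switch (sock->state) {
        case ZABBIX_CON_STATE_INIT:
            sock->state = zbx_step_connect(sock)
                    ? ZABBIX_CON_STATE_WRITE_HEAD : ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_WRITE_HEAD:
            sock->state = zbx_step_write_head(sock)
                    ? ZABBIX_CON_STATE_WRITE_LENGTH : ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_WRITE_LENGTH:
            sock->state = zbx_step_write_length(sock)
                    ? ZABBIX_CON_STATE_WRITE_CMD : ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_WRITE_CMD:
            sock->state = zbx_step_write_cmd(sock)
                    ? ZABBIX_CON_STATE_READ_HEAD : ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_READ_HEAD:
            sock->state = zbx_step_read_head(sock)
                    ? ZABBIX_CON_STATE_READ_LENGTH : ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_READ_LENGTH:
            sock->state = zbx_step_read_length(sock)
                    ? ZABBIX_CON_STATE_READ_RESULT : ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_READ_RESULT:
            sock->exit_status = zbx_step_read_result(sock)
                    ? ZABBIX_STATUS_MACHINE_SUCCESS        /* only success path */
                    : ZABBIX_STATUS_MACHINE_NETWORK_ERROR;
            sock->state = ZABBIX_CON_STATE_CLOSE_CONNECT;
            break;
        case ZABBIX_CON_STATE_CLOSE_CONNECT:
            sock->is_over = TRUE;
            zabbix_agent_close(sock);
            break;
        }
    }
}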

4. Connection pool configuration

SOHU defines default connection-pool configurations, again split by read and write:

struct user_pool_config *default_pool_config[2]; // default user connection pools, split by read/write

<pre name="code" class="cpp">struct user_pool_config{<span style="font-family: Arial, Helvetica, sans-serif; font-size: 12px;">                                                                                                           </span>
guint min_connections; //<最小连接数 guint max_connections; //<最大连接数 gint max_idle_interval; //<最大空闲时间 };

 
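
A hedged sketch of how the two defaults might be populated; the numbers are purely illustrative, not SOHU-DBProxy's actual defaults:

#include <glib.h>

static void init_default_pool_config(struct user_pool_config *conf[2]) {
    int type;
    for (type = 0; type < 2; type++) {        /* PROXY_TYPE_WRITE / PROXY_TYPE_READ */
        conf[type] = g_new0(struct user_pool_config, 1);
        conf[type]->min_connections   = 10;   /* keep a few connections warm */
        conf[type]->max_connections   = 100;  /* cap the pool size */
        conf[type]->max_idle_interval = 3600; /* recycle after an hour idle */
    }
}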

On top of that, a connection-pool configuration can be set for each individual user:

GHashTable *pool_config_per_user[2]; // per-user backend pool configurations, GHashTable<GString<username>, user_pool_config *config>, also split by read/write
GRWLock pool_conf_lock[2];
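
A user's effective pool configuration is presumably resolved under the read lock, falling back to the defaults; a hedged sketch, assuming these fields live in the chassis struct:

user_pool_config *conf;
g_rw_lock_reader_lock(&chas->pool_conf_lock[type]);
conf = g_hash_table_lookup(chas->pool_config_per_user[type], username); /* keyed by GString username */
g_rw_lock_reader_unlock(&chas->pool_conf_lock[type]);
if (conf == NULL)
    conf = chas->default_pool_config[type]; /* no per-user entry: use the default */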


SOHU runs a dedicated thread to manage the connection pools; its structure is:

struct connection_scaler_thread_t {
    GThread *thr;
    struct event_base *event_base;
    GAsyncQueue *event_queue;
    chassis *chas;
};
The thread's callback function is:

void *connection_scaler_thread_loop(connection_scaler_thread_t *connection_scaler_thread)
The pool check runs per backend and per user: first, all user pools on a single backend are examined:

for (index = 0; index < network_backends_count(backends); index++) {
    ...
    for (type = PROXY_TYPE_WRITE; type <= PROXY_TYPE_READ; type = (proxy_rw) (type + 1)) {
        ...
        removed = network_connection_scaler_check_users_pool_status(chas, backend, type, pool);
        ...
    }
}


Then each user's pool is checked against that user's pool configuration. The check function is:

static guint network_connection_scaler_check_user_pool_status(chassis *chas,
        network_backend_t *backend, proxy_rw type,
        network_connection_pool *pool, const GString *username,
        user_pool_config *user_pool_conf,
        connection_scaler_pool_statistics *pool_stats) {
       ...
       // step 1. fetch the user's connection pool
       // step 2. delete empty user pools
       // step 3. take one connection to check, skipping ones already checked
       // step 4. check at most N connections, to avoid hogging the cpu for long
       // step 5. check whether the user still exists
       // step 6. check whether the connection count exceeds max_connections
       // step 7. check for idle timeout and connectivity
}
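
For step 7, a hedged sketch of the idle-timeout half of the check; the helper and its parameters are made up, but GTimeVal matches the types used elsewhere in these structs:

/* hypothetical helper: has this pooled connection been idle longer than
 * the user's max_idle_interval? */
static gboolean connection_idle_expired(const GTimeVal *last_active,
                                        gint max_idle_interval) {
    GTimeVal now;
    g_get_current_time(&now);
    return (now.tv_sec - last_active->tv_sec) > max_idle_interval;
}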

Once the status check finishes, the garbage pools are cleaned up and the global pool state is updated:

/* clean up the garbage-collection connection pool */
network_connection_scaler_clean_up_backends_gc_pool(connection_scaler_thread);
update_global_connection_state(connection_scaler_thread->chas);

5. Initializing the remaining connection limits

/** initialization of the connection limits */
chas->rule_table = user_db_sql_rule_table_new(); // SQL statement rules; can be configured per user in the xml. The entries below are not present in the xml file, so they are left alone for now
chas->para_limit_rules = para_exec_limit_rules_new();
chas->para_running_statistic_dic = statistic_dic_new();
chas->dura_limit_rules = dura_exec_limit_rules_new();
chas->inbytes_list = query_inbytes_list_new();
chas->outbytes_list = query_outbytes_list_new();
chas->query_rate_list = query_rate_list_new();
chas->query_dml_list = query_dml_list_new();

Here yet another thread is started:

/** initialize and start the sql-limit statistics thread */
chas->sql_statistics_thread = sql_statistics_thread_new();
sql_statistics_thread_init(chas->sql_statistics_thread, chas);
sql_statistics_thread_start(chas->sql_statistics_thread);

Finally, a few more configurations are loaded:

config_sqlrules_load(chas, SQL_SINGLE);   // global single-statement rules
config_sqlrules_load(chas, SQL_TEMPLATE); // global template-statement rules
config_slow_query_log_load(chas);         // slow-query log configuration
config_table_engine_replaceable_flag_load(chas); // table-engine replacement
config_balck_list_flag_load(chas);        // blacklist filtering
config_dml_kind_load(chas);               // dml statement restrictions

With that, all of the configuration has been loaded. Afterwards SOHU performs the remaining steps expected of a backend service, including changing the process owner and registering signal handlers, and then the worker threads take over and the proxy is ready to start working.
