1. Configuration loading
int network_mysqld_proxy_plugin_apply_config(chassis *chas, chassis_plugin_config *config);
This is the configuration-loading function of the proxy plugin. The plugin's configuration keeps a pointer back to the main (chassis) configuration:
config->chas = chas; // config is of type chassis_plugin_config; each plugin has its own implementation of it
The backend information is stored in the main configuration:
chassis_private *g = chas->priv;
/**
 * @author sohu-inc.com
 * cons_mutex serializes access to cons
 */
struct chassis_private {
    GMutex cons_mutex;
    GPtrArray *cons;              /**< array(network_mysqld_con) */
    network_backends_t *backends;
    gint connection_id_sequence;
};
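As the comment notes, every access to cons has to go through cons_mutex. A minimal sketch of how a new connection might be registered under that rule (the helper name is hypothetical, not from the source):
// Hypothetical helper: append a connection to the global cons array
// while holding cons_mutex, as the struct comment requires.
static void chassis_private_add_con(chassis_private *g, network_mysqld_con *con) {
    g_mutex_lock(&g->cons_mutex);
    g_ptr_array_add(g->cons, con);   // cons is an array of network_mysqld_con pointers
    g_mutex_unlock(&g->cons_mutex);
}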
SOHU lets the load-balancing algorithm be configured separately for read and for write requests. Two algorithms are available, and the choice is kept in the main configuration:
GString * (* lb_algo_func[2])( chassis *chas, proxy_rw conn_type);
Least-connections ("lc") algorithm:
chas->lb_algo[PROXY_TYPE_WRITE] = "lc";
chas->lb_algo_func[PROXY_TYPE_WRITE] = loadbalance_lc_select; //针对写请求
chas->lb_algo[PROXY_TYPE_READ] = "lc";
chas->lb_algo_func[PROXY_TYPE_READ] = loadbalance_lc_select; //针对读请求
chas->lb_algo[PROXY_TYPE_WRITE] = "wrr";
chas->lb_algo_func[PROXY_TYPE_WRITE] = loadbalance_wrr_select;
chas->lb_algo[PROXY_TYPE_READ] = "wrr";
chas->lb_algo_func[PROXY_TYPE_READ] = loadbalance_wrr_select;
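Both selectors return the address of the chosen backend as a GString, matching the lb_algo_func signature above. A purely illustrative sketch of a least-connections pick follows; network_backends_get is an assumed accessor, and the field names come from the network_backend_t struct shown further below:
// Illustrative sketch only, not the actual sohu implementation:
// pick the UP backend with the fewest active connections for this
// request type and return its "ip:port" as a newly allocated GString.
GString *loadbalance_lc_select_sketch(chassis *chas, proxy_rw conn_type) {
    network_backends_t *bs = chas->priv->backends;
    network_backend_t *best = NULL;
    GString *result;
    guint i;

    for (i = 0; i < network_backends_count(bs); i++) {
        network_backend_t *b = network_backends_get(bs, i); // assumed accessor
        if (b == NULL || b->state != BACKEND_STATE_UP) continue;
        if (best == NULL ||
                b->connected_clients[conn_type] < best->connected_clients[conn_type]) {
            best = b;
        }
    }
    if (best == NULL) return NULL;

    result = g_string_new(best->ip->str);
    g_string_append_printf(result, ":%u", best->port);
    return result;
}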
The write and read proxies listen on separate addresses, which are also kept in the main configuration:
g_string_append(chas->listen_addresses[PROXY_TYPE_WRITE], ":4040");
g_string_append(chas->listen_addresses[PROXY_TYPE_READ], ":4242");
config->listen_configs[0] = g_ptr_array_new(); // write listen addresses; each array element is a chassis_plugin_config
config->listen_configs[1] = g_ptr_array_new(); // read listen addresses; each array element is a chassis_plugin_config
struct chassis_plugin_config *chassis_plugin_config_new_init_2(
        const struct chassis_plugin_config *config,
        const gchar *ip_port, proxy_rw type,
        chassis *chas)
This function creates a listening connection for the given address and loads its configuration; the listening connection is then registered in the main configuration:
g_hash_table_insert(chas->listen_cons[type], listen_key, con); // the key is the listener's ip:port
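A hedged sketch of how the per-address listen configs might then be built from a configured address list (the loop and the names write_addresses/addr_count are assumptions for illustration only):
// Assumed flow: for every "ip:port" the write proxy should bind to,
// create a per-address plugin config and remember it in listen_configs.
guint i;
for (i = 0; i < addr_count; i++) {                  // addr_count: hypothetical
    const gchar *ip_port = write_addresses[i];      // hypothetical address list
    struct chassis_plugin_config *listen_config =
            chassis_plugin_config_new_init_2(config, ip_port, PROXY_TYPE_WRITE, chas);
    g_ptr_array_add(config->listen_configs[PROXY_TYPE_WRITE], listen_config);
}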
gboolean config_backends_load(chassis *chas, proxy_rw rw_type)
This function reads the configuration of every backend and adds each one to the main configuration via:
int network_backends_add2(network_backends_t *bs, const gchar *address, backend_type_t type, backend_state_t state, const backend_config_t *backend_config)
Each backend is described by the following structures:
typedef struct backend_config_t {
    gchar *ip_port;
    guint rw_weight;
    guint ro_weight;
    backend_state_t state;
    health_check_t health_check;
} backend_config_t;
struct network_backend_t {
    network_address *addr;
    backend_state_t state;            /**< UP or DOWN */
    backend_type_t type;              /**< ReadWrite or ReadOnly */
    GTimeVal state_since;             /**< timestamp of the last state-change */
    /**
     * @author sohu-inc.com
     */
    // original layout; read and write now need separate connection pools
    //network_connection_pool *pool;  /**< the pool of open connections */
    network_connection_pool *pool[2]; /**< the pools of open connections, split by read/write */
    // original layout; read and write requests are now counted separately
    //guint connected_clients;        /**< number of open connections to this backend for SQF */
    guint connected_clients[2];       /**< number of active connections to this backend, split by read/write */
    GMutex mutex[2];                  // serializes access to the matching connected_clients[] counter
    guint connect_w[2];               // load-balancing weights, maintained separately for read and write
    GString *uuid;                    /**< the UUID of the backend */
    guint port;
    GString *ip;
    /** health-check configuration */
    health_check_t health_check;
};
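Because read and write traffic can hit the same backend concurrently, each per-type counter is guarded by the matching mutex[]. A minimal sketch of a counter update under that scheme (the helper name is hypothetical):
// Hypothetical helper: bump the active-connection counter for one
// request type, guarded by the per-type mutex from network_backend_t.
static void backend_inc_connected_clients(network_backend_t *backend, proxy_rw type) {
    g_mutex_lock(&backend->mutex[type]);
    backend->connected_clients[type]++;
    g_mutex_unlock(&backend->mutex[type]);
}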
SOHU creates one dedicated thread per backend for state detection; these threads are managed centrally from the main configuration:
chas->detect_threads = backend_detect_threads_new();
Each thread object has the following structure:
struct backend_detect_thread_t {
    GThread *thr;
    struct event_base *event_base;
    network_backend_t *backend;   /**< the backend this thread is responsible for checking */
    chassis *chas;                /**< the global chassis */
    guint index;                  // index of the thread
    GString *name;                // thread name, thread_<index>
    detection_task *task;
};
typedef struct detection_task {
    backend_detect_thread_t *detect_thread;
    zabbix_socket *sock;
    backend_state_t backend_check_state;
    backend_result *backend_check_result;
    /* detection configuration */
    gboolean on;                  /**< whether to keep detecting continuously; unused */
} detection_task;
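A sketch of how one such thread might be started once its backend_detect_thread_t has been filled in; the use of g_thread_new is an assumption (older GLib code would use g_thread_create instead):
// Assumed start-up: one OS thread per backend, running
// backend_detect_thread_loop() with its thread object as the argument.
detect_thread->thr = g_thread_new(detect_thread->name->str,
                                  (GThreadFunc) backend_detect_thread_loop,
                                  detect_thread);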
SOHU performs the state detection through zabbix_agentd. The corresponding username and password, as well as the backend check command, are set up on the detection_task:
backend = task->detect_thread->backend;
sock = task->sock;
gchar *user = "test";
gchar *password = "test"; // both hard-coded defaults...
extern char *zabbixuser;
extern char *zabbixuserpassword;
if (zabbixuser != NULL && zabbixuserpassword != NULL)
{
    user = (gchar *)zabbixuser;
    password = (gchar *)zabbixuserpassword;
}
// build the backend check command
zabbix_socket_set_agentd_cmd(sock, "pxc.checkStatus", backend->ip->str, backend->port, user, password); // pxc.checkStatus is actually a script on the agent side
// set the agentd socket address
zabbix_socket_set_agentd_address(sock, "127.0.0.1", 10050);
Each detection thread runs the following loop function:
void *backend_detect_thread_loop(backend_detect_thread_t *detect_thread)
First, the possible states of a backend:
typedef enum {
    BACKEND_STATE_UNKNOWN,
    BACKEND_STATE_PENDING,  // backends in this state are not probed
    BACKEND_STATE_UP,
    BACKEND_STATE_DOWN
} backend_state_t;
SOHU applies a different detection strategy depending on the backend's current state:
if (backend->state == BACKEND_STATE_PENDING) {
    adjust_backend(BACKEND_STATE_PENDING, chas, backend);
    goto SLEEP;
}
/* pick the check interval according to the current state */
if (backend->state == BACKEND_STATE_DOWN) {
    interval_seconds = backend->health_check.fastdowninter;
} else {
    interval_seconds = backend->health_check.inter;
}
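Putting these pieces together, the thread body is essentially a periodic probe loop. A heavily simplified sketch follows; run_zabbix_check() stands in for driving the zabbix state machine (the real loop is event-driven over libevent rather than a blocking call):
// Heavily simplified sketch of backend_detect_thread_loop().
void *backend_detect_thread_loop_sketch(backend_detect_thread_t *detect_thread) {
    network_backend_t *backend = detect_thread->backend;
    chassis *chas = detect_thread->chas;
    guint interval_seconds;

    for (;;) {
        if (backend->state == BACKEND_STATE_PENDING) {
            // pending backends are only re-announced, never probed
            adjust_backend(BACKEND_STATE_PENDING, chas, backend);
        } else {
            // probe through zabbix and apply the verdict
            backend_state_t checked = run_zabbix_check(detect_thread->task); // placeholder
            adjust_backend(checked, chas, backend);
        }

        // a DOWN backend is re-checked more aggressively
        interval_seconds = (backend->state == BACKEND_STATE_DOWN)
                ? backend->health_check.fastdowninter
                : backend->health_check.inter;
        g_usleep((gulong) interval_seconds * G_USEC_PER_SEC);
    }
    return NULL;
}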
The state machine driving the zabbix communication has eight states:
typedef enum {
    ZABBIX_CON_STATE_INIT = 0,
    ZABBIX_CON_STATE_WRITE_HEAD = 1,
    ZABBIX_CON_STATE_WRITE_LENGTH = 2,
    ZABBIX_CON_STATE_WRITE_CMD = 3,
    ZABBIX_CON_STATE_READ_HEAD = 4,
    ZABBIX_CON_STATE_READ_LENGTH = 5,
    ZABBIX_CON_STATE_READ_RESULT = 6,
    ZABBIX_CON_STATE_CLOSE_CONNECT = 7
} zabbix_con_state;
ZABBIX_CON_STATE_INIT:
If a connection already exists, check its state; otherwise establish one. On success move to ZABBIX_CON_STATE_WRITE_HEAD, on failure to ZABBIX_CON_STATE_CLOSE_CONNECT.
ZABBIX_CON_STATE_WRITE_HEAD:
Send the header. On success move to ZABBIX_CON_STATE_WRITE_LENGTH, on failure to ZABBIX_CON_STATE_CLOSE_CONNECT.
ZABBIX_CON_STATE_WRITE_LENGTH:
Send the length of the command data. On success move to ZABBIX_CON_STATE_WRITE_CMD, on failure to ZABBIX_CON_STATE_CLOSE_CONNECT; if nothing can be written for the moment, temporarily leave the state machine.
ZABBIX_CON_STATE_WRITE_CMD:
Send the command string. On success move to ZABBIX_CON_STATE_READ_HEAD, on failure to ZABBIX_CON_STATE_CLOSE_CONNECT.
ZABBIX_CON_STATE_READ_HEAD:
Read the header of the packet returned by the zabbix agent. On success move to ZABBIX_CON_STATE_READ_LENGTH, on failure to ZABBIX_CON_STATE_CLOSE_CONNECT.
ZABBIX_CON_STATE_READ_LENGTH:
Read the length of the result. On success move to ZABBIX_CON_STATE_READ_RESULT, on failure to ZABBIX_CON_STATE_CLOSE_CONNECT.
ZABBIX_CON_STATE_READ_RESULT:
Read the result. On success the state machine's exit status is set to success (this is the only place where that happens); on error the exit status is set to ZABBIX_STATUS_MACHINE_NETWORK_ERROR.
sock->exit_status = ZABBIX_STATUS_MACHINE_SUCCESS;
sock->state = ZABBIX_CON_STATE_CLOSE_CONNECT;
ZABBIX_CON_STATE_CLOSE_CONNECT:
Every other state eventually transitions here when it exits:
sock->is_over = TRUE;
zabbix_agent_close(sock);
break;
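The transitions listed above boil down to one switch over sock->state; a condensed sketch of that skeleton, where the *_ok flags stand in for the result of the corresponding zabbix_socket I/O call:
switch (sock->state) {
case ZABBIX_CON_STATE_INIT:
    // connect, or verify an already established connection
    sock->state = connect_ok ? ZABBIX_CON_STATE_WRITE_HEAD : ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_WRITE_HEAD:
    sock->state = write_head_ok ? ZABBIX_CON_STATE_WRITE_LENGTH : ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_WRITE_LENGTH:
    sock->state = write_length_ok ? ZABBIX_CON_STATE_WRITE_CMD : ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_WRITE_CMD:
    sock->state = write_cmd_ok ? ZABBIX_CON_STATE_READ_HEAD : ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_READ_HEAD:
    sock->state = read_head_ok ? ZABBIX_CON_STATE_READ_LENGTH : ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_READ_LENGTH:
    sock->state = read_length_ok ? ZABBIX_CON_STATE_READ_RESULT : ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_READ_RESULT:
    // only a fully read result counts as success
    sock->exit_status = read_result_ok ? ZABBIX_STATUS_MACHINE_SUCCESS
                                       : ZABBIX_STATUS_MACHINE_NETWORK_ERROR;
    sock->state = ZABBIX_CON_STATE_CLOSE_CONNECT;
    break;
case ZABBIX_CON_STATE_CLOSE_CONNECT:
    sock->is_over = TRUE;
    zabbix_agent_close(sock);
    break;
}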
4. Connection pool configuration
SOHU keeps a default connection-pool configuration, again split between read and write requests:
struct user_pool_config *default_pool_config[2]; // default user connection-pool configuration, split by read/write
<pre name="code" class="cpp">struct user_pool_config{<span style="font-family: Arial, Helvetica, sans-serif; font-size: 12px;"> </span>
guint min_connections; //<最小连接数 guint max_connections; //<最大连接数 gint max_idle_interval; //<最大空闲时间 };
In addition, each user can have its own connection-pool configuration:
GHashTable *pool_config_per_user[2]; // per-user pool configuration, GHashTable<GString<username>, user_pool_config *config>, also split by read/write
GRWLock pool_conf_lock[2];
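Assuming these arrays live on the chassis struct, a lookup of one user's pool configuration might look like the following sketch, falling back to the default config when the user has no entry (the helper name is hypothetical):
// Hypothetical lookup: per-user pool config with fallback to the default,
// protected by the read side of the per-type rwlock.
static user_pool_config *get_pool_config_for_user(chassis *chas,
        const GString *username, proxy_rw type) {
    user_pool_config *conf;

    g_rw_lock_reader_lock(&chas->pool_conf_lock[type]);
    conf = g_hash_table_lookup(chas->pool_config_per_user[type], username);
    g_rw_lock_reader_unlock(&chas->pool_conf_lock[type]);

    return conf != NULL ? conf : chas->default_pool_config[type];
}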
The pools themselves are maintained by a dedicated connection-scaler thread:
struct connection_scaler_thread_t {
    GThread *thr;
    struct event_base *event_base;
    GAsyncQueue *event_queue;
    chassis *chas;
};
The loop function of this thread is:
void *connection_scaler_thread_loop( connection_scaler_thread_t *connection_scaler_thread)
The pool check iterates over every backend and, for each backend, over both request types, first checking all user pools on that backend:
for (index = 0; index < network_backends_count(backends); index++) {
    ...
    for (type = PROXY_TYPE_WRITE; type <= PROXY_TYPE_READ; type = (proxy_rw) (type + 1)) {
        ...
        removed = network_connection_scaler_check_users_pool_status(chas, backend, type, pool);
        ...
    }
}
It then checks each user's pool against that user's pool configuration; the per-user check function is:
static guint network_connection_scaler_check_user_pool_status(chassis *chas,
        network_backend_t *backend, proxy_rw type,
        network_connection_pool *pool, const GString *username,
        user_pool_config *user_pool_conf,
        connection_scaler_pool_statistics *pool_stats) {
    ...
    //step 1. fetch the user's connection pool
    //step 2. delete empty user pools
    //step 3. take one connection to check, skipping those already checked
    //step 4. check at most N connections, to avoid burning CPU for too long
    //step 5. check whether the user still exists
    //step 6. check whether the connection count exceeds the maximum
    //step 7. check for idle timeout and connectivity
}
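As an example of step 7, a connection whose idle time exceeds the user's max_idle_interval would be dropped; a simplified sketch of that comparison (entry_last_active is a placeholder for the pooled entry's last-use timestamp):
// Hypothetical idle-time check for one pooled connection (step 7);
// entry_last_active stands in for the pool entry's last-use timestamp.
GTimeVal now;
g_get_current_time(&now);

if (user_pool_conf->max_idle_interval > 0 &&
        now.tv_sec - entry_last_active.tv_sec > user_pool_conf->max_idle_interval) {
    // idle for too long: remove the connection from the pool and close it
}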
After the pool status check finishes, the garbage-collection pool is cleaned up and the global connection state is updated:
/* clean up the garbage-collection pool */
network_connection_scaler_clean_up_backends_gc_pool(connection_scaler_thread);
update_global_connection_state(connection_scaler_thread->chas);
/** initialization of the connection-limiting code */
chas->rule_table = user_db_sql_rule_table_new(); // SQL statement rules; they can be configured per user in the xml file. The lists below are not present in the xml file, so they are left aside for now.
chas->para_limit_rules = para_exec_limit_rules_new();
chas->para_running_statistic_dic = statistic_dic_new();
chas->dura_limit_rules = dura_exec_limit_rules_new();
chas->inbytes_list = query_inbytes_list_new();
chas->outbytes_list = query_outbytes_list_new();
chas->query_rate_list = query_rate_list_new();
chas->query_dml_list = query_dml_list_new();
At this point yet another thread is started:
/** initialize and start the SQL-limit statistics thread */
chas->sql_statistics_thread = sql_statistics_thread_new();
sql_statistics_thread_init(chas->sql_statistics_thread, chas);
sql_statistics_thread_start(chas->sql_statistics_thread);
config_sqlrules_load(chas, SQL_SINGLE);   // global single-statement rules
config_sqlrules_load(chas, SQL_TEMPLATE); // global template-statement rules
config_slow_query_log_load(chas);         // slow-query log configuration
config_table_engine_replaceable_flag_load(chas); // table-engine replacement
config_balck_list_flag_load(chas);        // blacklist filtering
config_dml_kind_load(chas);               // DML statement restrictions