最近项目里发现了一个比较严重的bug,线上服务器游戏服创建了大量的mysql连接没有释放,轻松达到了设定的连接数上限,因为使用了第三方的驱动erlang_mysql_driver,排查的过程中就研究了下驱动的源码。
代码比较简单,四个文件组成:
mysql.erl:唯一一个behavior为gen_server的模块,主要封装了一些外部调用的api(比如执行sql语句)以及管理连接池。
mysql_conn.erl:负责处理一条与mysql数据库的连接,可单独使用,也可通过mysql模块管理多条连接,甚至可以连接到不同的服务器。sql语句的执行和返回结果也是由这个模块处理的。
mysql_recv.erl:负责实际与mysql的tcp连接,将结果返回给父进程(mysql_conn).
mysql_auth.erl:提供给mysql_conn模块调用的接口,认证相关,可以忽略。
mysql模块提供两种启动方式:start_link和start,实际上都是由start1函数处理,区别就是start1最终调用的是gen_server:start_link还是gen_server:start(前者可以启动在supervisor监督树下,后者则是启动独立的进程)。
要注意的是,mysql启动的进程别名不是mysql而是mysql_dispatcher.
%% gen_server init callback of the dispatcher (mysql module).
%% Starts the first mysql_conn process for PoolId.
%%  - On success the connection is registered in the pool via add_conn/2.
%%  - On failure the dispatcher still starts, with an empty pool, and a
%%    background reconnect process is spawned via start_reconnect/2.
%% LogFun defaults to log/4 when it is undefined.
%% Always returns {ok, #state{}}.
init([PoolId, Host, Port, User, Password, Database, LogFun, Encoding]) ->
    LogFun1 = if LogFun == undefined -> fun log/4; true -> LogFun end,
    case mysql_conn:start(Host, Port, User, Password, Database, LogFun1,
                          Encoding, PoolId) of
        {ok, ConnPid} ->
            Conn = new_conn(PoolId, ConnPid, true, Host, Port, User, Password,
                            Database, Encoding),
            State = #state{log_fun = LogFun1},
            {ok, add_conn(Conn, State)};
        {error, Reason} ->
            %% The dispatcher does not exit here: it keeps running and
            %% retries in the background, so say so (and keep the reason).
            ?Log2(LogFun1, error,
                  "failed starting first MySQL connection handler (~p), "
                  "retrying in background",
                  [Reason]),
            C = #conn{pool_id = PoolId,
                      reconnect = true,
                      host = Host,
                      port = Port,
                      user = User,
                      password = Password,
                      database = Database,
                      encoding = Encoding},
            %% Must be LogFun1, not LogFun: LogFun may be 'undefined', and
            %% start_reconnect/2 hands its log fun to ?Log2.
            start_reconnect(C, LogFun1),
            {ok, #state{log_fun = LogFun1}}
    end.
init函数主要做了两件事:
一是通过mysql_conn:start/8启动一个mysql_conn进程,如果正常启动就继续;如果启动失败则调用mysql:start_reconnect/2在后台重连(此时init仍然返回{ok, State},mysql_dispatcher照常启动,只是连接池暂时为空)。
二是如果启动成功,调用mysql:add_conn/2将启动的连接进程信息(比如pid,poolId,端口等)保存到#state{}中。
%% Registers Conn in the dispatcher state and returns the new #state{}.
%%  - Monitors the connection process, so its exit yields a 'DOWN' message.
%%  - Prepends Conn to the Unused list of its pool. Each pool is stored in
%%    conn_pools as {Unused, Used}; a missing pool is created as {[Conn], []}.
%%  - Records the Pid -> PoolId mapping in pids_pools.
add_conn(Conn, State) ->
    ConnPid = Conn#conn.pid,
    erlang:monitor(process, ConnPid),
    Pool = Conn#conn.pool_id,
    Pools = State#state.conn_pools,
    {Free, Busy} =
        case gb_trees:lookup(Pool, Pools) of
            {value, {Free0, Busy0}} -> {Free0, Busy0};
            none -> {[], []}
        end,
    UpdatedPools = gb_trees:enter(Pool, {[Conn | Free], Busy}, Pools),
    UpdatedPids = gb_trees:enter(ConnPid, Pool, State#state.pids_pools),
    State#state{conn_pools = UpdatedPools, pids_pools = UpdatedPids}.
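这个add_conn函数值得仔细关注下:它先用erlang:monitor/2监控连接进程(因此连接进程退出时mysql_dispatcher会收到'DOWN'消息);然后把连接插入对应PoolId连接池的Unused列表头部(每个连接池以{Unused, Used}二元组的形式存放在gb_trees中,不存在则新建);最后在pids_pools中记录Pid到PoolId的映射。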
触发重连有两种情况:一是mysql:init/1调用mysql_conn:start启动第一个连接失败;二是mysql_dispatcher收到'DOWN'消息且#conn.reconnect字段为true(这种情况下会先通过mysql:remove_conn/2移除旧连接)。两种情况最终都会调用mysql:start_reconnect/2进行重连。
%% Spawns an unlinked helper process that runs reconnect_loop/3 for Conn,
%% with Conn's pid cleared and the log counter starting at 0, so the caller
%% never blocks while retrying. The helper traps exits: exit signals from
%% processes it is linked to arrive as messages instead of terminating it.
%% Logs the helper's pid at debug level and returns ok.
start_reconnect(Conn, LogFun) ->
    Retrier = spawn(fun () ->
                            process_flag(trap_exit, true),
                            Fresh = Conn#conn{pid = undefined},
                            reconnect_loop(Fresh, LogFun, 0)
                    end),
    Pool = Conn#conn.pool_id,
    TargetHost = Conn#conn.host,
    TargetPort = Conn#conn.port,
    OldPid = Conn#conn.pid,
    ?Log2(LogFun, debug,
          "started pid ~p to try and reconnect to ~p:~s:~p (replacing "
          "connection with pid ~p)",
          [Retrier, Pool, TargetHost, TargetPort, OldPid]),
    ok.
%% Body of the helper process spawned by start_reconnect/2.
%% Calls connect/8 with Conn's settings until it returns {ok, ConnPid},
%% sleeping 5000 ms after every failed attempt, then returns ok.
%% N throttles logging: a failure is logged only when N is 1, after which N
%% resets to 0; otherwise N is incremented. Starting from 0, this logs every
%% second consecutive failure.
reconnect_loop(Conn, LogFun, N) ->
    PoolId = Conn#conn.pool_id,
    Host = Conn#conn.host,
    Port = Conn#conn.port,
    Attempt = connect(PoolId, Host, Port,
                      Conn#conn.user, Conn#conn.password, Conn#conn.database,
                      Conn#conn.encoding, Conn#conn.reconnect),
    case Attempt of
        {error, Reason} ->
            NextN =
                if
                    N =:= 1 ->
                        ?Log2(LogFun, debug,
                              "reconnect: still unable to connect to "
                              "~p:~s:~p (~p)", [PoolId, Host, Port, Reason]),
                        0;
                    true ->
                        N + 1
                end,
            %% back off before trying again
            timer:sleep(5000),
            reconnect_loop(Conn, LogFun, NextN);
        {ok, ConnPid} ->
            ?Log2(LogFun, debug,
                  "managed to reconnect to ~p:~s:~p "
                  "(connection pid ~p)", [PoolId, Host, Port, ConnPid]),
            ok
    end.
这里mysql_dispatcher并没有自己阻塞住去执行mysql:reconnect_loop/3循环,而是spawn一个临时进程去执行这个循环。该进程通过process_flag(trap_exit, true),把来自链接(link)进程的退出信号转为普通消息接收,而不是随之退出。注意trap_exit并不能防止进程自身因异常而崩溃。mysql:reconnect_loop/3函数调用mysql:connect/8建立新的连接,每次失败后sleep 5秒再重试,并打印重连日志。
%% Starts a new mysql_conn for PoolId and registers it with the dispatcher
%% (?SERVER) through a synchronous {add_conn, Conn} call.
%%  - LinkConnection = true uses mysql_conn:start_link/8, which links the new
%%    connection to the *calling* process (not to the dispatcher).
%%  - LinkConnection = false uses mysql_conn:start/8.
%% Port defaults to ?PORT when undefined. The log fun is fetched from the
%% dispatcher.
%% Returns {ok, ConnPid} on success. Otherwise it returns the error from
%% mysql_conn, or the dispatcher's non-ok reply to add_conn.
connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect,
        LinkConnection) ->
    Port1 = if Port == undefined -> ?PORT; true -> Port end,
    Fun = if LinkConnection ->
                  fun mysql_conn:start_link/8;
             true ->
                  fun mysql_conn:start/8
          end,
    {ok, LogFun} = gen_server:call(?SERVER, get_logfun),
    case Fun(Host, Port1, User, Password, Database, LogFun,
             Encoding, PoolId) of
        {ok, ConnPid} ->
            Conn = new_conn(PoolId, ConnPid, Reconnect, Host, Port1, User,
                            Password, Database, Encoding),
            case gen_server:call(?SERVER, {add_conn, Conn}) of
                ok ->
                    {ok, ConnPid};
                Res ->
                    %% The connection was not added to any pool, so nothing
                    %% would ever use or close it. Stop it instead of leaking
                    %% the MySQL session. Unlink first: with LinkConnection
                    %% the resulting 'killed' exit signal would otherwise
                    %% also terminate the caller.
                    unlink(ConnPid),
                    exit(ConnPid, kill),
                    Res
            end;
        Err ->
            Err
    end.
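mysql:connect/9会启动一个mysql_conn进程负责与mysql数据库的连接,需要把对应的PoolId传递给mysql_conn。reconnect_loop调用的是connect/8,其源码这里没有贴出,推测它只是补上LinkConnection参数后转调connect/9。启动成功后,connect/9还会通过gen_server:call(?SERVER, {add_conn, Conn})把连接注册到mysql_dispatcher的连接池中,最终返回{ok, ConnPid}。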
注意,mysql_dispatcher进程并不是mysql_conn的父进程与监督者。当LinkConnection参数为true(即使用mysql_conn:start_link/8)时,调用mysql:connect的进程才与mysql_conn相链接,是真正的"监督者"。mysql_dispatcher进程只是负责管理连接池并进行调度。
%% Dispatches QueryList to the next connection of PoolId (chosen by
%% with_next_conn/3). The chosen mysql_conn process receives From as well,
%% presumably so it can reply to the caller itself. The dispatcher therefore
%% returns {noreply, State1} without waiting for the result.
fetch_queries(PoolId, From, State, QueryList) ->
    Dispatch =
        fun(Conn, NextState) ->
                Worker = Conn#conn.pid,
                mysql_conn:fetch(Worker, QueryList, From),
                {noreply, NextState}
        end,
    with_next_conn(PoolId, State, Dispatch).
%% Picks the next connection of PoolId via get_next_conn/2 and returns
%% Fun(Conn, NewState), i.e. whatever gen_server callback result Fun builds.
%% If the pool is unknown or empty, it returns
%% {reply, {error, {no_connection_in_pool, PoolId}}, State} with State unchanged.
with_next_conn(PoolId, State, Fun) ->
    Next = get_next_conn(PoolId, State),
    case Next of
        error ->
            {reply, {error, {no_connection_in_pool, PoolId}}, State};
        {ok, Conn, NewState} ->
            Fun(Conn, NewState)
    end.
%% Round-robin selection of a connection from pool PoolId.
%% A pool is {Unused, Used}:
%%  - The head of Unused is taken and pushed onto Used.
%%  - When Unused is empty, Used is reversed; its head is taken and the rest
%%    becomes the new Unused.
%% Returns {ok, Conn, NewState}, or error if the pool is missing or empty.
get_next_conn(PoolId, State) ->
    Pools = State#state.conn_pools,
    Store = fun(NewPool) ->
                    State#state{conn_pools =
                                    gb_trees:enter(PoolId, NewPool, Pools)}
            end,
    case gb_trees:lookup(PoolId, Pools) of
        {value, {[], []}} ->
            error;
        {value, {[], Used}} ->
            %% start a new round over the connections handed out so far
            [Next | Rest] = lists:reverse(Used),
            {ok, Next, Store({Rest, [Next]})};
        {value, {[Next | Rest], Used}} ->
            {ok, Next, Store({Rest, [Next | Used]})};
        none ->
            error
    end.
mysql_dispatcher负责将业务进程的fetch操作调度给不同的mysql_conn连接。
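当有一个新的fetch请求时,通过get_next_conn获取下一个可用的mysql_conn连接进程。调度算法是简单的轮询(round-robin):每个连接池保存为{Unused, Used}两个列表,每次取Unused头部的连接并把它放到Used头部;当Unused为空时,把Used反转后取其头部,剩余部分作为新的Unused,开始新一轮;两个列表都为空(或连接池不存在)时返回error,with_next_conn随即回复{error, {no_connection_in_pool, PoolId}}。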
%% Delivers a fetch result Res to the requester.
%%  - A plain pid receives the message {fetch_result, self(), Res}.
%%  - Anything else is treated as a gen_server From and answered with
%%    gen_server:reply/2.
send_reply(GenSrvFrom, Res) ->
    case is_pid(GenSrvFrom) of
        true ->
            GenSrvFrom ! {fetch_result, self(), Res};
        false ->
            gen_server:reply(GenSrvFrom, Res)
    end.
mysql_conn查询结束后,通过send_reply把结果返回给From:如果From是一个pid,就直接发送{fetch_result, self(), Res}消息;否则把From当作gen_server的调用方,用gen_server:reply回复。只有收到结果后,阻塞在调用上的业务进程才会结束阻塞。
%% Starts an unlinked mysql_conn process. It spawns init/10 with the caller
%% as Parent and waits for the startup report via post_start/1.
%% start/8 defaults FoundRows to false.
%% Returns post_start/1's result; on success this is {ok, Pid}.
start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
    start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId,
          false).

start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId,
      FoundRows) ->
    Parent = self(),
    Pid = spawn(fun () ->
                        init(Host, Port, User, Password, Database,
                             LogFun, Encoding, PoolId, Parent, FoundRows)
                end),
    case post_start(Pid) of
        {ok, _} = Ok ->
            Ok;
        Error ->
            %% If post_start/1 gave up while init/10 is still running (e.g. on
            %% a timeout, if it has one; NOTE(review): confirm), that process
            %% could still finish connecting and then hold a MySQL connection
            %% nobody references. Kill it; this is a no-op if it has already
            %% exited.
            exit(Pid, kill),
            Error
    end.
%% Like start/8,9, but the mysql_conn process is linked to the caller.
%% start_link/8 defaults FoundRows to false.
%% Returns post_start/1's result; on success this is {ok, Pid}.
start_link(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
    start_link(Host, Port, User, Password, Database, LogFun, Encoding,
               PoolId, false).

start_link(Host, Port, User, Password, Database, LogFun, Encoding, PoolId,
           FoundRows) ->
    Parent = self(),
    Pid = spawn_link(fun () ->
                             init(Host, Port, User, Password, Database,
                                  LogFun, Encoding, PoolId, Parent, FoundRows)
                     end),
    case post_start(Pid) of
        {ok, _} = Ok ->
            Ok;
        Error ->
            %% If post_start/1 gave up while init/10 is still running, that
            %% process could still connect later and leak a MySQL connection.
            %% Kill it. Unlink first so its 'killed' exit signal does not also
            %% terminate the caller. Both calls are harmless if it already
            %% exited.
            unlink(Pid),
            exit(Pid, kill),
            Error
    end.
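mysql_conn提供两种启动mysql_conn进程的方式:start和start_link。它们并不是通过调用gen_server的接口,而是直接通过spawn和spawn_link启动进程,然后通过post_start等待新进程上报初始化结果,成功时返回{ok, Pid}。注意:如果post_start在等待中放弃(例如超时),而新进程仍在执行init,那么这个进程之后仍可能建立起数据库连接却无人引用。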
%% Body of a mysql_conn process (spawned by start/9 or start_link/9).
%% Steps:
%%   1. mysql_recv:start_link/4 opens the TCP connection (RecvPid, Sock).
%%   2. mysql_init/6 performs the login.
%%   3. "use `Database`" selects the database.
%%   4. If Encoding =/= undefined, "set names '<Encoding>'" is sent. This
%%      happens after ok has been reported, and its result is ignored.
%% The outcome is reported to Parent as {mysql_conn, self(), ok} or
%% {mysql_conn, self(), {error, Reason}}. On success the process enters loop/1.
init(Host, Port, User, Password, Database, LogFun, Encoding, PoolId, Parent,
     FoundRows) ->
    case mysql_recv:start_link(Host, Port, LogFun, self()) of
        {ok, RecvPid, Sock} ->
            %% On each failure below, this process returns (exits normally)
            %% right after notifying Parent. A normal exit does not terminate
            %% the linked mysql_recv process, which would keep Sock (and the
            %% server-side MySQL session) open with nobody using it. Stop it
            %% explicitly. Unlink first so its 'killed' exit signal cannot
            %% terminate this process before Parent has been notified.
            StopRecv = fun() ->
                               unlink(RecvPid),
                               exit(RecvPid, kill)
                       end,
            case mysql_init(Sock, RecvPid, User, Password, LogFun, FoundRows) of
                {ok, Version} ->
                    Db = iolist_to_binary(Database),
                    case do_query(Sock, RecvPid, LogFun,
                                  <<"use `", Db/binary, "`">>,
                                  Version) of
                        {error, MySQLRes} ->
                            ?Log2(LogFun, error,
                                  "mysql_conn: Failed changing to database "
                                  "~p : ~p",
                                  [Database,
                                   mysql:get_result_reason(MySQLRes)]),
                            StopRecv(),
                            Parent ! {mysql_conn, self(),
                                      {error, failed_changing_database}};
                        %% ResultType: data | updated
                        {_ResultType, _MySQLRes} ->
                            Parent ! {mysql_conn, self(), ok},
                            case Encoding of
                                undefined -> undefined;
                                _ ->
                                    EncodingBinary = list_to_binary(atom_to_list(Encoding)),
                                    do_query(Sock, RecvPid, LogFun,
                                             <<"set names '", EncodingBinary/binary, "'">>,
                                             Version)
                            end,
                            State = #state{mysql_version = Version,
                                           recv_pid = RecvPid,
                                           socket = Sock,
                                           log_fun = LogFun,
                                           pool_id = PoolId,
                                           data = <<>>
                                          },
                            loop(State)
                    end;
                {error, _Reason} ->
                    StopRecv(),
                    Parent ! {mysql_conn, self(), {error, login_failed}}
            end;
        E ->
            ?Log2(LogFun, error,
                  "failed connecting to ~p:~p : ~p",
                  [Host, Port, E]),
            Parent ! {mysql_conn, self(), {error, connect_failed}}
    end.
%% Spawns a mysql_recv process linked to the caller. It opens the TCP
%% connection in init/4; this function waits up to ?CONNECT_TIMEOUT ms for
%% the {mysql_recv, RecvPid, init, _} report.
%% The report is sent to Parent, so this assumes Parent is the calling process
%% (mysql_conn:init/10 passes self()).
%% Returns {ok, RecvPid, Socket}, {error, E}, or {error, "timeout"}.
start_link(Host, Port, LogFun, Parent) when is_list(Host), is_integer(Port) ->
    RecvPid =
        spawn_link(fun () ->
                           init(Host, Port, LogFun, Parent)
                   end),
    %% wait for the socket from the spawned pid
    receive
        {mysql_recv, RecvPid, init, {error, E}} ->
            {error, E};
        {mysql_recv, RecvPid, init, {ok, Socket}} ->
            {ok, RecvPid, Socket}
    after ?CONNECT_TIMEOUT ->
            %% RecvPid is linked to us. Without unlinking, its 'killed' exit
            %% signal would terminate this process (unless it traps exits)
            %% instead of letting it return the timeout error below.
            unlink(RecvPid),
            exit(RecvPid, kill),
            %% Discard a report that may have raced with the timeout.
            receive
                {mysql_recv, RecvPid, init, _} -> ok
            after 0 ->
                    ok
            end,
            {error, "timeout"}
    end.
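mysql_recv:start_link/4使用spawn_link启动mysql_recv进程(与调用它的mysql_conn进程相链接),并等待其上报TCP连接结果:成功返回{ok, RecvPid, Socket},失败返回{error, E},超过?CONNECT_TIMEOUT则杀掉mysql_recv进程并返回{error, "timeout"}。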
%% Body of the mysql_recv process. It opens a TCP connection to Host:Port
%% with options binary, {packet, 0} and {keepalive, true}.
%%  - On success it reports {mysql_recv, self(), init, {ok, Sock}} to Parent
%%    and enters loop/1.
%%  - On failure it logs through LogFun and reports
%%    {mysql_recv, self(), init, {error, Msg}} with a flat string Msg; the
%%    process then ends.
init(Host, Port, LogFun, Parent) ->
    TcpOpts = [binary, {packet, 0}, {keepalive, true}],
    case gen_tcp:connect(Host, Port, TcpOpts) of
        {ok, Sock} ->
            Parent ! {mysql_recv, self(), init, {ok, Sock}},
            loop(#state{socket = Sock,
                        parent = Parent,
                        log_fun = LogFun,
                        data = <<>>
                       });
        Failure ->
            Describe = fun() ->
                               {"mysql_recv: Failed connecting to ~p:~p : ~p",
                                [Host, Port, Failure]}
                       end,
            LogFun(?MODULE, ?LINE, error, Describe),
            Text = lists:flatten(io_lib:format("connect failed : ~p", [Failure])),
            Parent ! {mysql_recv, self(), init, {error, Text}}
    end.