erlang_mysql_driver源码阅读笔记

陆翰学
2023-12-01

erlang_mysql_driver源码阅读笔记

最近项目里发现了一个比较严重的bug,线上服务器游戏服创建了大量的mysql连接没有释放,轻松达到了设定的连接数上限,因为使用了第三方的驱动erlang_mysql_driver,排查的过程中就研究了下驱动的源码。

结构

代码比较简单,四个文件组成:
mysql.erl:唯一一个behavior为gen_server的模块,主要封装了一些外部调用的api(比如执行sql语句)以及管理连接池。
mysql_conn.erl:负责处理一条与mysql数据库的连接,可单独使用,也可通过mysql模块管理多条连接,甚至可以连接到不同的服务器。sql语句的执行和返回结果也是由这个模块处理的。
mysql_recv.erl:负责实际与mysql的tcp连接,将结果返回给父进程(mysql_conn).
mysql_auth.erl:提供给mysql_conn模块调用的接口,认证相关,可以忽略。

mysql_dispatcher进程启动与初始化

启动

mysql模块提供两种启动方式:start_link和start,实际上都是由start1函数处理,区别就是start1最终调用的是gen_server:start_link还是gen_server:start(前者可以启动在supervisor监督树下,后者则是启动独立的进程)。
要注意的是,mysql启动的进程别名不是mysql而是mysql_dispatcher.

初始化

%% @doc gen_server init callback of the dispatcher.
%%
%% Tries to start the first connection handler for PoolId with
%% mysql_conn:start/8.
%%   * On success the connection is wrapped with new_conn/9 (reconnect
%%     enabled) and registered in the state through add_conn/2.
%%   * On failure the server still starts, with no connection registered,
%%     and start_reconnect/2 is asked to keep retrying in the background.
%%
%% LogFun may be `undefined'; in that case the module's own log/4 is used.
init([PoolId, Host, Port, User, Password, Database, LogFun, Encoding]) ->
    LogFun1 = if LogFun == undefined -> fun log/4; true -> LogFun end,
    case mysql_conn:start(Host, Port, User, Password, Database, LogFun1,
                          Encoding, PoolId) of
        {ok, ConnPid} ->
            Conn = new_conn(PoolId, ConnPid, true, Host, Port, User, Password,
                            Database, Encoding),
            State = #state{log_fun = LogFun1},
            {ok, add_conn(Conn, State)};
        {error, _Reason} ->
            %% The server does not stop here, so the message must not claim
            %% that it is exiting.
            ?Log(LogFun1, error,
                 "failed starting first MySQL connection handler, "
                 "scheduling reconnect"),
            C = #conn{pool_id = PoolId,
                      reconnect = true,
                      host = Host,
                      port = Port,
                      user = User,
                      password = Password,
                      database = Database,
                      encoding = Encoding},
            %% Hand over the resolved LogFun1, not the raw LogFun argument:
            %% LogFun may be `undefined', which is not callable by the
            %% logging done in the reconnect path.
            start_reconnect(C, LogFun1),
            {ok, #state{log_fun = LogFun1}}
    end.

init函数主要做了两件事:
一是启动一个mysql_conn进程,如果正常启动就继续,如果启动失败则调用mysql:start_reconnect/2进行重连。
二是如果启动成功,调用mysql:add_conn/2将启动的连接进程信息(比如pid,poolId,端口等)保存到#state{}中。

%% @doc Registers a connection in the dispatcher state.
%%
%% The connection process is monitored, the connection is pushed onto the
%% front of the unused list of its pool (a new pool is created when the pool
%% id is not known yet), and the pid -> pool id index is updated.
%% Returns the updated state.
add_conn(Conn, State) ->
    ConnPid = Conn#conn.pid,
    erlang:monitor(process, ConnPid),
    PoolId = Conn#conn.pool_id,
    Pools = State#state.conn_pools,
    %% A pool is a pair {Unused, Used}; an unknown pool starts out empty.
    {Idle, Busy} =
        case gb_trees:lookup(PoolId, Pools) of
            none -> {[], []};
            {value, {_, _} = Lists} -> Lists
        end,
    UpdatedPools = gb_trees:enter(PoolId, {[Conn | Idle], Busy}, Pools),
    UpdatedPids = gb_trees:enter(ConnPid, PoolId, State#state.pids_pools),
    State#state{conn_pools = UpdatedPools, pids_pools = UpdatedPids}.

这个add_conn函数值得仔细关注下:

  1. 当调用add_conn时,mysql_dispatcher首先会调用erlang:monitor建立一个进程的单向监控,目的是当mysql_conn异常挂掉时监控进程mysql_dispatcher收到来自于子进程的‘DOWN’消息.
  2. mysql_dispatcher进程维护两个数据结构——ConnPools连接池和PidPools进程池。使用gb_trees二叉平衡树存储key-value类型的数据结构:
    a. ConnPools:key为PoolId::term(连接池id),value为{Unused::Lists, Used::Lists}元组,其中Unused为未使用的连接列表,Used为使用中的连接列表。
    b. PidPools:key为Pid::pid(进程id),value为PoolId::term(连接池id)

mysql_dispatcher的重连机制

当mysql:init/1中调用mysql_conn:start失败时,会直接调用mysql:start_reconnect/2进行重连;当收到‘DOWN’消息且#conn.reconnect字段为true时,则首先将旧的连接通过mysql:remove_conn/2移除,然后再调用mysql:start_reconnect/2函数进行重连。

%% @doc Starts a background process that keeps trying to re-establish Conn.
%%
%% The spawned process traps exits and runs reconnect_loop/3 on a copy of
%% Conn whose pid field is reset to `undefined'. The caller is not blocked;
%% the function logs the helper pid at debug level and returns `ok'.
start_reconnect(Conn, LogFun) ->
    Retry = fun () ->
                process_flag(trap_exit, true),
                reconnect_loop(Conn#conn{pid = undefined}, LogFun, 0)
            end,
    ReconnectPid = spawn(Retry),
    PoolId = Conn#conn.pool_id,
    Host = Conn#conn.host,
    Port = Conn#conn.port,
    OldPid = Conn#conn.pid,
    ?Log2(LogFun, debug,
	"started pid ~p to try and reconnect to ~p:~s:~p (replacing "
	"connection with pid ~p)",
	[ReconnectPid, PoolId, Host, Port, OldPid]),
    ok.

%% @doc Retries connect/8 for Conn until it succeeds.
%%
%% N counts failed attempts since the last "still unable to connect" log
%% line: when N is 1 the failure is logged and the counter restarts at 0,
%% otherwise the counter is incremented. The loop sleeps 5 seconds after
%% every failed attempt and returns `ok' once a connection is established.
reconnect_loop(Conn, LogFun, N) ->
    PoolId = Conn#conn.pool_id,
    Host = Conn#conn.host,
    Port = Conn#conn.port,
    Attempt = connect(PoolId, Host, Port,
                      Conn#conn.user,
                      Conn#conn.password,
                      Conn#conn.database,
                      Conn#conn.encoding,
                      Conn#conn.reconnect),
    case Attempt of
        {ok, ConnPid} ->
            ?Log2(LogFun, debug,
		"managed to reconnect to ~p:~s:~p "
		"(connection pid ~p)", [PoolId, Host, Port, ConnPid]),
            ok;
        {error, Reason} ->
            %% Only some of the failures are logged, to keep the log quiet.
            NextN =
                if
                    N =:= 1 ->
                        ?Log2(LogFun, debug,
                   "reconnect: still unable to connect to "
                   "~p:~s:~p (~p)", [PoolId, Host, Port, Reason]),
                        0;
                    true ->
                        N + 1
                end,
            %% Back off before the next attempt.
            timer:sleep(5000),
            reconnect_loop(Conn, LogFun, NextN)
    end.

这里mysql_dispatcher并没有自己阻塞住去执行mysql:reconnect_loop/3循环,而是spawn一个临时进程去执行这个循环,并通过process_flag(trap_exit,true)捕获退出信号,避免这个临时进程被与它链接的进程的异常退出连带终止。
mysql:reconnect_loop/3函数调用mysql:connect/8建立新的连接,并打印重连日志。

启动mysql_conn进程

%% @doc Starts a new mysql_conn process and registers it with the dispatcher.
%%
%% Port defaults to ?PORT when it is `undefined'. LinkConnection selects
%% mysql_conn:start_link/8 (when `true') or mysql_conn:start/8 (otherwise).
%% The log fun is fetched from the dispatcher (?SERVER) and, once the
%% connection process is up, the connection is added with an `add_conn' call.
%%
%% Returns {ok, ConnPid} on success; otherwise the non-ok reply of the
%% `add_conn' call or the failed start result is returned unchanged.
connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect,
       LinkConnection) ->
    ActualPort =
        case Port of
            undefined -> ?PORT;
            _ -> Port
        end,
    StartFun =
        case LinkConnection of
            true -> fun mysql_conn:start_link/8;
            _ -> fun mysql_conn:start/8
        end,
    {ok, LogFun} = gen_server:call(?SERVER, get_logfun),
    Started = StartFun(Host, ActualPort, User, Password, Database, LogFun,
                       Encoding, PoolId),
    case Started of
        {ok, ConnPid} ->
            Conn = new_conn(PoolId, ConnPid, Reconnect, Host, ActualPort, User,
                            Password, Database, Encoding),
            case gen_server:call(?SERVER, {add_conn, Conn}) of
                ok -> {ok, ConnPid};
                Other -> Other
            end;
        StartError ->
            StartError
    end.

mysql:connect(上面贴出的是带LinkConnection参数的9参数版本)将会启动一个mysql_conn进程负责与mysql数据库连接,需要传递给mysql_conn对应的PoolId,启动成功返回{ok, ConnPid},其中ConnPid为mysql_conn的进程Pid。
注意,mysql_dispatcher进程并不是mysql_conn的父进程与监督者,当LinkConnection参数为true(即通过mysql_conn:start_link启动)时,调用mysql:connect的进程才会与mysql_conn进程建立链接。mysql_dispatcher进程只是负责管理连接池并进行调度。

mysql:fetch

%% @doc Forwards QueryList to the next connection of pool PoolId.
%%
%% The query is handed to the chosen connection process together with From,
%% and `{noreply, State}' is returned, so the dispatcher itself does not
%% reply to the caller here. When the pool has no connection the error reply
%% produced by with_next_conn/3 is returned instead.
fetch_queries(PoolId, From, State, QueryList) ->
    Forward =
        fun(Conn, NextState) ->
            mysql_conn:fetch(Conn#conn.pid, QueryList, From),
            {noreply, NextState}
        end,
    with_next_conn(PoolId, State, Forward).

%% @doc Picks the next connection of PoolId and applies Fun to it.
%%
%% Fun is called as Fun(Conn, NewState), where NewState already reflects the
%% pool rotation done by get_next_conn/2. When no connection is available a
%% `{reply, {error, {no_connection_in_pool, PoolId}}, State}' tuple is
%% returned with the state untouched.
with_next_conn(PoolId, State, Fun) ->
    case get_next_conn(PoolId, State) of
        error ->
            %% no active connection matches PoolId
            {reply, {error, {no_connection_in_pool, PoolId}}, State};
        {ok, Conn, NewState} ->
            Fun(Conn, NewState)
    end.
%% @doc Selects the next connection of pool PoolId.
%%
%% A pool is a pair {Unused, Used}. The head of Unused is taken and moved to
%% the front of Used. When Unused is empty, Used is reversed and becomes the
%% new Unused list, and its first element is the one selected.
%%
%% Returns {ok, Conn, NewState}, or `error' when the pool does not exist or
%% holds no connection at all.
get_next_conn(PoolId, State) ->
    Pools = State#state.conn_pools,
    Selection =
        case gb_trees:lookup(PoolId, Pools) of
            none ->
                error;
            {value, {[], []}} ->
                error;
            {value, {[], Used}} ->
                %% Every connection has been handed out once: start over
                %% with the one that was used first.
                [Oldest | Rest] = lists:reverse(Used),
                {Oldest, {Rest, [Oldest]}};
            {value, {[Next | Unused], Used}} ->
                {Next, {Unused, [Next | Used]}}
        end,
    case Selection of
        error ->
            error;
        {Conn, NewPool} ->
            NewPools = gb_trees:enter(PoolId, NewPool, Pools),
            {ok, Conn, State#state{conn_pools = NewPools}}
    end.

mysql_dispatcher负责将业务进程的fetch操作调度给不同的mysql_conn连接。
当有一个新的fetch请求时,通过get_next_conn获取下一个可用的mysql_conn连接进程,调度算法是:

  1. ConnPool维护两个列表,一个unused连接池,一个used连接池。
  2. 当unused连接池不为空时,取出第一个连接放到used连接池中。
  3. 当unused连接池为空时,翻转used连接池作为新的unused连接池并清空used连接池,最后再执行第2步。
    值得注意的是,mysql_dispatcher只是通过调度算法将消息转发到mysql_conn进程(其中From是一个{Pid, Tag}元组,用来标识发送进程),并返回{noreply, NewState},业务进程仍处在阻塞状态中。
%% @doc Delivers a query result to whoever asked for it.
%%
%% When GenSrvFrom is a plain pid, the result is sent to it as a
%% {fetch_result, self(), Res} message. Any other value is passed to
%% gen_server:reply/2 as the `From' of a pending call.
send_reply(GenSrvFrom, Res) ->
    case is_pid(GenSrvFrom) of
        true ->
            GenSrvFrom ! {fetch_result, self(), Res};
        false ->
            gen_server:reply(GenSrvFrom, Res)
    end.

只有mysql_conn查询结束后,通过gen_server:reply将结果返回From的业务进程,业务进程才会结束阻塞。

mysql_conn进程启动与初始化

启动

%% @doc Starts an unlinked connection process.
%% Same as start/9 with FoundRows set to `false'.
start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
    FoundRows = false,
    start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId,
          FoundRows).

%% @doc Starts an unlinked connection process.
%%
%% The process is created with spawn/1 and runs init/10, which receives the
%% pid of the calling process as its parent. The result of post_start/1 for
%% the new pid is returned.
start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId,
      FoundRows) ->
    Caller = self(),
    Init = fun () ->
               init(Host, Port, User, Password, Database,
                    LogFun, Encoding, PoolId, Caller, FoundRows)
           end,
    post_start(spawn(Init)).

%% @doc Starts a connection process linked to the caller.
%% Same as start_link/9 with FoundRows set to `false'.
start_link(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
    FoundRows = false,
    start_link(Host, Port, User, Password, Database, LogFun, Encoding,
               PoolId, FoundRows).

%% @doc Starts a connection process linked to the caller.
%%
%% The process is created with spawn_link/1 and runs init/10, which receives
%% the pid of the calling process as its parent. The result of post_start/1
%% for the new pid is returned.
start_link(Host, Port, User, Password, Database, LogFun, Encoding, PoolId,
           FoundRows) ->
    Caller = self(),
    Init = fun () ->
               init(Host, Port, User, Password, Database,
                    LogFun, Encoding, PoolId, Caller, FoundRows)
           end,
    post_start(spawn_link(Init)).

mysql_conn提供两种启动mysql_conn进程的方式:start和start_link,并不是通过调用gen_server的接口,而是直接通过spawn和spawn_link启动,返回启动后进程的PId。

初始化

%% @doc Body of a connection process.
%%
%% Steps:
%%   1. start the linked receiver with mysql_recv:start_link/4;
%%   2. authenticate with mysql_init/6;
%%   3. select Database with a "use" query;
%%   4. report the outcome to Parent as {mysql_conn, self(), Result};
%%   5. on success optionally issue "set names" for Encoding and enter
%%      loop/1.
%%
%% Result is `ok', {error, failed_changing_database},
%% {error, login_failed} or {error, connect_failed}.
%%
%% When step 2 or 3 fails the receiver process is already running and owns
%% the TCP socket. This process then terminates normally, and a normal exit
%% is not propagated over a link, so the receiver is shut down explicitly to
%% avoid leaving the server connection open.
init(Host, Port, User, Password, Database, LogFun, Encoding, PoolId, Parent,
     FoundRows) ->
    case mysql_recv:start_link(Host, Port, LogFun, self()) of
        {ok, RecvPid, Sock} ->
            case mysql_init(Sock, RecvPid, User, Password, LogFun, FoundRows) of
                {ok, Version} ->
                    Db = iolist_to_binary(Database),
                    case do_query(Sock, RecvPid, LogFun,
                                  <<"use `", Db/binary, "`">>,
                                  Version) of
                        {error, MySQLRes} ->
                            ?Log2(LogFun, error,
				 "mysql_conn: Failed changing to database "
				 "~p : ~p",
                                  [Database,
                                   mysql:get_result_reason(MySQLRes)]),
                            Parent ! {mysql_conn, self(),
                                      {error, failed_changing_database}},
                            stop_recv_after_failed_init(RecvPid);

                        %% ResultType: data | updated
                        {_ResultType, _MySQLRes} ->
                            Parent ! {mysql_conn, self(), ok},
                            %% The result of "set names" is not checked.
                            case Encoding of
                                undefined -> undefined;
                                _ ->
                                    EncodingBinary = list_to_binary(atom_to_list(Encoding)),
                                    do_query(Sock, RecvPid, LogFun,
                                             <<"set names '", EncodingBinary/binary, "'">>,
                                             Version)
                            end,
                            State = #state{mysql_version=Version,
                                           recv_pid = RecvPid,
                                           socket   = Sock,
                                           log_fun  = LogFun,
                                           pool_id  = PoolId,
                                           data     = <<>>
                                          },
                            loop(State)
                    end;
                {error, _Reason} ->
                    Parent ! {mysql_conn, self(), {error, login_failed}},
                    stop_recv_after_failed_init(RecvPid)
            end;
        E ->
            %% No receiver is alive on this path, so there is nothing to
            %% clean up.
            ?Log2(LogFun, error,
		 "failed connecting to ~p:~p : ~p",
                  [Host, Port, E]),
            Parent ! {mysql_conn, self(), {error, connect_failed}}
    end.

%% Terminates the receiver process of a connection whose setup failed.
%% The link is removed first so that the kill is not propagated back to this
%% process (and from there to whoever is linked to it).
%% NOTE(review): assumes mysql_recv does not stop by itself when this
%% process exits normally - confirm against mysql_recv:loop.
stop_recv_after_failed_init(RecvPid) ->
    unlink(RecvPid),
    exit(RecvPid, kill),
    ok.

mysql_recv进程的启动与初始化

启动

%% @doc Spawns the linked receiver process and waits for its socket.
%%
%% The receiver runs init/4 and reports with a
%% {mysql_recv, RecvPid, init, Result} message.
%%
%% Returns {ok, RecvPid, Socket} on success, {error, Reason} when the
%% receiver reports a failure, and {error, "timeout"} when nothing arrives
%% within ?CONNECT_TIMEOUT.
start_link(Host, Port, LogFun, Parent) when is_list(Host), is_integer(Port) ->
    RecvPid =
        spawn_link(fun () ->
                           init(Host, Port, LogFun, Parent)
                   end),
    %% wait for the socket from the spawned pid
    receive
        {mysql_recv, RecvPid, init, {error, E}} ->
            {error, E};
        {mysql_recv, RecvPid, init, {ok, Socket}} ->
            {ok, RecvPid, Socket}
    after ?CONNECT_TIMEOUT ->
            %% Remove the link before killing the receiver. Otherwise the
            %% resulting `killed' exit signal comes back over the link and
            %% terminates the caller (unless it traps exits) before it can
            %% act on the timeout result.
            unlink(RecvPid),
            exit(RecvPid, kill),
            {error, "timeout"}
    end.

mysql_recv:start_link/4使用spawn_link启动mysql_conn的子进程mysql_recv,并等待其初始化消息:成功时返回{ok, RecvPid, Socket},失败时返回{error, Reason},超时(?CONNECT_TIMEOUT)则杀掉该进程并返回{error, "timeout"}。

初始化

%% @doc Body of the receiver process.
%%
%% Opens the TCP connection to Host:Port in binary mode with keepalive
%% enabled. On success the socket is reported to Parent as
%% {mysql_recv, self(), init, {ok, Sock}} and the process enters loop/1.
%% On failure the error is logged through LogFun and reported to Parent as
%% {mysql_recv, self(), init, {error, Msg}}, where Msg is a flat string.
init(Host, Port, LogFun, Parent) ->
    ConnectOpts = [binary, {packet, 0}, {keepalive, true}],
    case gen_tcp:connect(Host, Port, ConnectOpts) of
        {ok, Sock} ->
            Parent ! {mysql_recv, self(), init, {ok, Sock}},
            loop(#state{socket  = Sock,
                        parent  = Parent,
                        log_fun = LogFun,
                        data    = <<>>});
        ConnectError ->
            %% The log message is built lazily, inside the fun.
            Describe =
                fun() ->
                    {"mysql_recv: Failed connecting to ~p:~p : ~p",
                     [Host, Port, ConnectError]}
                end,
            LogFun(?MODULE, ?LINE, error, Describe),
            Text = io_lib:format("connect failed : ~p", [ConnectError]),
            Parent ! {mysql_recv, self(), init, {error, lists:flatten(Text)}}
    end.

 类似资料: