erlang_mysql_driver 源码分析3

邹弘
2023-12-01

mysql_conn 与 mysql_dispatcher

前面我们讲了,mysql_conn与mysql_dispatcher的关系,mysql_dispatcher维护多个连接池,每个连接池里有多个mysl_conn进程。我们查询sql时,由mysql_dispathcer选择合适的mysql_conn执行sql。
接下来,关心的是mysql_conn是如何执行sql的?

mysql_conn 与 mysql 数据库的通信

mysql_conn 与 mysql数据库交互,必然会涉及到通信协议。mysql通信协议使用的是tcp,不过其实mysql_conn负责的只是发送数据,mysql_recv负责接收数据,并把接收到的数据转发给mysql_conn。
mysql_conn发送数据使用的是gen_tcp:send(Socket, Data)
mysql_recv接收数据使用的是非阻塞的loop循环,一开始gen_tcp:connect到mysql的时候,默认使用的参数是{active, true}。所以从mysql发来的消息,都会放入mysql_recv进程的消息队列中。


mysql通信协议

探究mysql_conn与mysql数据库的交互过程,必须先说下mysql自身的通信协议。
mysql通信协议使用的是tcp,大体可以分为两种情况
1. 初始化过程

    1. Client -> Server 发起tcp连接
    1. Server -> Client 握手初始化包
    1. Client -> Server 认证包
    1. Server -> Client 认证结果

2、执行sql过程

    1. Client -> Server 命令包
    1. Server -> Client ok、error、结果包

mysql_conn 进程

mysql_conn的启动

mysql_conn进程是在mysql添加connect的时候启动的,或者mysql_dispatcher启动时启动的。
通过mysql_conn:start或start_link启动,mysql_conn进程。但是需要注意的是,mysql_conn并不是一个gen_server进程,其调用 spawn 或 spawn_link启动。

start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
    ConnPid = self(),
    Pid = spawn(fun () ->
            init(Host, Port, User, Password, Database,
                 LogFun, Encoding, PoolId, ConnPid)
        end),
    post_start(Pid, LogFun).

我们可以看start的代码,这是一个有意思的设计,当我们调用mysql_conn:start的时候。调用者进程会spawn出一个mysql_conn进程(这里我们使用mysql_conn为进程名,是因为该进程的主要逻辑都是在mysql_conn模块里的)。
然后便会阻塞在receive中,这时候的调用者进程其实是一直在等待spawn出来的mysql_conn进程返回消息,从而确定启动无误。
这里我们可能会有个疑问,spawn成功后,为什么还要多此一举让spawn出来的进程再发条消息过来呢?
因为,spawn进程成功后,并不能代表该进程可以顺利和mysql通信,如果这个进程一开始创建了就没法通信,那么这个进程也就不能说创建成功了。
对此,我们可以看下,mysql_conn:init

init(Host, Port, User, Password, Database, LogFun, Encoding, PoolId, Parent) ->
    case mysql_recv:start_link(Host, Port, LogFun, self()) of
    {ok, RecvPid, Sock} ->
        case mysql_init(Sock, RecvPid, User, Password, LogFun) of
        {ok, Version} ->
            Db = iolist_to_binary(Database),
            case do_query(Sock, RecvPid, LogFun,
                  <<"use `", Db/binary, "`">>,
                  Version) of
            {error, MySQLRes} ->
                ?Log2(LogFun, error,
                 "mysql_conn: Failed changing to database "
                 "~p : ~p",
                 [Database,
                  mysql:get_result_reason(MySQLRes)]),
                Parent ! {mysql_conn, self(),
                      {error, failed_changing_database}};

            %% ResultType: data | updated
            {_ResultType, _MySQLRes} ->
                Parent ! {mysql_conn, self(), ok},
                case Encoding of
                undefined -> undefined;
                _ ->
                    EncodingBinary = list_to_binary(atom_to_list(Encoding)),
                    do_query(Sock, RecvPid, LogFun,
                         <<"set names '", EncodingBinary/binary, "'">>,
                         Version)
                end,
                State = #state{mysql_version=Version,
                       recv_pid = RecvPid,
                       socket   = Sock,
                       log_fun  = LogFun,
                       pool_id  = PoolId,
                       data     = <<>>
                      },
                loop(State)
            end;
        {error, _Reason} ->
            Parent ! {mysql_conn, self(), {error, login_failed}}
        end;
    E ->
        ?Log2(LogFun, error,
         "failed connecting to ~p:~p : ~p",
         [Host, Port, E]),
        Parent ! {mysql_conn, self(), {error, connect_failed}}
    end.

看代码我们可以发现,mysql_recv出现了。没错,mysql_recv马上去和mysql建立tcp连接,并返回socket。注意这时候,mysql_conn可以知道了socket,但是socket的控制进程始终还是mysql_recv,所以从mysql服务端发来的消息还是由mysql_recv进程收到的。

先不扯mysql_recv。我们再看下mysql_conn:init,在mysql_recv成功建立tcp连接的基础上,我们的mysql_conn要发起认证请求啦!这个过程可以看上文说到的mysql通信协议。
看下mysql_init的代码

mysql_init(Sock, RecvPid, User, Password, LogFun) ->
    case do_recv(LogFun, RecvPid, undefined) of
    {ok, <<255:8, Rest/binary>>, _InitSeqNum} ->
        {Code, ErrData} = get_error_data(Rest, ?MYSQL_4_0),
        ?Log2(LogFun, error, "init error ~p: ~p",
          [Code, ErrData]),
        {error, ErrData};
    {ok, Packet, InitSeqNum} ->
        {Version, Salt1, Salt2, Caps} = greeting(Packet, LogFun),
        AuthRes =
        case Caps band ?SECURE_CONNECTION of
            ?SECURE_CONNECTION ->
            mysql_auth:do_new_auth(
              Sock, RecvPid, InitSeqNum + 1,
              User, Password, Salt1, Salt2, LogFun);
            _ ->
            mysql_auth:do_old_auth(
              Sock, RecvPid, InitSeqNum + 1, User, Password,
              Salt1, LogFun)
        end,
        case AuthRes of
        {ok, <<0:8, _Rest/binary>>, _RecvNum} ->
            {ok,Version};
        {ok, <<255:8, Rest/binary>>, _RecvNum} ->
            {Code, ErrData} = get_error_data(Rest, Version),
            ?Log2(LogFun, error, "init error ~p: ~p",
             [Code, ErrData]),
            {error, ErrData};
        {ok, RecvPacket, _RecvNum} ->
            ?Log2(LogFun, error,
              "init unknown error ~p",
              [binary_to_list(RecvPacket)]),
            {error, binary_to_list(RecvPacket)};
        {error, Reason} ->
            ?Log2(LogFun, error,
              "init failed receiving data : ~p", [Reason]),
            {error, Reason}
        end;
    {error, Reason} ->
        {error, Reason}
    end.

mysql_recv建立连接后,一收到消息就会发消息给mysql_conn。在刚开始发起连接成功后,mysql_conn就一直在等待mysql_recv发消息过来,这里的等待是在do_recv中进行的,这是一个阻塞的等待。实际每一次的具体操作,mysql_conn都会阻塞等待相应的返回。

这个时候,mysql_conn就会收到mysql服务端发过来的握手初始化包。这时候问题又来了,这个初始化包是什么东西。其实里面的东西是mysql规定的,大概说明了mysql的版本,支持什么功能,密码要怎么加密等等。

然后,mysql_conn就赶紧按这个包说明的情况去发起认证,这就是mysql_auth的作用了。
mysql_auth就按上述说的,可以do_old_auth或者do_new_auth。这个过程就涉及到crypto sha加密过程。
这个mysql_auth:do_auth的过程也是阻塞的,mysql_conn需要知道成功与否才能进行下一步。
如果这个时候,我们顺利收到mysql服务端发来的验证成功包,那么mysql_init的过程就顺利完成了。


mysql_conn loop

mysql_conn 初始化成功后,就一直在loop中等待各种业务操作的请求了。

mysql_recv loop

 类似资料: