当前位置: 首页 > 工具软件 > ejabberd > 使用案例 >

Ejabberd源码解读-ejabberd_listener模块

拓拔骁
2023-12-01

ejabberd_listener模块启动分为两部分
1 启动端口监听
查看ejabberd_app.erl文件,启动过程语句

%%%启动一个supervisor,并启动和监控定义子进程
Sup = ejabberd_sup:start_link()

其中启动项包括ejabberd_listener:start_link/0

Listener =
    {ejabberd_listener,
     {ejabberd_listener, start_link, []},
     permanent,
     infinity,
     supervisor,
     [ejabberd_listener]},

以下为程序启动跟踪流程

start_link() ->
    supervisor:start_link({local, ejabberd_listeners}, ?MODULE, []).
init(_) ->
    %%%新建listen_sockets表
    ets:new(listen_sockets, [named_table, public]),
    %%%端口绑定
    bind_tcp_ports(),
    {ok, {{one_for_one, 10, 1}, []}}.
bind_tcp_ports() ->
    %%%获取配置文件信息
    case ejabberd_config:get_option(listen, fun validate_cfg/1) of
    undefined ->
        ignore;
    Ls ->
        lists:foreach(
          fun({Port, Module, Opts}) ->
              ModuleRaw = strip_frontend(Module),
              case ModuleRaw:socket_type() of
              independent -> ok;
              _ ->
                  bind_tcp_port(Port, Module, Opts)
              end
          end, Ls)
    end.

注:查看ejabberd_config:get_option/2执行结果,由此可知,配置文件需监听3端口详细配置
(可调用执行ejabberd_config:get_option(listen,fun ejabberd_listener:validate_cfg/1).)

[{{5222,{0,0,0,0},tcp},
  ejabberd_c2s,
  [{access,c2s},{shaper,c2s_shaper},{max_stanza_size,65536}]},
 {{5269,{0,0,0,0},tcp},ejabberd_s2s_in,[]},
 {{5280,{0,0,0,0},tcp},
  ejabberd_http,
  [{captcha,true},
   {http_bind,true},
   {web_admin,true},
   {request_handlers,[{<<"/websocket">>,ejabberd_http_ws}]}]}]

继续跟踪

bind_tcp_port(PortIP, Module, RawOpts) ->
    try check_listener_options(RawOpts) of
    ok ->
        {Port, IPT, IPS, IPV, Proto, OptsClean} = parse_listener_portip(PortIP, RawOpts),
        {_Opts, SockOpts} = prepare_opts(IPT, IPV, OptsClean),
        case Proto of
        udp -> ok;
        _ ->
            %%%TCP
            ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS),
            %%%监听端口成功,插入listen_sockets表
            ets:insert(listen_sockets, {PortIP, ListenSocket}),
                    ok
        end
    catch
    throw:{error, Error} ->
        ?ERROR_MSG(Error, [])
    end.
listen_tcp(PortIP, Module, SockOpts, Port, IPS) ->
    %%%判断当前端口是否存在listen_sockets表
    case ets:lookup(listen_sockets, PortIP) of
    [{PortIP, ListenSocket}] ->
        ?INFO_MSG("Reusing listening port for ~p", [PortIP]),
        ets:delete(listen_sockets, PortIP),
        ListenSocket;
    _ ->
        Res = gen_tcp:listen(Port, [binary,
                    {packet, 0},
                    {active, false},
                    {reuseaddr, true},
                    {nodelay, true},
                    {send_timeout, ?TCP_SEND_TIMEOUT},
                    {send_timeout_close, true},
                    {keepalive, true} |
                    SockOpts]),
        case Res of
        {ok, ListenSocket} ->
            ListenSocket;
        {error, Reason} ->
            socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
        end
    end.

由上述可知以上分别启动了三个端口监听服务:
1 ejabberd_c2s作为连接客户端的请求服务
2 ejabberd_s2s_in作为连接服务器的请求服务
3 ejabberd_http作为http的请求服务

2 开启连接接收
查看ejabberd_app.erl文件,启动过程语句

%%%启动配置文件中的监听器模块
ejabberd_listener:start_listeners()

以下为程序跟踪流程

start_listeners() ->
    case ejabberd_config:get_option(listen, fun validate_cfg/1) of
    undefined ->
        ignore;
    Ls ->
        Ls2 = lists:map(
            fun({Port, Module, Opts}) ->
                case start_listener(Port, Module, Opts) of
                {ok, _Pid} = R -> R;
                {error, Error} ->
                throw(Error)
            end
        end, Ls),
        report_duplicated_portips(Ls),
        {ok, {{one_for_one, 10, 1}, Ls2}}
    end.
%%%端口复用说明
report_duplicated_portips(L) ->
    LKeys = [Port || {Port, _, _} <- L],
    LNoDupsKeys = proplists:get_keys(L),
    case LKeys -- LNoDupsKeys of
    [] -> ok;
    Dups ->
        ?CRITICAL_MSG("In the ejabberd configuration there are duplicated "
              "Port number + IP address:~n  ~p",
              [Dups])
  end.
%%% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error}
start_listener(Port, Module, Opts) ->
    case start_listener2(Port, Module, Opts) of
    {ok, _Pid} = R -> R;
    {error, {{'EXIT', {undef, [{M, _F, _A}|_]}}, _} = Error} ->
        ?ERROR_MSG("Error starting the ejabberd listener: ~p.~n"
               "It could not be loaded or is not an ejabberd listener.~n"
               "Error: ~p~n", [Module, Error]),
        {error, {module_not_available, M}};
    {error, {already_started, Pid}} ->
        {ok, Pid};
    {error, Error} ->
        {error, Error}
%%% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error}
start_listener2(Port, Module, Opts) ->
    %%% It is only required to start the supervisor in some cases.
    %%% But it doesn't hurt to attempt to start it for any listener.
    %%% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}}
    maybe_start_sip(Module),
    start_module_sup(Port, Module),
    start_listener_sup(Port, Module, Opts).
start_module_sup(_Port, Module) ->
    Proc1 = gen_mod:get_module_proc(<<"sup">>, Module),
    ChildSpec1 =
    {Proc1,
     {ejabberd_tmp_sup, start_link, [Proc1, strip_frontend(Module)]},
     permanent,
     infinity,
     supervisor,
     [ejabberd_tmp_sup]},
    supervisor:start_child(ejabberd_sup, ChildSpec1).

注:测试gen_mod:get_module_proc(<<”sup”>>, Module)执行结果
(可调用执行gen_mod:get_module_proc(<<”sup”>>, ejabberd_c2s)=>ejabberd_c2s_sup)
由此可知start_module_sup会产生ejabberd_c2s_sup,ejabberd_s2s_sup,ejabberd_http_sup三个督程

start_listener_sup(Port, Module, Opts) ->
    ChildSpec = {Port,
         {?MODULE, start, [Port, Module, Opts]},
         transient,
         brutal_kill,
         worker,
         [?MODULE]},
    supervisor:start_child(ejabberd_listeners, ChildSpec).

注:由以上函数可知start_listener_sup会产生三个子进程,用于接受外部连接,跟踪函数继续执行,最后会调用accept/3接受连接

start(Port, Module, Opts) ->
    %%% Check if the module is an ejabberd listener or an independent listener
    ModuleRaw = strip_frontend(Module),
    case ModuleRaw:socket_type() of
    independent -> ModuleRaw:start_listener(Port, Opts);
    _ -> start_dependent(Port, Module, Opts)
    end.
%%% @spec(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage}
start_dependent(Port, Module, Opts) ->
    try check_listener_options(Opts) of
    ok ->
        proc_lib:start_link(?MODULE, init, [Port, Module, Opts])
    catch
    throw:{error, Error} ->
        ?ERROR_MSG(Error, []),
        {error, Error}
    end.
init(PortIP, Module, RawOpts) ->
    {Port, IPT, IPS, IPV, Proto, OptsClean} = parse_listener_portip(PortIP, RawOpts),
    {Opts, SockOpts} = prepare_opts(IPT, IPV, OptsClean),
    if Proto == udp ->
        init_udp(PortIP, Module, Opts, SockOpts, Port, IPS);
       true ->
        init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS)
    end.

init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
    ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS),
    %%% Inform my parent that this port was opened succesfully
    proc_lib:init_ack({ok, self()}),
    case erlang:function_exported(Module, tcp_init, 2) of
    false ->
        accept(ListenSocket, Module, Opts);
    true ->
        case catch Module:tcp_init(ListenSocket, Opts) of
        {'EXIT', _} = Err ->
            ?ERROR_MSG("failed to process callback function "
                   "~p:~s(~p, ~p): ~p",
                   [Module, tcp_init, ListenSocket, Opts, Err]),
            accept(ListenSocket, Module, Opts);
        NewOpts ->
            accept(ListenSocket, Module, NewOpts)
        end
    end.

接受外部连接时,accept函数,根据CallMod模块参数调用特定模块

CallMod = case is_frontend(Module) of
              true -> ejabberd_frontend_socket;
              false -> ejabberd_socket
              end,
        CallMod:start(strip_frontend(Module), gen_tcp, Socket, Opts)

例CallMod==ejabberd_socket则进入ejabberd_socket模块
ejabberd_socket:start(strip_frontend(Module), gen_tcp, Socket, Opts)

start(Module, SockMod, Socket, Opts) ->
    case Module:socket_type() of
      xml_stream ->
      MaxStanzaSize = case lists:keysearch(max_stanza_size, 1,
                           Opts)
                  of
                {value, {_, Size}} -> Size;
                _ -> infinity
              end,
      {ReceiverMod, Receiver, RecRef} = case catch
                           SockMod:custom_receiver(Socket)
                        of
                          {receiver, RecMod, RecPid} ->
                          {RecMod, RecPid, RecMod};
                          _ ->
                          %%%启动ejabberd_receiver进程
                          RecPid =
                              ejabberd_receiver:start(Socket,
                                          SockMod,
                                          none,
                                          MaxStanzaSize),
                          {ejabberd_receiver, RecPid,
                           RecPid}
                        end,
      SocketData = #socket_state{sockmod = SockMod,
                     socket = Socket, receiver = RecRef},
      %%%启动ejabberd_c2s进程/ejabberd_s2s进程/ejabberd_http进程
      case Module:start({?MODULE, SocketData}, Opts) of
        {ok, Pid} ->
        case SockMod:controlling_process(Socket, Receiver) of
          ok -> ok;
          {error, _Reason} -> SockMod:close(Socket)
        end,
        ReceiverMod:become_controller(Receiver, Pid);
        {error, _Reason} ->
        SockMod:close(Socket),
        case ReceiverMod of
          ejabberd_receiver -> ReceiverMod:close(Receiver);
          _ -> ok
        end
      end;
      independent -> ok;
      raw ->
      case Module:start({SockMod, Socket}, Opts) of
        {ok, Pid} ->
        case SockMod:controlling_process(Socket, Pid) of
          ok -> ok;
          {error, _Reason} -> SockMod:close(Socket)
        end;
        {error, _Reason} -> SockMod:close(Socket)
      end
    end.

如上所示ejabberd_socket:start/4会启动两个进程(每一个socket连接都会分别起一个c2s和receiver进程)
1 ejabberd_socket:start/4启动ejabberd_receiver模块
调用ejabberd_receiver:start/4:

start(Socket, SockMod, Shaper, MaxStanzaSize) ->
    {ok, Pid} = gen_server:start(ejabberd_receiver,
                 [Socket, SockMod, Shaper, MaxStanzaSize], []),
    Pid.

由此可知ejabberd_receiver启动的是一个gen_server服务,用于处理XML信息节并转发到逻辑处理模块

2 ejabberd_socket:start/4启动ejabberd_c2s模块(其余情况包含ejabberd_s2s ejabberd_http),以下以ejabberd_c2s为例进行跟踪
调用函数ejabberd_c2s:start/2

start(SockData, Opts) ->
    ?GEN_FSM:start(ejabberd_c2s,
           [SockData, Opts],
           fsm_limit_opts(Opts) ++ ?FSMOPTS).

由此可知ejabberd_c2s启动的是一个有限状态机gen_fsm,用于处理客户端与服务器间连接


ejabberd_socket:start/4实现Socket与Receiver绑定

gen_tcp:controlling_process(Socket, Receiver) 

(附:gen_tcp:controlling_process/2是绑定Socket与Pid,所有发送给Socket的消息都等同于发送给绑定的进程,可以通过编写Pid进程模块handle_info/2函数进行响应的处理)

发送绑定成功消息,通知receiver进程c2s进程pid,更新State(包含c2s_pid和xml_stream_state两项),调用handle_call/3如下

handle_call({become_controller, C2SPid}, _From, State) ->
    XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size),
    NewState = State#state{c2s_pid = C2SPid,
                           xml_stream_state = XMLStreamState},
    activate_socket(NewState),
    Reply = ok,
    {reply, Reply, NewState, ?HIBERNATE_TIMEOUT};

以上xml_stream以C2SPid进行初始化,后期receiver模块调用xml_stream:parse/2解析xml时根据c2s_pid向c2s进行发送解析内容

ejabberd_receiver:become_controller(Receiver,Pid)

ejabberd_receiver模块简单说明:
gen_tcp:controlling_process/2执行成功,会将socket消息全部转发到receiver模块,依靠hand_info/2处理消息,处理过程会解析xml消息内容,然后转发给c2s模块

补充-关于pro_lib
如何使用 proc_lib 中的函数创建进程?
用 proc_lib 模块中存在的若干函数来启动进程,例如异步启动的 spawn_link/3,4 以及同步启动的 start_link/3,4,5
使用这些函数中的任何一个启动的进程都会储存监督树所必须的信息。
当使用 proc_lib:start_link 以同步方式启动进程时,调用进程直到 proc_lib:init_ack 被调用后才返回,所以必须成对使用

 类似资料: