当前位置: 首页 > 工具软件 > Multi-OTP > 使用案例 >

第22章 OTP介绍

乐正玺
2023-12-01

1.简单服务器的架构

  • 对于该服务器主要使用的是第12章所用的知识,包括在rpc,loop()中实现对response的绑定,所用到的新知识点是回调函数的应用
    %server1.erl
    -module(server1).
    -export([start/2,rpc/2]).
    
    start(Name,Mod) ->
        register(Name,spawn(fun()->loop(Name,Mod,Mod:init()) end)).
        %register:注册进程,spawn:创建一个并发进程
    rpc(Name,Request) ->
        Name!{self(),Request},
        receive
            {Name,Response} ->Response
        end.
    loop(Name,Mod,State) ->
        receive
            {From,Request}->
                {Response,State1}=Mod:handle(Request,State),
                From!{Name,Response},
                loop(Name,Mod,State1)
        end.
    %定义回调模块:name_server.erl
    -module(name_server).
    -export([init/0,add/2,find/1,handle/2]).
    -import(server1,[rpc/2]).
    
    %客户端方法
    add(Name,Place) -> 
        rpc (name_server,{add,Name,Place}).
    find(Name) -> 
        rpc(name_server,{find,Name}).
    
    %回调方法
    init() ->dict:new().
    handle({add,Name,Place},Dict) ->
        {ok,dict:store(Name,Place,Dict)};
    handle({find,Name},Dict) ->
        {dict:find(Name,Dict),Dict}.
    
    
    %调用示例:
    1>server1:start(name_server,name_server).
    true
    2>name_server:add(joe,"at home").
    ok
    3>name_server:find(joe).
    {ok,"at home"}

     

  • 调用原理:
    • 1)当调用server1:start()方法时,会注册一个Pid为name_server的进程,并调用name_server:init()方法,会创建一个dict,同时loop进入receive模式.
    • 2)当调用name_server:add()方法时,会调用导入的server1模块中的rpc方法,此方法向Pid为Name(name_server)的进程发送一个客户端自身的Pid(由self()函数产生)和request参数.
    • 3)服务端收到该客户端Pid和request,使用loop函数对其解析,此时From绑定的是客户端的Pid,解析完后向该客户端Pid发送Name参数(name_server)和response.
    • 4)客户端接收到该Name和response参数,进行判别,注意rpc中的Name参数一直已经是确定的了,所以匹配成功,则输出response.

2.实现事务管理的服务器

     所谓事务,即指当服务端奔溃或是产生了错误时,要让客户端也崩溃

%serve2.erl
-module(server2).
-export([start/2,rpc/2]).

start(Name,Mod) ->
    register(Name,spawn(fun()->loop(Name,Mod,Mod:init()) end)).
rpc(Name,Request) ->
    Name!{self(),Request},
    receive
        {Name,crash} -> exit(rpc);
        {Name,ok,Response} ->Response
    end.
loop(Name,Mod,OldState) ->
    receive
        {From,Request}->
            try Mod:hadle(Request,OldState) of 
            %执行handle方法,如果执行成功,则与of 中定义的参数进行匹配
                {Response,NewState}->
                From!{Name,ok,Response},
                loop(Name,Mod,NewState)
            catch
                _:Why -> %_ :代表不管是throw,exit,error都好,都执行后面的操作.
                    log_the_error(Name,Request,Why),
                    %发送一个消息让客户端崩溃
                    From!{Name,crash},
                    %以"初始"状态继续循环
                    loop(Name,Mod,OldState)
            end
    end.
log_the_error(Name,Request,Why)->
    io:format("Server ~p request ~p~n"
    "caused exception ~p~n",
    [Name,Request,Why]).

3.实现热代码交换的服务器

     所谓的热代码交换其实主要是替换了回调函数,因为在服务器代码中并没有直接对回调函数的注入,所以可以方便的替换.

%server3.erl:
-module(server3).
-export([start/2,rpc/2,swap_code/2]).

start(Name,Mod) ->
    register(Name,spawn(fun()->
        loop(Name,Mod,Mod:init()) end)).
    swap_code(Name,Mod) -> 
        rpc(Name,{swap_code,Mod}).%模块替换方法
rpc(Name,Request) ->
    Name!{self(),Request},
    receive
        {Name,Response} ->Response
    end.
loop(Name,Mod,OldState) ->
    receive
        {From,{swap_code,NewCallBackMod}}-> 
%若是接收到的信息符合回调函数模块替换规则,则返回response=ack,同时重新调用loop方法,并传入新的回调函数模块
            From!{Name,ack},
            loop(Name,NewCallBackMod,OldState);
        {From,Request} ->
            {Response,NewState}=Mod:handle(Request,OldState),
            From! {Name,Response},
            loop(Name,Mod,NewState)
    end.
%旧回调模块: name_server1.erl
-module(name_server1).
-export([init/0,add/2,find/1,handle/2]).
-import(server3,[rpc/2]).

%客户端方法
add(Name,Place) -> 
    rpc(name_server,{add,Name,Place}).
find(Name) -> 
    rpc(name_server,{find,Name}).
%回调方法
init() ->dict:new().
handle({add,Name,Place},Dict) ->
    {ok,dict:store(Name,Place,Dict)};
handle({find,Name},Dict) ->
    {dict:find(Name,Dict),Dict}.
%新回调模块:new_name_server.erl
-module(new_name_server).
-export([init/0,add/2,all_names/0,delete/1,find/1,handle/2]).
-import(server3,[rpc/2]).

%客户端方法
all_names() -> rpc(name_server,allNames).
add(Name,Place) -> rpc (name_server,{add,Name,Place}).
delete(Name) -> rpc(name_server,{delete,Name}).
find(Name) -> rpc(name_server,{find,Name}).

%回调方法
init() ->dict:new().
handle({add,Name,Place},Dict) ->{ok,dict:store(Name,Place,Dict)};
handle(allNames,Dict) ->{dict:fetch_keys(Dict),Dict};
handle({delete,Name},Dict) ->{ok,dict:erase(Name,Dict)};
handle({find,Name},Dict) ->{dict:find(Name,Dict),Dict}.


%调用示例:
1>server3:start(name_server,name_server).
true
2>name_server1:add(joe,"at home").
ok
3>name_server1:add(helen, "at work").
ok
4>server3:swap_code(name_server,new_name_server).
ack
5>new_name_server:all_names().
[joe,helen]

4.实现事务与热代码交换

%主要是将2与3进行整合:
-module(server2).
-export([start/2,rpc/2]).

start(Name,Mod) ->
    register(Name,spawn(fun()->loop(Name,Mod,Mod:init()) end)).
swap_code(Name,Mod) -> rpc(Name,{swap_code,Mod}).
rpc(Name,Request) ->
    Name!{self(),Request},
    receive
        {Name,crash} -> exit(rpc);
        {Name,ok,Response} ->Response
    end.
loop(Name,Mod,OldState) ->
    receive
        {From,{swap_code,NewCallBackMod}}->
            From!{Name,ok,ack},
            loop(Name,NewCallBackMod,OldState);
        {From,Request} ->
            try Mod:hadle(Request,OldState) of
                {Response,NewState}->
                    From!{Name,ok,Response},
                    loop(Name,Mod,NewState)
            catch
                _:Why -> %_ :代表不管是throw,exit,error都好,都执行后面的操作.
                    log_the_error(Name,Request,Why),
                    %发送一个消息让客户端崩溃
                    From!{Name,crash},
                    %以"初始"状态继续循环
                    loop(Name,Mod,OldState)
            end
     end.
log_the_error(Name,Request,Why)->
    io:format("Server ~p request ~p~n"
    "caused exception ~p~n",
    [Name,Request,Why]).

5.实现空服务器,当接收相关指令后再转换为某一种类型的服务器

%server5.erl:
-module(server5).
-export([start/0,rpc/2]).

start()->spawn(fun()->wait() end).
wait()->​
    receive
        {become,F} ->F()
    end.
rpc(Pid,Q) ->
    Pid!{self(),Q},
    receive
        {Pid,Reply}->Reply
    end.


%调用示例: 
1>Pid=server5:start(). %假设已经实现了一个模块my_fac_server.erl
2>Pid!{becom,fun my_fac_server:loop/0}.
{become,#Fun<my_fac_server.loop.0>}
3>server5:rpc(Pid,{fac,30}).

6.gen_server

  • 1)编写gen_server回调模块的简要步骤:
    • (1)确定回调模块名
    • (2)编写接口函数
    • (3)在回调模块里编写六个必需的回调函数
  • 2)首先确定我们回调模块名和在模块中实现的方法:
    %my_bank.erl
    start() ->
        gen_server:start_link({local,?SERVER},?MODULE,[],[]).
        %start()方法必须对应start_link,因为会创建一个全局服务器.
    
    %其余的方法分别对应call方法
    stop() ->
        gen_server:call(?MODULE,stop).
    new_account(Who) ->
        gen_server:call(?MODULE,{new,Who}).
    deposit(Who,Amount) ->
        gen_server:call(?MODULE,{add,Who,Amount}).
    withdraw(Who,Amount) ->
        gen_server:call(?MODULE,{remove,Who,Amount}).
  • 3)六个必需的回调函数
    • init(Args) -> Result
      • Result = {ok,State} | {ok,State,Timeout} | {ok,State,hibernate}| {stop,Reason} | ignore
      • State = term()
      • Timeout = int()>=0 | infinity
      • 通过start或start_link初始化一个新的进程。
      • 若初始化成功返回{ok,State} | {ok,State,Timeout} | {ok,State,hibernate}。State是gen_server的内部状态;Timeout指进程初始化后等待接受请求的时间限制,超过时间限制将向handle_info发送请求为timeout的信息,默认是infinity;hibernate指可通过调用proc_lib:hibernate/3使进程进入冬眠状态从而进行GC,当有消息请求该进程,处理该请求,然后冬眠并进行GC.注意应该小心使用'hibernate',主要针对空闲时间比较长的进程,因为至少有两个GC回收器,对于请求比较平凡的进程,资源的消耗高。
      • 如果初始化失败将返回{stop,Reason} | ignore
    • handle_call(Request,From,State) -> Result
      • From = {pid(),Tag}
      • Result = {reply,Reply,NewState} | {reply,Reply,NewState,Timeout}
      • | {reply,Reply,NewState,hibernate}
      • | {noreply,NewState} | {noreply,NewState,Timeout}
      • | {noreply,NewState,hibernate}
      • | {stop,Reason,Reply,NewState} | {stop,Reason,NewState}
      • Timeout = int()>=0 | infinity
      • 处理call或multi_call的请求。
      • 若返回{reply,Reply,NewState} | {reply,Reply,NewState,Timeout} | {reply,Reply,NewState,hibernate},Reply将返回给请求函数call or multi_call.Timeout | hibernate的意义与Module:init中意义相同。
      • 若返回{noreply,NewState} | {noreply,NewState,Timeout} | {noreply,NewState,hibernate},gen_server将继续执行但没有返回,若要返回需要显示的调用gen_server:reply/2来返回。
      • 若返回{stop,Reason,Reply,NewState} | {stop,Reason,NewState} ,前者的Reply将返回给调用函数,后者没有返回,若要返回显示调用gen_server:reply/2;两者最终都将调用Module:terminate/2来终止进程。
    • handle_cast(Request,State) -> Result
      • Result = {noreply,NewState} | {noreply,NewState,Timeout}
      • | {noreply,NewState,hibernate}
      • | {stop,Reason,NewState}
      • Timeout = int()>=0 | infinity
      • 处理cast or abcast的请求。
      • 其参数的描述信息与Module:handle_call中的一致。
    • handle_info(Info,State) -> Result
      • Info = timeout | term()
      • Result = {noreply,NewState} | {noreply,NewState,Timeout}
      • | {noreply,NewState,hibernate}
      • | {stop,Reason,NewState}
      • Timeout = int()>=0 | infinity
      • Reason = normal | term()
      • 处理同步或异步异步请求的timeout信息,以及receive的信息。
      • 其参数的描述信息与Module:handle_call中的一致。
    • terminate(Reason,State) ->ok
      • Reason = normal | shutdown | {shutdown,term()} | term()
      • State = term()
      • 这个函数在gen_server终止时被调用。它和Module:init是相对的可以在终止前进行一些清理工作。当gen_server终止的Reason返回时,这个返回将被ignore.
      • 终止的Reason依赖于为什么终止,如果它是因为回调函数返回一个stop元组{stop, ...}那么终止Reason就是指定的终止原因;如果是由于失败(failure),则Reason是error原因。如果gen_server是监控树的一部分,被它的监控树有序的终止并且满足,1.被设置成可捕获的退出信号;2.关闭策略被设置成一个整数的timeout,而不是brutal_kill.则它的Reason是shutdown。甚至gen_server并不是监控树的一部分,只要它从父进程接收到'EXIT'消息,则Reason则是'EXIT'。
      • 注意:无论由于任何原因[normal | shutdown | {shutdown,term()} | term()]终止,终止原因都是由于一个error或一个error report issued(error_logger:format/2)
    • code_change(OldVsn,State,Extra) -> {ok,NewState} | {error,Reason}
      • 该函数主要用于版本的热更新
  • 4)完整结构:
    %my_bank.erl
    -module(my_bank).
    -behaviour(gen_server).
    -export([start/0,stop/0,new_account/1,deposit/2,withdraw/2]).
    %gen_server callbacks
    -export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
    
    start() ->gen_server:start_link({local,?SERVER},?MODULE,[],[]).
    %会启动一个本地服务器,如果一个参数是原子global,就会启动一个能被Erlang节点集群访问的全局服务器,第二个参数是Mod,也就是回调模块名;gen_server:start_link(Name,CallBackMod,StartArgs,Opts)会启动服务器,之后第一个被调用的回调模块方法是Mod:init(StartArgs),它必须返回{ok,State},State的值作为handle_call的第三个参数重新出现;停止服务器后会返回{stop,normal,stopped,Tab},第二个参数normal被用作my_bank:terminate/2的首个参数,第三个参数stopped会成为my_bank:stop的返回值.
    
    stop() ->gen_server:call(?MODULE,stop).
    %被用来对服务器进行远程调用
    new_account(Who) ->
        gen_server:call(?MODULE,{new,Who}).
    deposit(Who,Amount) ->
        gen_server:call(?MODULE,{add,Who,Amount}).
    withdraw(Who,Amount) ->
        gen_server:call(?MODULE,{remove,Who,Amount}).
    init([]) ->
        {ok,ets:new(?MODULE,[])}.
    handle_call({new,Who},_From,Tab) ->
        Reply=case ets:lookup(Tab,Who) of
            [] -> 
                ets:insert(Tab,{Who,0}),
                {welcome,Who};    
            [_] -> 
                {Who,you_already_are_a_customer}
        end,
        {reply,Reply,Tab};
    handle_call({add,Who,X},_From,Tab) ->
        Reply=case ets:lookup(Tab,Who) of
            [] -> 
                not_a_customer;
            [{Who,Balance}] ->
                NewBalance=Balance+X,
                ets:insert(Tab,{Who,NewBalance}),
                {thanks,Who,your_balance_is,NewBalance}
        end,
        {reply,Reply,Tab};
    handle_call({remove,Who,X},_From,Tab) ->
        Reply=case ets:lookup(Tab,Who) of
            [] -> 
                not_a_customer;
            [{Who,Balance}] when X =< Balance ->
                NewBalance =Balance -X,
                ets:insert(Tab,{Who,NewBalance}),
                {thanks,Who,your_balance_is,NewBalance};
            [{Who,Balance}] ->
                {sorry,Who,you_only_have,Balance,in_the_bank}
        end,
        {reply,Reply,Tab};
    handle_call(stop,_From,Tab) ->
        {stop,normal,stopped,Tab}.
    handle_cast(_Msg,State) ->
        {noreply,State}.
    handle_info(_Info,State) ->
        {noreply,State}.
    terminate(_Reson,_State) ->
        ok.
    code_change(_OldVsn,State,_Extra) ->
        {ok,State}.
    
    
    %调用示例:
    1>my_bank:start().
    {ok,<0.33.0>}
    2>my_bank:deposit("joe",10).
    not_a_customer
    3>my_bank:new_account("joe").
    {welcome,"joe"}
    4>my_bank:deposit("joe",10).
    {thanks,"joe",your_balance_is,10}
    5>my_bank:deposit("joe",30).
    {thanks,"joe",your_balance_is,40}
    6>my_bank:withdraw("joe",15).
    {thanks,"joe",your_balance_is,25}
    7>my_bank:withdraw("joe",45).
    {sorry,"joe",you_only_have,25,in_the_bank}
 类似资料: