现在我们来到了最细节的部分。下面是写gen_server回调模块的3个要点,很简单。
这确实很简单,无须思考——只管照着做就行了。
下面要做一个非常简单的支付系统,我们把这个模块命名为my_bank。
我们会定义下面5个接口函数,它们都在my_bank模块之中。
每一个函数都意味着对gen_server中的例程的一次调用,如下:
start() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stop() -> gen_server:call(?MODULE, stop).
new_account(Who) -> gen_server:call(?MODULE, {new, Who}).
deposit(Who, Amount) -> gen_server:call(?MODULE, {add, Who, Amount}).
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).
gen_server:start_link({loca1,Name},Mod, , )会启动一个本地服务器。?MODULE宏展开的时候会对应模块的名称,也就是这里的my_bank。Mod是回调模块的名称。现在我们暂时忽略gen_server:start的其他参数。
gen_server:ca11 (?MODULE,Term)用来发起对服务器的远程调用。
回调模块必须开放6个回调函数:init/1、handle_ca11/3、handle_cast/2、 handle_info/2、terminate/2和code_change/3。
为了更加省事,我们可以用模板来写gen_server程序,这是最简单的模板;
gen_server_template.mini
-module().
%% gen.server .minitemp1ate
-behaviour(gen_server).
-export([startlink/0]).
%% gen_server ca11backs
-export([init/1,handle_ca11/3,handle_cast/2, handle_info/2,
terminate/2,code__change/3]).
start_link() -> gen_server:start_link({loca1,?SERVER},?MODULE,[],[]).
init([]) -> {ok,State}.
handle_ca11( Request,_From,state) -> {rep1y,Reply,State}.handle_cast(Msg,State) -> {noreply,State}.
handle_info(_Info,State) -> {noreply,State} .terminate(_Reason,_state) -> ok.
code_change(_O1dVsn,State,Extra) -> {ok,State}.
填充这个模板,不断编辑,最终得到的代码会是下面这样:
my_bank.erl
init([]) -> {ok, ets:new(?MODULE,[])}.
handle_call({new,Who}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> ets:insert(Tab, {Who,0}),
{welcome, Who};
[_] -> {Who, you_already_are_a_customer}
end,
{reply, Reply, Tab};
handle_call({add,Who,X}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> not_a_customer;
[{Who,Balance}] ->
NewBalance = Balance + X,
ets:insert(Tab, {Who, NewBalance}),
{thanks, Who, your_balance_is, NewBalance}
end,
{reply, Reply, Tab};
handle_call({remove,Who, X}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> not_a_customer;
[{Who,Balance}] when X =< Balance ->
NewBalance = Balance - X,
ets:insert(Tab, {Who, NewBalance}),
{thanks, Who, your_balance_is, NewBalance};
[{Who,Balance}] ->
{sorry,Who,you_only_have,Balance,in_the_bank}
end,
{reply, Reply, Tab};
handle_call(stop, _From, Tab) ->
{stop, normal, stopped, Tab}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
完整程序:
-module(my_bank).
-behaviour(gen_server).
-export([start/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-compile(export_all).
-define(SERVER, ?MODULE).
start() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stop() -> gen_server:call(?MODULE, stop).
new_account(Who) -> gen_server:call(?MODULE, {new, Who}).
deposit(Who, Amount) -> gen_server:call(?MODULE, {add, Who, Amount}).
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).
init([]) -> {ok, ets:new(?MODULE,[])}.
handle_call({new,Who}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> ets:insert(Tab, {Who,0}),
{welcome, Who};
[_] -> {Who, you_already_are_a_customer}
end,
{reply, Reply, Tab};
handle_call({add,Who,X}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> not_a_customer;
[{Who,Balance}] ->
NewBalance = Balance + X,
ets:insert(Tab, {Who, NewBalance}),
{thanks, Who, your_balance_is, NewBalance}
end,
{reply, Reply, Tab};
handle_call({remove,Who, X}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> not_a_customer;
[{Who,Balance}] when X =< Balance ->
NewBalance = Balance - X,
ets:insert(Tab, {Who, NewBalance}),
{thanks, Who, your_balance_is, NewBalance};
[{Who,Balance}] ->
{sorry,Who,you_only_have,Balance,in_the_bank}
end,
{reply, Reply, Tab};
handle_call(stop, _From, Tab) ->
{stop, normal, stopped, Tab}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
运行测试
2> my_bank:start().
{ok,<0.81.0>}
3> my_bank:deposit("joe",10).
not_a_customer
4> my_bank:new_account("joe").
{welcome,"joe"}
5> my_bank:deposit("joe",10).
{thanks,"joe",your_balance_is,10}
6>
我们自己写的一个gen_server回调模块,,一般会有一个start接口函数开启服务,这个start()函数由我们自己写的,start()函数里面其实是调用gen_server:start_link()函数。gen_server:start_link(Name,Mod,InitArgs,Opts)调用负责启动所有这一切。它创建了一个名为Name的通用服务器程序,它的回调模块是Mod,0pts则控制了这个通用服务器程序的行为。在这里我们可以指定日志信息、调试函数等。然后就是调用Mod:init(InitArgs)来启动服务器程序。
%% ---------------------------------------------
%% Function: init(Args) -> {ok,State} |
%% {ok,State,Timeout}|
%% ignore |
%% {stop,Reason} |
%% Description: Initiates the server
init([]) ->
{ok,#state{}}
通常情况下,我们返回的是{ok,State}。如果想知道其他参数的含义,可以查阅文档中关于gen_server的部分。
返回值是{ok,State},这意味着已经成功启动,服务器的初始状态是State。
为了调用服务器程序,客户端程序的接口会调用函数gen_server:call(Name,Request)。这会导致调用回调模块中的handle_ca11/3函数。
%% ---------------------------------------------
%% Function:
%% handle_call(Request,From,State) -> {reply,Reply,State} |
%% {reply,Reply,State,TimeOut}|
%% {noreply,State} |
%% {noreply,State,TimeOut} |
%% {stop,Reason,Replu,State} |
%% {stop,Reason,State} |
%% Description: Handling call messages
handle_call(_Request,_From,State) ->
Reply = ok,
{reply,Reply,State}
如果一切正常的话,我们返回{replay,Reply,NewState}。这个时候,Reply会作为gen_server:ca11的返回值提交给客户端,而NewState是服务器的新状态。
其他的返回值包括{noreply,…}和{stop,…},它们用得相对较少。noreply会使得服务器继续运行,但客户端会等待回应,回调程序要从服务框架接管向其他进程返回数据的职责。而stop则会导致服务器程序终止运行。
我们已经看到了在gen_server:call和handle_call之间的交互动作。它们主要用来实现远程调用(RPC)。而gen_server:cast(Name,Name)则实现了通知(cast),就是一个无返回值程过程调用(RPC)的调用(实际上,也就是发送一个消息,但传统上将这种情况叫做通知,用来和远程调用区别开来)。
相应的回调函数是handle_cast,在模板中的入口是这样的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LyCubEks-1625996866533)(EF4A0FD447B04FF49DDCF76E3C609AC4)]
处理程序通常只返回{noreply,NewState},这会改变服务器的状态;它也可以返回{stop,… .},这会终止服务器。
回调函数handle_info(Info,State)用来处理发给服务器的原生消息。那么,到底什么样的消息才是原生消息呢?如果服务器和其他的进程建立了连接,并且正在捕捉退出事件,那么它有可能会突然收到意外的{'EXIT’,Pid,what}这样的消息。又或者,系统中的其他进程获得了通用服务器程序的PID,那么它可能会给服务器发送消息。诸如此类的消息最终都会作为Info的值传给回调程序。
handle_info在模板中的入口是这样的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rD97NXXf-1625996866542)(2EC87A6865DC4337A4C914D3BB7BC71C)]
很多原因会导致服务器终止。比如,在handle_ Somethi ng函数中返回一个{stop,Reason,
NewState},或者直接终止服务器,返回-一个{‘EXIT’, reason}。所有这些情况,无论是怎么引
起的,无- -例外,都会调用回调模块的terminate(Reason,NewState) 函数。
模板是这样的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jIGdhPUC-1625996866549)(29DE4B951CA846D49118BEC1BFC79DA1)]
因为我们要终止了,所以这个代码没有必要再返回一个新的状态了。那么,要如何处理参数
中的这个State呢?其实还是有很多事情可以做的。主要取决于你的应用程序,比如,可以把这
个State写到磁盘,以消息形式发给另一个进程,或者直接丢掉。如果将来某个时候你还要再次
启动这个服务器,可以用terminate/2来触发“I’ll be back”函数。
你可以在运行中动态地改变服务器的状态。版本管理子系统会在系统对软件进行升级时调用
这个回调函数。
模板是这样的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mw5GMqdg-1625996866560)(18F30959DE04429CBE146138B5AEC1CD)]
https://www.cnblogs.com/hzy1987/p/5441807.html
用来启动服务器的有start/3,start/4,start_link/3,start_link/4这四个函数。 使用这些start函数之后,就会产生一个新的进程,也就是一个gen_server服务器。这些 start函数的正常情况下返回值是{ok,Pid},Pid就是这个新进程的进程号。 start_link和start的区别在于是否跟父进程建立链接,换种说法是,新启动的进程死掉后,会不会通知启动他的进程(父进程)。
函数
start(Module, Args, Options) -> Result
start(ServerName, Module, Args, Options) -> Result
start_link(Module, Args, Options) -> Result
start_link(ServerName, Module, Args, Options) -> Result
类型
Types
ServerName = {local,Name} | {global,GlobalName}
| {via,Module,ViaName}
Name = atom()
GlobalName = ViaName = term()
Module = atom()原子
Args = term()任何类型
Options = [Option]
Option = {debug,Dbgs} | {timeout,Time} | {hibernate_after,HibernateAfterTimeout} | {spawn_opt,SOpts}
Dbgs = [Dbg]
Dbg = trace | log | statistics | {log_to_file,FileName} | {install,{Func,FuncState}}
SOpts = [term()]
Result = {ok,Pid} | ignore | {error,Error}
Pid = pid()
Error = {already_started,Pid} | term()
start函数可以四个参数(ServerName, Module, Args, Options):
start对应的回调函数是init/1,一般来说是进行服务器启动后的一些初始化的工作, 并生成初始的状态State,正常返回是{ok, State}。这个State是贯穿整个服务器, 并把所有六个回调函数联系起来的纽带。它的值最初由init/1生成, 此后可以由三个handle函数修改,每次修改后又要放回返回值中, 供下一个被调用的handle函数使用。 如果init/1返回ignore或{stop, Reason},则会中止服务器的启动。
有一点细节要注意的是,API函数和回调函数虽然习惯上是写在同一个文件中,但执行函数 的进程却通常是不一样的。在上面的模板中,start_link/0中使用self()的话,显示的是调用者的进程号,而在init/1中使用self()的话,显示的是服务器的进程号。
Module:init(Args) -> Result
Types
Args = term()
Result = {ok,State} | {ok,State,Timeout} | {ok,State,hibernate}
| {ok,State,{continue,Continue}} | {stop,Reason} | ignore
State = term()
Timeout = int()>=0 | infinity
Reason = term()
源码
gen_server.erl
start(Name, Mod, Args, Options) ->
gen:start(?MODULE, nolink, Name, Mod, Args, Options).
%% Name-服务名,Mod- 回调模块,Args-参数,Options-选项
start_link(Name, Mod, Args, Options) ->
gen:start(?MODULE, link, Name, Mod, Args, Options).
这里start和start_link的区别在于调用gen:start函数时,LinkP参数为nolink还是link,后面以start_link进行讲解
gen.erl
%%GenMod-模块gen_server,LinkP-link,Name-服务名,Args-参数,Options-选项
start(GenMod, LinkP, Name, Mod, Args, Options) ->
case where(Name) of
undefined -> %%如果还没有服务名
do_spawn(GenMod, LinkP, Name, Mod, Args, Options);
Pid -> %%如果服务名已存在,返回错误
{error, {already_started, Pid}}
end.
%%start_link
%%GenMod-模块gen_server,LinkP-link,Name-服务名,Args-参数,Options-选项
do_spawn(GenMod, link, Name, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start_link(?MODULE, init_it,
[GenMod, self(), self(), Name, Mod, Args,Options],%%[GenMod-gen_server,self(),self(),Name,Mod-回调模块,Args,Options]
Time,
spawn_opts(Options));
%%start
do_spawn(GenMod, _, Name, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start(?MODULE, init_it,
[GenMod, self(), self, Name, Mod, Args, Options],
Time,
spawn_opts(Options)).
可以看出,start_link和start的区别在于调用proc_lib:start时,封装的参数A中,第三个参数不同,start_link为self()当前进程自己,start为self。
proc_lib.erl
%%M-模块gen,F-init_it,A-[GenMod, self(), self(), Name, Mod, Args, Options],Time-超时时间,SpawnOpts-spawn_opts(Options)
start_link(M,F,A,Timeout,SpawnOpts) when is_atom(M), is_atom(F), is_list(A) ->
?VERIFY_NO_MONITOR_OPT(M, F, A, Timeout, SpawnOpts),
sync_start_link(?MODULE:spawn_opt(M, F, A, [link|SpawnOpts]), Timeout).
sync_start_link(Pid, Timeout) ->
receive
{ack, Pid, Return} ->
Return;
{'EXIT', Pid, Reason} ->
{error, Reason}
after Timeout ->
kill_flush(Pid),
{error, timeout}
end.
erlang.erl
%%_Module-gen模块,_Function-init_it,_Args-[GenMod, self(), self(), Name, Mod, Args, Options],Time-超时时间
spawn_opt(_Module, _Function, _Args, _Options) ->
erlang:nif_error(undefined).
spawn_opt/4与spawn/3一样工作,只是在创建进程时指定了一个额外的选项列表。如果指定了选项link(start_link),连接到父进程;指定了选项monitor(start),则监视新创建的进程,并返回监视器的pid和引用;。
spawn/3就是新建进程,其入口点为Module:Fun,也就是gen:init_it(),参数为[GenMod-gen_server,self(),self(),Name,Mod-回调模块,Args,Options]。这时已经启动了一个新的并发进程,也就是gen_sever服务器,来执行后面的代码。
让我们看gen:init_it()函数
gen.erl
%%[GenMod-gen_server,self(),self(),Name-服务名称,Mod-回调模块,Args-参数,Options]
init_it(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
case register_name(Name) of
true ->
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options);
{false, Pid} ->
proc_lib:init_ack(Starter, {error, {already_started, Pid}})
end.
首先调用register_name(Name),将进程自己self()注册一个名为Name的原子
register_name({local, Name} = LN) ->
try register(Name, self()) of
true -> true
catch
error:_ ->
{false, where(LN)}
end;
register_name({global, Name} = GN) ->
case global:register_name(Name, self()) of
yes -> true;
no -> {false, where(GN)}
end;
register_name({via, Module, Name} = GN) ->
case Module:register_name(Name, self()) of
yes ->
true;
no ->
{false, where(GN)}
end.
注册成功,执行init_it2/6函数
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
GenMod:init_it(Starter, Parent, Name, Mod, Args, Options).
init_it2函数调用gen_server:init_it/6函数,这里注意调用start_link和start第2个参数Parent的不同,start_link的话Parent是调用start_link当前进程自己,而start为self。
gen_server.erl
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = gen:name(Name0),
Debug = gen:debug_options(Name, Options),
HibernateAfterTimeout = gen:hibernate_after(Options),
case init_it(Mod, Args) of
{ok, {ok, State}} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug);
{ok, {ok, State, Timeout}} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, Timeout, HibernateAfterTimeout, Debug);
{ok, {stop, Reason}} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
%% the parent process is notified about the failure.
%% (Otherwise, the parent process could get
%% an 'already_started' error if it immediately
%% tried starting the process again.)
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
{ok, ignore} ->
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, ignore),
exit(normal);
{ok, Else} ->
Error = {bad_return_value, Else},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error);
{'EXIT', Class, Reason, Stacktrace} ->
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}),
erlang:raise(Class, Reason, Stacktrace)
end.
case init_it(Mod, Args)调用init_it/2函数,参数为(Mod-回调模块,Args-init参数)。然后调用Mod:init/1,也就是回调模块的init()函数。
init_it(Mod, Args) ->
try
{ok, Mod:init(Args)}
catch
throw:R -> {ok, R};
Class:R:S -> {'EXIT', Class, R, S}
end.
然后
通过proc_lib:init_ack()函数,向Starter进程(也就是调用gen_server:start()进程)发送返回结果消息。
proc_lib.erl
%%proc_lib.erl
init_ack(Parent, Return) ->
Parent ! {ack, self(), Return},
ok.
接受消息为之前的pro_lib的sync_start_link函数
sync_start_link(?MODULE:spawn_opt(M, F, A, [link|SpawnOpts]), Timeout).
sync_start_link(Pid, Timeout) ->
receive
{ack, Pid, Return} ->
Return; %%返回值为Return变量
{'EXIT', Pid, Reason} ->
{error, Reason}
after Timeout ->
kill_flush(Pid),
{error, timeout}
end.
如果是start启动服务器的话,这里是调用sync_start,再里面通过demonitor删除监控
sync_start({Pid, Ref}, Timeout) ->
receive
{ack, Pid, Return} ->
erlang:demonitor(Ref, [flush]),
Return;
{'DOWN', Ref, process, Pid, Reason} ->
{error, Reason}
after Timeout ->
erlang:demonitor(Ref, [flush]),
kill_flush(Pid),
{error, timeout}
end.
然后如果init_it(Mod, Args)的值匹配为{ok, {ok, State}}的话,gen_server调用loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug);接受客户端消息
loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug) ->
receive
Msg ->
decode_msg(Msg, Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug, false)
after HibernateAfterTimeout ->
loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug)
end;
三个handle开头的回调函数对应着三种不同的使用服务器的方式。如下:
gen_server:call ------------- handle_call/3
gen_server:cast ------------- handle_cast/2
服务进程 !消息 ------------- handle_info/2
gen_server:call 是有返回值的调用,同步调用;cast是无返回值的调用,即通知;而直接向服务器进程发的 消息则由handle_info处理。后面两个为异步调用。
call是有返回值的调用,也是所谓的同步调用,进程会在调用后一直等待直到回调函数返回为止。
函数形式
call(ServerRef, Request) -> Reply
call(ServerRef, Request, Timeout) -> Reply
Types
ServerRef = Name | {Name,Node} | {global,GlobalName}
| {via,Module,ViaName} | pid()
Node = atom()
GlobalName = ViaName = term()
Request = term()
Timeout = int()>0 | infinity
Reply = term()
call是用来指挥回调函数handle_call/3干活的。
Module:handle_call(Request, From, State) -> Result
Types
Request = term()
From = {pid(),Tag}
State = term()
Result = {reply,Reply,NewState} | {reply,Reply,NewState,Timeout}
| {reply,Reply,NewState,hibernate}
| {reply,Reply,NewState,{continue,Continue}}
| {noreply,NewState} | {noreply,NewState,Timeout}
| {noreply,NewState,hibernate}
| {noreply,NewState,{continue,Continue}}
| {stop,Reason,Reply,NewState} | {stop,Reason,NewState}
Reply = term()
NewState = term()
Timeout = int()>=0 | infinity
Continue = term()
Reason = term()
源码
首先是gen_server:call函数,函数里面调用的是gen:call函数
gen_server.erl
%%gen_server.erl
%%Name-服务名,Request-请求消息,Timeout-超时时间
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
gen.erl
%%Process-服务名,Label-'$gen_call',Request-请求消息,TimeOut-超时时间
call(Process, Label, Request, Timeout) when is_pid(Process),
Timeout =:= infinity orelse is_integer(Timeout) andalso Timeout >= 0 ->
do_call(Process, Label, Request, Timeout);
通过send给服务端进程发送消息,消息为 {Label, {self(), Mref}, Request}, [noconnect],其中Label的值为’$gen_call’。发送消息后,会通过receive原语接受gen_server服务端消息,这也是gen_sever:call是同步调用的原因。如果调用gen_sever:call的时候有设置超时时间,那么receive会指定超时时间,设定了进程接受消息的最长时间。
do_call(Process, Label, Request, Timeout) when is_atom(Process) =:= false ->
Mref = erlang:monitor(process, Process),
%% OTP-21:
%% Auto-connect is asynchronous. But we still use 'noconnect' to make sure
%% we send on the monitored connection, and not trigger a new auto-connect.
%%
erlang:send(Process, {Label, {self(), Mref}, Request}, [noconnect]),
receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
{ok, Reply};
{'DOWN', Mref, _, _, noconnection} ->
Node = get_node(Process),
exit({nodedown, Node});
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end.
之前启动服务时,会调用的gen_server:loop函数来接受消息。发送给gen_server服务端的消息,会调用gen_server:decode_msg函数
gen_server.erl
loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug) ->
receive
Msg ->
decode_msg(Msg, Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug, false)
after HibernateAfterTimeout ->
loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug)
end;
在decode_msg时,匹配到第三个值,调用gen_server:handle_msg函数
decode_msg(Msg, Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug, Hib) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, State, Mod, Time, HibernateAfterTimeout], Hib);
{'EXIT', Parent, Reason} ->
terminate(Reason, ?STACKTRACE(), Name, undefined, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout);
_Msg ->
Debug1 = sys:handle_debug(Debug, fun print_event/3,
Name, {in, Msg}),
handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout, Debug1)
end.
handle_msg会有一个子句参数带’$gen_call’标签与其匹配。
handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, HibernateAfterTimeout) ->
Result = try_handle_call(Mod, Msg, From, State),
case Result of
{ok, {reply, Reply, NState}} ->
reply(From, Reply),
loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, []);
{ok, {reply, Reply, NState, Time1}} ->
reply(From, Reply),
loop(Parent, Name, NState, Mod, Time1, HibernateAfterTimeout, []);
{ok, {noreply, NState}} ->
loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, []);
{ok, {noreply, NState, Time1}} ->
loop(Parent, Name, NState, Mod, Time1, HibernateAfterTimeout, []);
{ok, {stop, Reason, Reply, NState}} ->
try
terminate(Reason, ?STACKTRACE(), Name, From, Msg, Mod, NState, [])
after
reply(From, Reply)
end;
Other -> handle_common_reply(Other, Parent, Name, From, Msg, Mod, HibernateAfterTimeout, State)
end;
handle_msg会调用try_handle_call函数获取Result。在try_handle_call函数里面调用回调模块的handle_call函数来处理客户端的消息,handle_call的3个参数为Msg-客户端发送的消息,From-客户端进程,State-gen_server服务端状态。
try_handle_call(Mod, Msg, From, State) ->
try
{ok, Mod:handle_call(Msg, From, State)}
catch
throw:R ->
{ok, R};
Class:R:Stacktrace ->
{'EXIT', Class, R, Stacktrace}
end.
回调模块的handle_call函数执行后,handle_msg函数里面会调用reply函数给客户端发送消息。然后再次调用loop函数,handle_call的返回值为{reply,Reply,State},这里的State会成为下一次loop函数循环的参数,也就是可以通过handle_call函数来改变服务器的状态值。
handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, HibernateAfterTimeout) ->
Result = try_handle_call(Mod, Msg, From, State),
case Result of
{ok, {reply, Reply, NState}} ->
reply(From, Reply),
loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, []);
这里的{To,Tag}等于sned发送消息的{self(),Mref}
reply({To, Tag}, Reply) ->
catch To ! {Tag, Reply},
ok.
cast是没有返回值的调用,一般把它叫做通知。它是一个“异步”的调用,调用后会直接收到 ok,无需等待回调函数执行完毕。
cast(ServerRef, Request) -> ok
Types
ServerRef = Name | {Name,Node} | {global,GlobalName}
| {via,Module,ViaName} | pid()
Node = atom()
GlobalName = ViaName = term()
Request = term()
它的形式是gen_server:cast(ServerRef, Request)。参数含义 与call相同。由于不需要等待返回,所以没必要设置超时,没有第三个参数。
在多节点的情况下,可以用abcast,向各节点上的具有指定名字的服务进程发通知。
cast们对应的回调函数是handle_cast/2,具体为:handle_cast(Msg, State)。 第一个参数是由cast传进去的,第二个是服务器状态,和call类似,不多说。
handel_cast/2的返回值通常是{noreply, NewState},这可以用来改变服务器状态, 或是{stop, Reason, NewState},这会停止服务器。通常来说,停止服务器的命令用 cast来实现比较多。
Module:handle_cast(Request, State) -> Result
Types
Request = term()
State = term()
Result = {noreply,NewState} | {noreply,NewState,Timeout}
| {noreply,NewState,hibernate}
| {noreply,NewState,{continue,Continue}}
| {stop,Reason,NewState}
NewState = term()
Timeout = int()>=0 | infinity
Continue = term()
Reason = term()
源码
gen_server.erl
cast({global,Name}, Request) ->
catch global:send(Name, cast_msg(Request)),
ok;
cast({via, Mod, Name}, Request) ->
catch Mod:send(Name, cast_msg(Request)),
ok;
cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_atom(Dest) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_pid(Dest) ->
do_cast(Dest, Request).
如果我们的参数是{Dest,Request},就会匹配到第四个值,会调用do_cast函数。
do_cast(Dest, Request) ->
do_send(Dest, cast_msg(Request)),
ok.
do_cast函数会调用do_send函数,在调用do_send函数前,会调用cast_msg函数对消息进行封装
cast_msg函数主要是封装消息,带上了’$gen_cast’标签
cast_msg(Request) -> {'$gen_cast',Request}.
de_send通过send给服务端发送消息,发送消息后,没有想call一样接受服务端消息,发送消息后直接返回ok,这也是cast是异步调用的原因。
do_send(Dest, Msg) ->
try erlang:send(Dest, Msg)
catch
error:_ -> ok
end,
ok.
再之前上面讲call的时候,已经讲过,启动服务器后,会调用loop接受消息。最终会调用handle_msg函数,handle_cast发送的消息与handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout)子句匹配。
handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout) ->
Reply = try_dispatch(Msg, Mod, State),
handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, HibernateAfterTimeout, State).
在handle_msg函数里面会调用try_dispatch/3函数,try_dispatch函数有一个带’$gen_cast’标签的子句与之匹配,
try_dispatch({'$gen_cast', Msg}, Mod, State) ->
try_dispatch(Mod, handle_cast, Msg, State);
try_dispatch(Info, Mod, State) ->
try_dispatch(Mod, handle_info, Info, State).
然后调用try_dispatch(Mod, handle_cast, Msg, State)函数,在里面,调用MOd:func函数,也就是回调模块的handle_cast函数。
handle_cast(_Msg,State) -> {noreply,state}.
try_dispatch(Mod, Func, Msg, State) ->
try
{ok, Mod:Func(Msg, State)}
catch
throw:R ->
{ok, R};
error:undef = R:Stacktrace when Func == handle_info ->
case erlang:function_exported(Mod, handle_info, 2) of
false ->
?LOG_WARNING(
#{label=>{gen_server,no_handle_info},
module=>Mod,
message=>Msg},
#{domain=>[otp],
report_cb=>fun gen_server:format_log/2,
error_logger=>
#{tag=>warning_msg,
report_cb=>fun gen_server:format_log/1}}),
{ok, {noreply, State}};
true ->
{'EXIT', error, R, Stacktrace}
end;
Class:R:Stacktrace ->
{'EXIT', Class, R, Stacktrace}
end.
在try_dispathch执行完成后,在handle_msg函数里面会调用handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, HibernateAfterTimeout, State).
handle_common_reply函数又会重新调用loop函数循环接受客户端消息,这里同样能改变服务端的状态State。
handle_common_reply(Reply, Parent, Name, From, Msg, Mod, HibernateAfterTimeout, State) ->
case Reply of
{ok, {noreply, NState}} ->
loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, []);
{ok, {noreply, NState, Time1}} ->
loop(Parent, Name, NState, Mod, Time1, HibernateAfterTimeout, []);
{ok, {stop, Reason, NState}} ->
terminate(Reason, ?STACKTRACE(), Name, From, Msg, Mod, NState, []);
{'EXIT', Class, Reason, Stacktrace} ->
terminate(Class, Reason, Stacktrace, Name, From, Msg, Mod, State, []);
{ok, BadReply} ->
terminate({bad_return_value, BadReply}, ?STACKTRACE(), Name, From, Msg, Mod, State, [])
end.
总结
通过源代码查看,可以看出,call和cast在通过send给服务端发送消息后,call会用recevie接受服务端消息,而cast通过send给服务端发送消息后,直接返回ok。
原生消息
原生消息是指不通过call或cast,直接发往服务器进程的消息,有些书上译成“带外消息”。 比方说网络套接字socket发来的消息、别的进程用!发过来的消息、跟服务器建立链接的进程死掉了, 发来{‘EXIT’, Pid, Why}等等。一般写程序要尽量用API,不要直接用!向服务器进程发消息, 但对于socket一类的依赖于消息的应用,就不得不处理原生消息了。
4> AreaPid = whereis(area_server).
<0.93.0>
5> AreaPid ! {hello}.
area_server handle_info Satate:1
{hello}
6>
上面讲过,loop循环里面接受时候,最终会调用handle_msg处理消息。
而原生消息与之匹配的有
handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout) ->
Reply = try_dispatch(Msg, Mod, State),
handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, HibernateAfterTimeout, State).
调用try_dispath函数,与try_dispath函数对应的try_dispatch(Info, Mod, State)
try_dispatch(Info, Mod, State) ->
try_dispatch(Mod, handle_info, Info, State).
原生消息使用handle_info/2处理,具体为handle_info(Info, State)。其中Info是 发过来的消息的内容。回复和handle_cast是一样的。
上面介绍的handle函数返回{stop,…},就会使用回调函数terminate/2进行扫尾工作,或者直接调用gen_server:stop()函数直接终止服务器。 典型的如关闭已打开的资源、文件、网络连接,打log做记录,通知别的进程“我要死啦”, 或是“信春哥,满血复活”。
可以调用gen_server:stop()函数
stop(ServerRef) -> ok
OTP 18.0
stop(ServerRef, Reason, Timeout) -> ok
Types
ServerRef = Name | {Name,Node} | {global,GlobalName}
| {via,Module,ViaName} | pid()
Node = atom()
GlobalName = ViaName = term()
Reason = term()
Timeout = int()>0 | infinity
命令通用服务器以指定的原因退出并等待它终止。gen_server进程会调用Module:terminat在退出前。
如果服务器以预期的原因终止,则该函数返回ok。除了normal、 shutdown或{shutdown,Term}之外的任何其他原因都会导致使用logger(3)发出错误报告 。默认的Reason是normal。
Module:terminate(Reason, State)
Types
Reason = normal | shutdown | {shutdown,term()} | term()
State = term()
该函数在即将终止时由gen_server进程调用。它与Module:init/1相反, 并进行任何必要的清理。当它返回时,gen_server进程以Reason终止。返回值被忽略。