其实本来是要讲erlang如何解决tcp粘包问题的,刚好erlang_mysql_driver里面就有关于这个问题的一种解决方式,所以干脆就以erlang_mysql_driver的源码为例来探究下该问题的解决方案。
mysql的协议包是建立在tcp的基础上的,而tcp协议是流协议,也就是在使用的时候可以保证按顺序收到,但是并不是对方发送多少次,我们就能接收多少次。
这就是tcp粘包问题了,例如你调用两次gen_tcp:send,发了两句话,“你好”“吃饭了吗”。可是我这边在调用gen_tcp:recv接收的时候,可能是需要调用3次,分别收到 “你”“好吃”“饭了吗”。那我就根本不知道你在说什么了。
针对上述问题的解决方案是在协议自定义的时候用len+body的形式,len所占的空间固定,比如只占一个字节,那么你就可以这样发“2你好4吃饭了吗”。我这边每次先取一个字节,得到2,然后就取后面2个数据“你好”。再次循环,取一个字节4,然后取后面4个字节“吃饭了吗”。这样我们就可以正常聊天了。(当然中文不止一个字节,我随便定的,不要纠结哈)本文要讲的就是针对这种方案的erlang具体设计了。
另外关于tcp粘包问题的概念也挺多的,有一种特别的看法是把粘包问题看成传输层的问题,然后得出的结论是tcp以前会出现“粘包问题”现在不会了。这种概念的“粘包问题”是指多进程通过同一端口发送消息如何避免乱序。这里不会对这些进行讨论,在这篇文章中我理解的tcp粘包问题是在应用层的,也就是因为tcp的字节流无边界,发送方发出N个协议包,接收方收到M个协议包的问题,N可能大于M也可能小于M。
erlang_mysql_driver使用mysql_recv模块来接收mysql服务器发送来的数据,那么mysql_recv进程肯定是需要处理tcp粘包的。
我们先看下,mysql_recv是怎么建立连接的?
这个连接建立的过程在mysql_recv:init中
init(Host, Port, LogFun, Parent) ->
case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {keepalive, true}]) of
{ok, Sock} ->
Parent ! {mysql_recv, self(), init, {ok, Sock}},
State = #state{socket = Sock,
parent = Parent,
log_fun = LogFun,
data = <<>>
},
loop(State);
E ->
LogFun(?MODULE, ?LINE, error,
fun() ->
{"mysql_recv: Failed connecting to ~p:~p : ~p",
[Host, Port, E]}
end),
Msg = lists:flatten(io_lib:format("connect failed : ~p", [E])),
Parent ! {mysql_recv, self(), init, {error, Msg}}
end.
gen_tcp:connect(Host, Port, [binary, {packet, 0}, {keepalive, true}])
这里有个很重要的默认参数就是{active, true},在不显式指定active的时候,默认为true。
true指定的是主动消息接收,也就是一旦有数据达到这个Socket,那么这个Socket的控制进程(mysql_recv)就会收到{tcp, …}, {tcp_error, …}, {tcp_closed,…}这一类的消息。
而packet这个参数,跟我们要讨论的tcp粘包问题关系就很大了,它就是erlang用来解决tcp粘包的,如{packet, 1}表明协议包的第一个bytes用来表示长度,那么我们只需要直接接收数据就行了,或者直接发送数据。erlang会自动解析协议包头的长度,但是在mysql_recv中并没有使用packet这个功能。
那么mysql_recv自己是怎么解决的呢?我们看它收到数据后怎么做的?
loop(State) ->
Sock = State#state.socket,
receive
{tcp, Sock, InData} ->
NewData = list_to_binary([State#state.data, InData]),
%% send data to parent if we have enough data
Rest = sendpacket(State#state.parent, NewData),
loop(State#state{data = Rest});
{tcp_error, Sock, Reason} ->
LogFun = State#state.log_fun,
LogFun(?MODULE, ?LINE, error,
fun() ->
{"mysql_recv: Socket ~p closed : ~p",
[Sock, Reason]}
end),
State#state.parent ! {mysql_recv, self(), closed, {error, Reason}},
error;
{tcp_closed, Sock} ->
LogFun = State#state.log_fun,
LogFun(?MODULE, ?LINE, debug,
fun() ->
{"mysql_recv: Socket ~p closed", [Sock]}
end),
State#state.parent ! {mysql_recv, self(), closed, normal},
error
end.
每次收到数据后InData后,先把之前缓存在mysql_recv state中的data取出,然后和InData合并。NewData = InData + 之前缓存的Data,然后调用send_packet。
%% send data to parent if we have enough data
sendpacket(Parent, Data) ->
case Data of
<<Length:24/little, Num:8, D/binary>> ->
if
Length =< size(D) ->
{Packet, Rest} = split_binary(D, Length),
Parent ! {mysql_recv, self(), data, Packet, Num},
sendpacket(Parent, Rest);
true ->
Data
end;
_ ->
Data
end.
sendpacket并没有直接发送数据,sendpacket取出前面3个字节,作为这个包长度的值。为什么是3个字节呢?为什么不是2个或4个呢,因为这是mysql规定,mysql的数据包就是用前面3个字节表示长度,第4个字节表示序号,可见上面的代码匹配 Length:24, Num:8。
当我们知道这个包的长度后,就看后面的data够不够了,如果不够那就继续进入#state.data中缓存。如果够了,那就按Length截取出数据,发送给mysql_conn,剩下的data继续进入#state.data中缓存。
事实上erlang本身的{packet, N}这个参数就提供了,强大的tcp粘包处理。
在不考虑{packet, N}的情况下,其他处理tcp粘包的方法:
当然上面说的两种方式,包括{packet, N}都是建立在我们自定义的协议包使用固定的字节数(如mysql的3),来表示该报的长度。包的格式 = Len + Body,Len所占的字节数固定。这里的Len的字节数固定后,这个包的最大值其实也就知道了,如mysql的一个包最大为16m((2^24)-1=(16M-1)字节),超过该值就要分包发送。