近来阅读leaf框架的代码,有些感悟,特来记录一番。既是一个总结,又是对后来阅读者的一个启发。
个人看代码的一个习惯,喜欢有上下文,因此会各个代码文件之间相互乱窜。但多看几次也就能对框架要做什么和我们要做什么有一个清晰的了解了。
阅读本文之前,还是建议先看看作者写的Wiki,其中有关于框架整体的一个介绍和使用。至于项目地址,去github上搜索便是。
首先需要搭建好一个使用的环境,leafserver就是一个很好的例子。本文将从leafserver开始探讨整个leaf框架
leafserver中包含了3个很重要的基本模块,game,gate,login
game模块是游戏的主要逻辑模块
gate是网关,负责将client的连接以及消息转发。这里的转发可以转到game,也可以转到login。game和login在逻辑上的平等的
login是登录模块,这里主要是玩家首次连接服务器时,需要进行相关的账号验证等
game和login模块都对外提供了一个ChanRPC的通道,这个通道就是是用来对外进行交互的。
先看看main.go,代码如下
func main() {
lconf.LogLevel = conf.Server.LogLevel
lconf.LogPath = conf.Server.LogPath
lconf.LogFlag = conf.LogFlag
lconf.ConsolePort = conf.Server.ConsolePort
lconf.ProfilePath = conf.Server.ProfilePath
leaf.Run(
game.Module,
gate.Module,
login.Module,
)
}
可以看到,这里调用了leaf.Run函数,并传递了3个(模块)参数,即game,gate以及login
转到leaf.Run方法
func Run(mods ...module.Module) {
// logger
if conf.LogLevel != "" {
logger, err := log.New(conf.LogLevel, conf.LogPath, conf.LogFlag)
if err != nil {
panic(err)
}
log.Export(logger)
defer logger.Close()
}
log.Release("Leaf %v starting up", version)
// module
for i := 0; i < len(mods); i++ {
module.Register(mods[i]) // 标记1
}
module.Init() // 标记2
// cluster
cluster.Init()
// console
console.Init()
// close
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
sig := <-c
log.Release("Leaf closing down (signal: %v)", sig)
console.Destroy()
cluster.Destroy()
module.Destroy()
}
在标记1处,是对传入的module进行注册,再看看Register方法中做了什么
type Module interface {
OnInit()
OnDestroy()
Run(closeSig chan bool)
}
type module struct {
mi Module
closeSig chan bool
wg sync.WaitGroup
}
var mods []*module
func Register(mi Module) {
m := new(module)
m.mi = mi
m.closeSig = make(chan bool, 1)
mods = append(mods, m)
}
在Register中,可以看到,传入的module参数都是实现了接口Module的。也就是说,game,gate,login这些module都是实现了Module中的OnInit,OnDestroy,Run方法的
再看标记2处,这里调用了module模块中的Init方法,继续走进去呢。
func Init() {
for i := 0; i < len(mods); i++ {
mods[i].mi.OnInit()
}
for i := 0; i < len(mods); i++ {
m := mods[i]
m.wg.Add(1)
go run(m)
}
}
在这个方法中,分别调用了各个模块的OnInit方法,并且在其后,对应于各个模块开启了一个goroutine,参数为各个模块,即参数为game,gate以及login
在这个goroutine中又做什么事情呢
func run(m *module) {
m.mi.Run(m.closeSig)
m.wg.Done()
}
这里可以看到,调用了模块实现的Run方法。
到这里,整个leafServer就正式开始跑起来了
总结一下就是,从leafServer的main.go文件中的main方法开始,依次调用
leaf.Run —> module.Register —> module.Init —> (game.OnInit/gate.OnInit/login.OnInit) —> module.run(game.Run/gate.Run/login.Run)
这样看着逻辑就相对要清楚一些了。
我们再去看看leafServer中的game,gate,login中,其分别对应的OnInit,Run在做些什么事情
gate模块:
type Module struct {
*gate.Gate
}
func (m *Module) OnInit() {
m.Gate = &gate.Gate{
MaxConnNum: conf.Server.MaxConnNum,
PendingWriteNum: conf.PendingWriteNum,
MaxMsgLen: conf.MaxMsgLen,
WSAddr: conf.Server.WSAddr,
HTTPTimeout: conf.HTTPTimeout,
CertFile: conf.Server.CertFile,
KeyFile: conf.Server.KeyFile,
TCPAddr: conf.Server.TCPAddr,
LenMsgLen: conf.LenMsgLen,
LittleEndian: conf.LittleEndian,
Processor: msg.Processor,
AgentChanRPC: game.ChanRPC,
}
}
这里定义了一个Module结构,并声明了一个Gate的指针变量,在OnInit方法中进行了变量的初始化。
game模块:
var (
skeleton = base.NewSkeleton()
ChanRPC = skeleton.ChanRPCServer
)
type Module struct {
*module.Skeleton
}
func (m *Module) OnInit() {
m.Skeleton = skeleton
}
func (m *Module) OnDestroy() {
}
定义了2个变量,skeleton暂且不看,ChanRPC在前面提到过,是对外提供的一个通信的通道。
定义了一个Module,并内嵌了一个Skeleton的指针变量。并在OnInit方法中进行了初始化
login模块的代码于game一致,这里就不单独说明了
有个疑问是 skeleton 是何物?
我们看到skeleton变量是从base模块中进行创建的,所以将视线转到base模块
func NewSkeleton() *module.Skeleton {
skeleton := &module.Skeleton{
GoLen: conf.GoLen,
TimerDispatcherLen: conf.TimerDispatcherLen,
AsynCallLen: conf.AsynCallLen,
ChanRPCServer: chanrpc.NewServer(conf.ChanRPCLen),
}
skeleton.Init()
return skeleton
}
这里可以看到,我们使用了module模块中的Skeleton结构初始化了一个Skeleton的对象,并指定了一些参数,并在返回前调用了其Init函数。
这个Init函数又是怎样的呢?
func (s *Skeleton) Init() {
if s.GoLen <= 0 {
s.GoLen = 0
}
if s.TimerDispatcherLen <= 0 {
s.TimerDispatcherLen = 0
}
if s.AsynCallLen <= 0 {
s.AsynCallLen = 0
}
s.g = g.New(s.GoLen)
s.dispatcher = timer.NewDispatcher(s.TimerDispatcherLen)
s.client = chanrpc.NewClient(s.AsynCallLen)
s.server = s.ChanRPCServer
if s.server == nil {
s.server = chanrpc.NewServer(0)
}
s.commandServer = chanrpc.NewServer(0)
}
原来这里是对上面传入的参数进行使用,进行一些相关的chanrpc的初始化
我们看看这个skeleton的结构体中包含了哪些属性
type Skeleton struct {
GoLen int
TimerDispatcherLen int
AsynCallLen int
ChanRPCServer *chanrpc.Server
g *g.Go
dispatcher *timer.Dispatcher
client *chanrpc.Client
server *chanrpc.Server
commandServer *chanrpc.Server
}
可以暂且不用理会其中的参数到底是什么意思,我们还没到这一步,这里先有个大致的印象。
转了一圈,module.Init的方法总算看完了
初始化了,就得跑的起来,接下来就来看看这些module是如何Run起来的。
看了上面的代码,发现这些模块中没有Run方法啊,这Run从哪儿来的?
还记得前文说过,game,gate,login这些模块都实现Module这个接口。
我们先看gate,game和login是一致的。
gate模块:
上面已经说到gate模块是包含了一个Gate的指针变量,这里是内嵌形式,所以这里定义的module方法也继承了Gate模块中的相关定义。我们可以看到,gate模块中已经有了OnInit方法,还差Run和OnDestroy方法了。所以我们将视线转到Gate模块中。在这里我们可以看到这里是实现了Run和OnDestroy方法的。
func (gate *Gate) Run(closeSig chan bool) {
var wsServer *network.WSServer
if gate.WSAddr != "" {
wsServer = new(network.WSServer)
wsServer.Addr = gate.WSAddr
wsServer.MaxConnNum = gate.MaxConnNum
wsServer.PendingWriteNum = gate.PendingWriteNum
wsServer.MaxMsgLen = gate.MaxMsgLen
wsServer.HTTPTimeout = gate.HTTPTimeout
wsServer.CertFile = gate.CertFile
wsServer.KeyFile = gate.KeyFile
wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
var tcpServer *network.TCPServer
if gate.TCPAddr != "" {
tcpServer = new(network.TCPServer)
tcpServer.Addr = gate.TCPAddr
tcpServer.MaxConnNum = gate.MaxConnNum
tcpServer.PendingWriteNum = gate.PendingWriteNum
tcpServer.LenMsgLen = gate.LenMsgLen
tcpServer.MaxMsgLen = gate.MaxMsgLen
tcpServer.LittleEndian = gate.LittleEndian
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
if wsServer != nil {
wsServer.Start()
}
if tcpServer != nil {
tcpServer.Start()
}
<-closeSig
if wsServer != nil {
wsServer.Close()
}
if tcpServer != nil {
tcpServer.Close()
}
}
下面来重点看看Gate中的Run方法做了什么。首先是看看有没有配置wsserver和tcpserver的地址,看看是否进行对应的服务器初始化。初始化后就调用各自的Start方法开始服务器的监听,最后等待close信号结束。
在game和login中又如何呢?上文分析到,game和login都是从Skeleton中进行衍生来的,所以说,Skeleton就是一个game和login之流的"模板生成器"。所以要看Run方法,就得到Skeleton模块中去看。
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
这里可以看到,Skeleton模块中的Run方法就是在其中定义的一些RPC通道上进行等待通知(不知对前文的Skeleton的结构是否还有印象,这里用到了其中定义的一些ChanRPC变量)。这里也需要留有印象,因为我们在后面去分析这些通道之间是如何通信时要回过头来看看。
到这里,各个模块的Run方法也看完了。脑中应该有个大致的印象了吧。没有的话就再多理理上面的分析。
下面就来慢慢理清楚这个2个问题。
在上文中,有看到Gate的Run方法中的Server的初始化和Start,这里我们只介绍TCPServer,WSServer与之类似,故不再单独说明。
先上TCPServer初始化的代码
var tcpServer *network.TCPServer
if gate.TCPAddr != "" {
tcpServer = new(network.TCPServer)
tcpServer.Addr = gate.TCPAddr
tcpServer.MaxConnNum = gate.MaxConnNum
tcpServer.PendingWriteNum = gate.PendingWriteNum
tcpServer.LenMsgLen = gate.LenMsgLen
tcpServer.MaxMsgLen = gate.MaxMsgLen
tcpServer.LittleEndian = gate.LittleEndian
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
这里是对TCPServer结构的相关参数的初始化,这在我们说明Start方法前需要先进行了解的。
在network/tcp_server.go代码中,我们可以看到TCPServer的结构定义
type TCPServer struct {
Addr string
MaxConnNum int
PendingWriteNum ints
NewAgent func(*TCPConn) Agent
ln net.Listener
conns ConnSet
mutexConns sync.Mutex
wgLn sync.WaitGroup
wgConns sync.WaitGroup
--------- 这里是分割线--------------------------
// msg parser
LenMsgLen int
MinMsgLen uint32
MaxMsgLen uint32
LittleEndian bool
msgParser *MsgParser
}
这里定义了TCPServer的相关参数,上半部分为服务器相关的参数,下半部分为RPC协议相关的参数。
在初始化TCPServer后,就调用Start方法。在Start方法中,可以看到调用了init方法,并启动了一个goroutine来等待接收客户端的连接,代码如下:
func (server *TCPServer) Start() {
server.init()
go server.run()
}
在run方法中,要分析下在接收到了一个客户端的连接后,服务器是怎么处理的。截取的部分代码如下:
server.mutexConns.Lock()
if len(server.conns) >= server.MaxConnNum {
server.mutexConns.Unlock()
conn.Close()
log.Debug("too many connections")
continue
}
server.conns[conn] = struct{}{}
server.mutexConns.Unlock()
server.wgConns.Add(1)
tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
agent := server.NewAgent(tcpConn)
go func() {
agent.Run()
// cleanup
tcpConn.Close()
server.mutexConns.Lock()
delete(server.conns, conn)
server.mutexConns.Unlock()
agent.OnClose()
server.wgConns.Done()
}()
可以看到,在接收连接后,将conn保存到TCPServer的conns域中,并且调用方法newTCPConn,生成一个客户端在服务器端的一个连接对象(个人见解),并用这个连接对象生成一个agent,并且我们会启动一个goroutine来跑这个agent,即运行agent的Run方法
我们先来看看newTCPConn中做了什么事情
func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn {
tcpConn := new(TCPConn)
tcpConn.conn = conn
tcpConn.writeChan = make(chan []byte, pendingWriteNum)
tcpConn.msgParser = msgParser
go func() {
for b := range tcpConn.writeChan {
if b == nil {
break
}
_, err := conn.Write(b)
if err != nil {
break
}
}
conn.Close()
tcpConn.Lock()
tcpConn.closeFlag = true
tcpConn.Unlock()
}()
return tcpConn
}
首先根据传入的参数,初始化了一个TCPConn对象,并启动了一个goroutine,这个地方的goroutine主要做的工作就是守候在写通道上,当有消息来时,将消息从写通道中取出,并发送出去。也就是说,服务器与客户端的交换消息时,会将消息写入到这个通道中,并在这里进行发送。
接下来,在看看NewAgent函数
这里的NewAgent便是我们在前文看到初始化TCPServer时的NewAgent方法。
这里需要留意下NewAgent函数的返回值类型是 network.Agent,传递的参数类型是 network.TCPConn
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
上面的代码,定义了一个agent的对象,并将其进行传递。
再来看看这个agent是如何定义的呢。视线转到gate模块中
type agent struct {
conn network.Conn
gate *Gate
userData interface{}
}
这里我们看到,这个agent中的conn的类型是network.Conn,而上面传入的参数却是 network.TCPConn。
我们再看看network.Conn的定义
type Agent interface {
WriteMsg(msg interface{})
LocalAddr() net.Addr
RemoteAddr() net.Addr
Close()
Destroy()
UserData() interface{}
SetUserData(data interface{})
}
network.Conn是一个接口,也就是说, network.TCPConn这个结构是实现了network.Conn这个接口的所有方法的。通过查看network.TCPConn的代码也可以证明这一点。
这里先记住一点,network.TCPConn是实现了接口network.Conn的。
再看看返回值,network.Agent的定义如下:
type Agent interface {
Run()
OnClose()
}
这也是个接口。所以再记住一点,gate模块中的agent结构实现了network.Agent接口。并且在之后会调用agent的Run和OnClose方法
并且这里再多说一点,在这个NewAgent函数中,我们会向AgentChanRPC的通道写入一个调用,调用的名称为NewAgent,参数为a即当前的agent。这个AgentChanRPC最终又会走向哪里呢?还记得前文的代码吗?
type Module struct {
*gate.Gate
}
func (m *Module) OnInit() {
m.Gate = &gate.Gate{
MaxConnNum: conf.Server.MaxConnNum,
PendingWriteNum: conf.PendingWriteNum,
MaxMsgLen: conf.MaxMsgLen,
WSAddr: conf.Server.WSAddr,
HTTPTimeout: conf.HTTPTimeout,
CertFile: conf.Server.CertFile,
KeyFile: conf.Server.KeyFile,
TCPAddr: conf.Server.TCPAddr,
LenMsgLen: conf.LenMsgLen,
LittleEndian: conf.LittleEndian,
Processor: msg.Processor,
AgentChanRPC: game.ChanRPC,
}
}
在最后一行,我们看到了这个AgentChanRPC是被哪个模块所初始化的,所以这里最终的走向是到game模块中。再看game模块,
func init() {
skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)
skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent)
}
func rpcNewAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
func rpcCloseAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
我们看到这里注册了2个方法,其中之一就是NewAgent方法,以及其对应的回调函数。所以说,在Gate模块的server.NewAgent的函数调用中,我们最终会通知到game模块的rpcNewAgent函数里。在这里传入的参数就是我们调用时传入的agent。至于为什么是以 args[0]这样的方式去获取,我们在后面的内容中分析ChanRPC时会讲到。顺便再多说一点,在上面gate模块的OnInit方法中的Processor的初始化可以看到是由 msg.Processor进行初始化的。msg这个模块我一直没有提到,直到现在开始粉墨登场了。
在rpcNewAgent函数中,获取到了第一个参数,其类型为gate模块中的Agent,再看看其定义如何:
type Agent interface {
WriteMsg(msg interface{})
LocalAddr() net.Addr
RemoteAddr() net.Addr
Close()
Destroy()
UserData() interface{}
SetUserData(data interface{})
}
发现这也是个接口。而我们在
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
这里传入的参数是一个agent结构。通过从gate模块中的agent的相关实现看出,agent也是实现了gate.Agent接口的。所以这里还需要记住一点,agent结构,既实现了gate.Agent接口,也实现了network.Agent接口。在game模块中,我们一般使用的是gate模块中的Agent接口。
接下来再继续往下看看agent.Run函数中做了什么事情。
func (a *agent) Run() {
for {
data, err := a.conn.ReadMsg()
if err != nil {
log.Debug("read message: %v", err)
break
}
if a.gate.Processor != nil {
msg, err := a.gate.Processor.Unmarshal(data)
if err != nil {
log.Debug("unmarshal message error: %v", err)
break
}
err = a.gate.Processor.Route(msg, a)
if err != nil {
log.Debug("route message error: %v", err)
break
}
}
}
}
这里可以看到是tcpconn对象不停的去read从客户端来的消息,并将消息交由gate模块的Processor进行解码和Route。
到此,我们看到,从服务器接收了客户端的连接后所做的事情,大致分为2步:
第一步,建立客户端连接在服务器的一个抽象,并启动一个写的goroutine,负责对外进行写操作。
第二步,创建一个agent对象,并进行读事件的检测,当有读取到数据时,再将数据解码并路由到具体的模块去进行处理。
这里路由的模块主要是login和game模块,是多个goroutine之间通过通道进行交互。传递的参数有2个,一个是消息,一个是agent对象,这里可以看个简单的例子:
func init() {
// 向当前模块(game 模块)注册 Hello 消息的消息处理函数 handleHello
handler(&msg.Hello{}, handleHello)
}
func handler(m interface{}, h interface{}) {
skeleton.RegisterChanRPC(reflect.TypeOf(m), h)
}
func handleHello(args []interface{}) {
// 收到的 Hello 消息
m := args[0].(*msg.Hello)
// 消息的发送者
a := args[1].(gate.Agent)
// 输出收到的消息的内容
log.Debug("hello %v", m.Name)
// 给发送者回应一个 Hello 消息
a.WriteMsg(&msg.Hello{
Name: "client",
})
}
可以从handleHello这个函数中看到,我们取出数据时,第一个参数就是协议数据,第二个参数就是agent对象
讲到与客户端进行交互,那么就得说到RPC协议了。leaf中提供了2种协议,一个是json,一个是protobuf(简称pb)。
那么有个问题是,这些协议如何相互兼容呢?
还记得前文提到过,TCPServer的结构体,我把它说成了分为上下2部分,一部分是服务器相关的,一部分是协议相关的。
协议相关的部分,有几个参数,分别为,包体的长度定义,最小包体,最大包体,大小端,以及一个解析器。
在tcpserver的init方法中,在最下部为协议解析器的初始化,有个方法为 NewMsgParser,其实现如下:
func NewMsgParser() *MsgParser {
p := new(MsgParser)
p.lenMsgLen = 2
p.minMsgLen = 1
p.maxMsgLen = 4096
p.littleEndian = false
return p
}
其返回了一个结构,这个结构包含的数据正如前文所说。并指定了默认值,包体长度有2个字节表示,最小长度为1,最大长度为4096,默认为大端序
这个MsgParser结构提供了一些方法,这里只对这个模块种的Read和Write方法进行说明。
Read方法是从客户端连接中获取指定的包体大小的数据到缓冲中,并按照大小端序进行解析包体的实际大小。即先获取包体大小,进行大小端序的解码,再根据实际的包体大小读出实际数据,并返回。
Write方法是Read方法的一个逆过程。
读和写理清楚了,就说如何实际来进行操作。
这里又回到了TCPConn模块中了,其实现了network.Conn接口,其中有2个函数,ReadMsg和WriteMsg
type Conn interface {
ReadMsg() ([]byte, error)
WriteMsg(args ...[]byte) error
LocalAddr() net.Addr
RemoteAddr() net.Addr
Close()
Destroy()
}
我们之前说到,network.TCPConn这个结构实现了network.Conn接口,所以在TCPConn中的实现为:
func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
return tcpConn.msgParser.Read(tcpConn)
}
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
return tcpConn.msgParser.Write(tcpConn, args...)
}
这里我们看到,我们实际调用了msgParse中的Read和Write方法。在这之后,ReadMsg方法将会在agent.Run中进行调用,以从连接冲获取数据,WriteMsg方法会在agent.WriteMsg方法中进行调用,以将消息写入到writeChan通道中,并最终发送出去。
在这2个过程中会涉及到消息的解码和编码。也就是在上文提到过的粉墨登场的msg模块。在Gate中,有一个Processor的接口定义,这个接口在Gate初始化时被初始化为了msg.Processor。再看看msg中是如何进行定义的。这个msg是leafServer提供的一个例子:
import (
"github.com/name5566/leaf/network/json"
)
var Processor = json.NewProcessor()
func init() {
Processor.Register(&Hello{})
}
type Hello struct {
Name string
}
可以看到,这里是使用json协议来进行初始化的。
到这里,应该就很清晰了,leaf在处理不同的RPC协议之间是如何来做的了。也就是需要实现network.Processor这个接口。我们可以看到在network/json和network/protobuf中也都是如此。
再说点实际的东西,也是leaf的作者在wiki里面举的例子。
在定义了Processor后,还需要用此来进行消息的处理,通过register方法可在processor中注册相关的协议,并在gate模块处,对注册的协议进行路由的选择,如下:
func init() {
// 指定消息路由到哪个模块
msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)
}
至此,整个网络部分的分析就结束了。我想第一个问题也应该解决了。
下面就来说说第二个问题,各个模块之间的goroutine是如何进行通信的。即ChanRPC
再看看下面这段代码:
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
在这里,遍寻各个通道,等待事件的发生并返回,然后进行处理。这里可以理解为消费者,那么生产者在哪里呢?
type Skeleton struct {
GoLen int
TimerDispatcherLen int
AsynCallLen int
ChanRPCServer *chanrpc.Server
g *g.Go
dispatcher *timer.Dispatcher
client *chanrpc.Client
server *chanrpc.Server
commandServer *chanrpc.Server
}
还是这个Skeleton的定义。抛开前面3个参数暂且不看,后面的都跟通道相关。
先说简单点的g.Go这个结构
这是g包里面相关的定义:
type Go struct {
ChanCb chan func()
pendingGo int
}
type LinearGo struct {
f func()
cb func()
}
type LinearContext struct {
g *Go
linearGo *list.List
mutexLinearGo sync.Mutex
mutexExecution sync.Mutex
}
再往下继续看代码;
func (g *Go) Go(f func(), cb func()) {
g.pendingGo++
go func() {
defer func() {
g.ChanCb <- cb
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
buf := make([]byte, conf.LenStackBuf)
l := runtime.Stack(buf, false)
log.Error("%v: %s", r, buf[:l])
} else {
log.Error("%v", r)
}
}
}()
f()
}()
}
看到这里时,我们看到g包中的Go结构体的方法Go传入了2个函数作为参数,第一个参数是在其中启动的goroutine中执行的,而第二个参数是被放到了ChanCb中了。**而这个ChanCb是在Run中进行监测并获取,并调用Cb来进行调用的。**再看看Cb的实现:
func (g *Go) Cb(cb func()) {
defer func() {
g.pendingGo--
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
buf := make([]byte, conf.LenStackBuf)
l := runtime.Stack(buf, false)
log.Error("%v: %s", r, buf[:l])
} else {
log.Error("%v", r)
}
}
}()
if cb != nil {
cb()
}
}
这就是从ChanCb中取出回调并执行嘛。如此g包中的Go结构体的方法就理清楚了。调用Go方法时,先启动一个goroutine执行传入的第一个参数,并在goroutine将结束时,将第二个参数放入ChanCb中。然后这个ChanCb会被Skeleton.Run函数中给监测到,继而调用其Cb函数,也就是执行我们调用Go方法时传入的回调函数.
举个实际的例子,这个例子也是leaf中的测试例子:
d := g.New(10)
// go 1
var res int
d.Go(func() {
fmt.Println("1 + 1 = ?")
res = 1 + 1
}, func() {
fmt.Println(res)
})
d.Cb(<-d.ChanCb)
执行的流程就是:
1.先执行调用Go传入的第一个参数(一个匿名函数),并在该函数返回前将传入的第二个参数放入到ChanCb中,以待取出。
2.<-d.ChanCb 会将其中的数据取出,并放入到Cb函数的参数中
3.Cb函数执行传入的回调函数。
以上的过程是在只有一个Go的调用情况下的一个例子,当有多个Go的调用时,并不能确定哪个先会被回调,故而此种时一个并行的调用,其结果不保证先后。
另外,再注意下Close方法,在这个方法中,如果当前有还未从ChanCb中取出的回调函数,则会进行回调。
既然g.Go提供了一种并行的调用方式,g.LinearGo 则提供了一种串行化的调用方式了(从其用词上也可看出,手动滑稽),g.LinearGo 将严格按照调用的顺序来进行回调的处理顺序。
看个例子:
c := d.NewLinearContext()
c.Go(func() {
time.Sleep(time.Second / 2)
fmt.Println("1")
}, nil)
c.Go(func() {
fmt.Println("2")
}, nil)
不管第一个函数的执行需要多久,第二个函数都会等待其执行完再执行。
timer.Dispatcher参数主要提供的是定时器相关的通信,简单点说就是,提前给自己安排个日程,到了那个点,你就得去干哪个事了(执行回调)。这里就不展开去看了
最后着重来分析下 chanrpc.Server 和 chanrpc.Client
先看看其定义
type Server struct {
// id -> function
//
// function:
// func(args []interface{})
// func(args []interface{}) interface{}
// func(args []interface{}) []interface{}
functions map[interface{}]interface{}
ChanCall chan *CallInfo
}
Server结构中的functions变量,主要是事件名称<–>函数的一个映射。ChanCall是调用时的信息,这就是个缓冲队列,一个个的事件排着队的等着执行呢。
type CallInfo struct {
f interface{}
args []interface{}
chanRet chan *RetInfo
cb interface{}
}
调用信息包含:调用的函数是什么,参数有哪些,返回值是一个通道,也就是说,返回值将写入到chanRet中,cb是回调函数
type RetInfo struct {
// nil
// interface{}
// []interface{}
ret interface{}
err error
// callback:
// func(err error)
// func(ret interface{}, err error)
// func(ret []interface{}, err error)
cb interface{}
}
client的结构定义
type Client struct {
s *Server
chanSyncRet chan *RetInfo
ChanAsynRet chan *RetInfo
pendingAsynCall int
}
这里有Server又有Client,是什么意思呢?简单点说,Server负责注册事件和处理函数,Client负责构造参数并发起对这些事件的调用。Client的调用分为2种方式,同步和异步。Server还提供了一个Go方式的调用,该调用就是将调用参数构造好,并写入到ChanCall通道中。其实Client的调用也同样如此。
所以大致的流程如下:
ChanRpc.NewServer —> ChanRpc.Register —> ChanRpc.Server.ChanCall/ChanRpc.Server.client.ChanAsynRet —> Call*/Go
具体描述为:
1.初始化一个ChanRPC的server。
2.向其注册相关的事件和函数(server就是管理这个的)。
3.所有的调用参数都是通过ChanCall通道进行传递的,也就是说,将调用传入的参数包装成CallInfo对象,将其写入ChanCall队列中。
4.当ChanCall通道中有数据时,在Run方法中就会被接收并执行server的Exec方法,执行调用信息。
所以前文提到的生产者是什么?就是产生这些CallInfo对象的调用者。而消费者就是这个Exec方法
调用可能时同步的,也可能是异步的。同步的调用会阻塞并等待处理后而返回;而异步的调用会将消息返回的结构写入到ChanAsynRet通道中,再通过Run方法中对该通道进行监测,如果有数据(返回了)就执行调用时传入的Cb进行处理。
前文中,CallInfo结构中的chanRet变量,其实际对应的是Client结构中的同步和异步通道。这里发生的转换是在构造参数进行调用时进行指定的,代码如下:
同步调用代码:
func (c *Client) Call1(id interface{}, args ...interface{}) (interface{}, error) {
f, err := c.f(id, 1)
if err != nil {
return nil, err
}
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.chanSyncRet,
}, true)
if err != nil {
return nil, err
}
ri := <-c.chanSyncRet
return ri.ret, ri.err
}
可以看到,在构建调用参数时,就进行了参数的传递,并且其结果是进行同步返回的
call0,call1,callN是指返回的参数的个数。
异步调用代码:
func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
f, err := c.f(id, n)
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.ChanAsynRet,
cb: cb,
}, false)
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
}
异步调用时,chanRet指代的是ChanAsynRet通道,并有一个回调函数(这个回调函数不可省略)
以上便是对于ChanRPC中Server和Client的一个分析了。其封装设计非常值得学习。充分理解了,可以学到goroutine间是如何通过通道进行通信共享!
如此便是我对于Leaf框架的一个分析和理解了。我也是初入golang学习当中,分析这些优秀的框架,可加深学习的理论。如有误人之处,还望不吝指出。