本文简单描述一下goim的auth流程
auth成功以后,便可以接收推送过来的聊天消息
auth流程如下:
client ⇒ comet ⇒ logic ⇒ comet ⇒ client
其中client发往comet的结构为
// Proto is the frame exchanged between client and comet.
message Proto {
// protocol version
int32 ver = 1 [(gogoproto.jsontag) = "ver"];
// operation code (e.g. auth, heartbeat, message)
int32 op = 2 [(gogoproto.jsontag) = "op"];
// sequence number for matching a reply to its request
int32 seq = 3 [(gogoproto.jsontag) = "seq"];
// payload; for the auth frame this carries the JSON token
bytes body = 4 [(gogoproto.jsontag) = "body"];
}
comet大体流程是先处理client连接请求,然后通过grpc调用logic的Connect方法,具体代码参考comet的代码文件server_tcp.go。
部分代码如下:
// auth for goim handshake with client, use rsa & aes.
// authTCP blocks reading frames from the client until an OpAuth frame
// arrives, forwards the auth body to logic via s.Connect, then answers
// with an OpAuthReply frame and flushes the writer. The named results
// (mid, key, rid, accepts, hb) are filled by Connect on success.
func (s *Server) authTCP(ctx context.Context, rr *bufio.Reader, wr *bufio.Writer, p *grpc.Proto) (mid int64, key, rid string, accepts []int32, hb time.Duration, err error) {
// The first meaningful frame of a connection must be OpAuth; any other
// operation is logged and skipped.
for {
if err = p.ReadTCP(rr); err != nil {
return
}
if p.Op == grpc.OpAuth {
break
} else {
log.Errorf("tcp request operation(%d) not auth", p.Op)
}
}
// Hand the token (p.Body) to logic over gRPC; empty cookie for TCP.
if mid, key, rid, accepts, hb, err = s.Connect(ctx, p, ""); err != nil {
log.Errorf("authTCP.Connect(key:%v).err(%v)", key, err)
return
}
// Reuse the same proto object for the reply; drop the request body.
p.Op = grpc.OpAuthReply
p.Body = nil
if err = p.WriteTCP(wr); err != nil {
log.Errorf("authTCP.WriteTCP(key:%v).err(%v)", key, err)
return
}
err = wr.Flush()
return
}
其中p.ReadTCP(rr)就是解析客户端过来的消息,也就是上面提到的结构Proto,紧接着调用Connect方法,这个Connect的接口声明在comet的operation.go中,如下:
// Connect performs the handshake with the logic service for a freshly
// accepted connection: it forwards the client token over gRPC and
// unpacks the session parameters (mid, key, room id, accepted ops,
// heartbeat budget) from the reply.
func (s *Server) Connect(c context.Context, p *model.Proto, cookie string) (mid int64, key, rid string, accepts []int32, heartbeat time.Duration, err error) {
	req := &logic.ConnectReq{
		Server: s.serverID,
		Cookie: cookie,
		Token:  p.Body,
	}
	reply, err := s.rpcClient.Connect(c, req)
	if err != nil {
		return
	}
	mid = reply.Mid
	key = reply.Key
	rid = reply.RoomID
	accepts = reply.Accepts
	heartbeat = time.Duration(reply.Heartbeat)
	return
}
在这个函数中,调用了logic提供的grpc方法Connect,其接口定义在api.proto中,如下:
// ConnectReq is the auth request comet forwards to logic.
message ConnectReq {
option (gogoproto.goproto_stringer) = false;
// comet instance identifier (ip:port string)
string server = 1;
string cookie = 2;
// raw auth token (JSON) taken from the client's Proto.body
bytes token = 3;
}
// ConnectReply carries the session parameters back to comet.
message ConnectReply {
int64 mid = 1;
// unique key of this connection (one mid may hold several keys)
string key = 2;
string roomID = 3;
// operation ids this connection wants to receive
repeated int32 accepts = 4;
// heartbeat budget; comet converts it with time.Duration (nanoseconds)
int64 heartbeat = 5;
}
service Logic {
// other rpcs omitted
// Connect
rpc Connect(ConnectReq) returns (ConnectReply);
// other rpcs omitted
}
通过代码可知,ConnectReq中的server是comet的ip和端口组成的字符串(可唯一标识comet),token内容为json字符串,一个示例如下:
var token = '{"mid":123, "room_id":"live://1000", "platform":"web", "accepts":[1000,1001,1002]}'
logic的Connect方法定义在logic的conn.go文件中,如下:
// Connect connected a conn.
func (l *Logic) Connect(c context.Context, server, cookie string, token []byte) (mid int64, key, roomID string, accepts []int32, hb int64, err error) {
var params struct {
Mid int64 `json:"mid"`
Key string `json:"key"`
RoomID string `json:"room_id"`
Platform string `json:"platform"`
Accepts []int32 `json:"accepts"`
}
if err = json.Unmarshal(token, ¶ms); err != nil {
log.Errorf("json.Unmarshal(%s) error(%v)", token, err)
return
}
mid = params.Mid
roomID = params.RoomID
accepts = params.Accepts
hb = int64(l.c.Node.Heartbeat) * int64(l.c.Node.HeartbeatMax)
if key = params.Key; key == "" {
key = uuid.New().String()
}
if err = l.dao.AddMapping(c, mid, key, server); err != nil {
log.Errorf("l.dao.AddMapping(%d,%s,%s) error(%v)", mid, key, server, err)
}
log.Infof("conn connected key:%s server:%s mid:%d token:%s", key, server, mid, token)
return
}
函数中调用了AddMapping方法,如下:
// AddMapping add a mapping.
// Mapping:
//	mid -> key_server (redis hash: field=key, value=server), only when mid > 0
//	key -> server     (plain string key)
// All commands are pipelined (Send), flushed once, and the replies are
// drained so the pooled connection is returned clean.
func (d *Dao) AddMapping(c context.Context, mid int64, key, server string) (err error) {
	conn := d.redis.Get()
	defer conn.Close()
	// n counts the pipelined commands whose replies must be received.
	var n = 2
	if mid > 0 {
		// fixed log args: HSET takes (mid-key, field=key, value=server)
		if err = conn.Send("HSET", keyMidServer(mid), key, server); err != nil {
			log.Errorf("conn.Send(HSET %d,%s,%s) error(%v)", mid, key, server, err)
			return
		}
		if err = conn.Send("EXPIRE", keyMidServer(mid), d.redisExpire); err != nil {
			log.Errorf("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
			return
		}
		n += 2
	}
	// fixed log message: this is a SET, not HSET
	if err = conn.Send("SET", keyKeyServer(key), server); err != nil {
		log.Errorf("conn.Send(SET %s,%s) error(%v)", key, server, err)
		return
	}
	if err = conn.Send("EXPIRE", keyKeyServer(key), d.redisExpire); err != nil {
		log.Errorf("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
		return
	}
	if err = conn.Flush(); err != nil {
		log.Errorf("conn.Flush() error(%v)", err)
		return
	}
	// Drain every pipelined reply; an error in any reply aborts.
	for i := 0; i < n; i++ {
		if _, err = conn.Receive(); err != nil {
			log.Errorf("conn.Receive() error(%v)", err)
			return
		}
	}
	return
}
此函数的作用是在redis中生成两个结构,如注释中所说,
// mid -> key_server
// key -> server
具体来说第一个mapping是
key:mid
value: key和server的hash
第二个mapping是
key: key
value:server
有了这两个mapping,就可以根据mid找到对应的key,然后实现向玩家不同key的推送
第二个mapping,可以实现根据key找到这个key所在的comet的ip和端口
ps:
个人理解,这里mid可以理解为玩家账号id
key可以理解为这个账号打开多个网页后,每个的唯一标识
以上是goim的auth流程
这里很容易想到一个问题,comet既然是实现推送的,那它肯定需要维护一个key到tcp连接的map,这样才可以实现根据key向某个连接推送消息,代码如下:
// Bucket is a channel holder.
// A Bucket shards the key->Channel map so lock contention is spread
// across the server's bucket array; a given sub key always hashes to
// the same bucket (see Server.Bucket).
type Bucket struct {
c *conf.Bucket
cLock sync.RWMutex // protect the channels for chs
chs map[string]*Channel // map sub key to a channel
// room
rooms map[string]*Room // bucket room channels
routines []chan *grpc.BroadcastRoomReq // presumably fed to room-broadcast workers — consumers not shown here
routinesNum uint64
ipCnts map[string]int32 // live connection count per client IP
}
Bucket中的chs就是这个map。
这个map的修改接口主要有两个Put和Del,其中Put是向Map中加入数据,代码如下:
// Put registers ch in the bucket under its sub key, closing and
// replacing any previous channel that used the same key, and attaches
// the channel to room rid when rid is non-empty. Room membership is
// established outside the bucket lock.
func (b *Bucket) Put(rid string, ch *Channel) (err error) {
	var room *Room
	b.cLock.Lock()
	// A reconnect with the same key supersedes the old channel.
	if old := b.chs[ch.Key]; old != nil {
		old.Close()
	}
	b.chs[ch.Key] = ch
	if rid != "" {
		var exists bool
		if room, exists = b.rooms[rid]; !exists {
			room = NewRoom(rid)
			b.rooms[rid] = room
		}
		ch.Room = room
	}
	b.ipCnts[ch.IP]++
	b.cLock.Unlock()
	// Join the room after releasing the bucket lock.
	if room != nil {
		err = room.Put(ch)
	}
	return
}
代码中的rid代表roomId
Put的调用代码如下:
// must not setadv, only used in auth
step = 1
if p, err = ch.CliProto.Set(); err == nil {
if ch.Mid, ch.Key, rid, accepts, hb, err = s.authTCP(ctx, rr, wr, p); err == nil {
ch.Watch(accepts...)
b = s.Bucket(ch.Key)
err = b.Put(rid, ch)
if conf.Conf.Debug {
log.Infof("tcp connnected key:%s mid:%d proto:%+v", ch.Key, ch.Mid, p)
}
}
}
// ch.CliProto是一个ring buffer,我理解这里的主要目的是提升接收数据的效率,避免多余的拷贝
也就是在访问logic的Connect后,这里使用Bucket的一个目的应该是为了提高效率,
b = s.Bucket(ch.Key)
根据key便可得到一个唯一的Bucket,代码如下:
// Bucket returns the bucket owning subKey: the key is hashed with
// CityHash32 and reduced modulo the bucket count, so the same key
// always lands in the same bucket.
func (s *Server) Bucket(subKey string) *Bucket {
	raw := []byte(subKey)
	idx := cityhash.CityHash32(raw, uint32(len(raw))) % s.bucketIdx
	if conf.Conf.Debug {
		log.Infof("%s hit channel bucket index: %d use cityhash", subKey, idx)
	}
	return s.buckets[idx]
}
其中使用了google开源的cityhash算法。
接下来看看Del的代码
// Del delete the channel by sub key.
// The map entry is removed only when it still holds this exact channel
// object (a reconnect may already have replaced it via Put), while the
// per-IP counter is decremented for whichever channel was found.
// Room membership is released outside the lock; an emptied room is
// dropped from the bucket.
func (b *Bucket) Del(dch *Channel) {
var (
ok bool
ch *Channel
room *Room
)
b.cLock.Lock()
if ch, ok = b.chs[dch.Key]; ok {
room = ch.Room
// Identity check: do not evict a newer connection reusing the key.
if ch == dch {
delete(b.chs, ch.Key)
}
// ip counter
if b.ipCnts[ch.IP] > 1 {
b.ipCnts[ch.IP]--
} else {
delete(b.ipCnts, ch.IP)
}
}
b.cLock.Unlock()
// Leave the room after releasing the bucket lock.
if room != nil && room.Del(ch) {
// if empty room, must delete from bucket
b.DelRoom(room)
}
}
Del的调用代码如下:
b.Del(ch)
具体是在ServeTCP函数中。
个人觉得这里就是go比较优雅的地方了,处理tcp,每个连接用一个协程处理,当协程退出时,也就是连接需要断开了,简单直接,代码如下:
// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks; the caller typically
// invokes it in a go statement.
// A failure while tuning one accepted socket closes that connection
// and continues accepting, instead of killing the whole loop.
func acceptTCP(server *Server, lis *net.TCPListener) {
	var (
		conn *net.TCPConn
		err  error
		r    int
	)
	for {
		if conn, err = lis.AcceptTCP(); err != nil {
			// if listener close then return
			log.Errorf("listener.Accept(\"%s\") error(%v)", lis.Addr().String(), err)
			return
		}
		// fixed: per-connection setup errors previously returned from the
		// accept loop (stopping all accepts) and leaked the connection.
		if err = conn.SetKeepAlive(server.c.TCP.KeepAlive); err != nil {
			log.Errorf("conn.SetKeepAlive() error(%v)", err)
			conn.Close()
			continue
		}
		if err = conn.SetReadBuffer(server.c.TCP.Rcvbuf); err != nil {
			log.Errorf("conn.SetReadBuffer() error(%v)", err)
			conn.Close()
			continue
		}
		if err = conn.SetWriteBuffer(server.c.TCP.Sndbuf); err != nil {
			log.Errorf("conn.SetWriteBuffer() error(%v)", err)
			conn.Close()
			continue
		}
		go serveTCP(server, conn, r)
		// r round-robins the per-connection timer/buffer pools; wrap
		// before overflow.
		if r++; r == maxInt {
			r = 0
		}
	}
}
其中代码
go serveTCP(server, conn, r)
就是用来处理每个连接的地方,具体又转调用了ServeTCP,如下:
// serveTCP picks the round-robin timer and reader/writer pools for
// this connection (indexed by r) and hands everything to
// Server.ServeTCP, which runs for the lifetime of the connection.
func serveTCP(s *Server, conn *net.TCPConn, r int) {
	tr := s.round.Timer(r)
	rp := s.round.Reader(r)
	wp := s.round.Writer(r)
	if conf.Conf.Debug {
		lAddr := conn.LocalAddr().String()
		rAddr := conn.RemoteAddr().String()
		log.Infof("start tcp serve \"%s\" with \"%s\"", lAddr, rAddr)
	}
	s.ServeTCP(conn, rp, wp, tr)
}
也就是说ServeTCP就是处理tcp的函数,此函数退出,连接断开,协程结束。
既然我们已经知道了key和tcp的Channel如何关联的,那么根据key向channel推送消息的代码也就很明了了,如下:
// PushMsg push a message to specified sub keys.
// For every key it locates the owning bucket's channel and pushes the
// proto if the channel subscribed to the operation; unknown keys are
// silently skipped, and the first push error aborts the loop.
func (s *server) PushMsg(ctx context.Context, req *pb.PushMsgReq) (reply *pb.PushMsgReply, err error) {
	if len(req.Keys) == 0 || req.Proto == nil {
		return nil, errors.ErrPushMsgArg
	}
	for _, key := range req.Keys {
		channel := s.srv.Bucket(key).Channel(key)
		if channel == nil {
			continue
		}
		if !channel.NeedPush(req.ProtoOp) {
			continue
		}
		if err = channel.Push(req.Proto); err != nil {
			return
		}
	}
	return &pb.PushMsgReply{}, nil
}
从Bucket中找到channel,然后调用channel的Push方法,Push方法的实现也很直接,如下:
// Push server push message.
// The send is non-blocking: when the signal channel's buffer is full
// the proto is silently dropped rather than stalling the pusher.
func (c *Channel) Push(p *grpc.Proto) (err error) {
select {
case c.signal <- p:
default:
}
return
}
直接把要推送的消息写入管道c.signal中。
而具体处理推送的代码也很直接,在server_tcp.go文件中,如下:
// dispatch accepts connections on the listener and serves requests
// for each incoming connection. dispatch blocks; the caller typically
// invokes it in a go statement.
// dispatchTCP is the per-connection writer goroutine. It blocks on
// ch.Ready() and, depending on the value received, echoes buffered
// client protos (ProtoReady), writes a server push (default case), or
// tears down (ProtoFinish). On exit it closes the connection, returns
// the write buffer to its pool, and drains the signal channel so the
// reader goroutine can never block on Signal.
func (s *Server) dispatchTCP(conn *net.TCPConn, wr *bufio.Writer, wp *bytes.Pool, wb *bytes.Buffer, ch *Channel) {
var (
err error
finish bool
online int32
// extra tracing for whitelisted mids
white = whitelist.Contains(ch.Mid)
)
if conf.Conf.Debug {
log.Infof("key: %s start dispatch tcp goroutine", ch.Key)
}
for {
if white {
whitelist.Printf("key: %s wait proto ready\n", ch.Key)
}
// Block until the reader goroutine or a server pusher signals.
var p = ch.Ready()
if white {
whitelist.Printf("key: %s proto ready\n", ch.Key)
}
if conf.Conf.Debug {
log.Infof("key:%s dispatch msg:%v", ch.Key, *p)
}
// Sentinel pointers are compared by identity, not content.
switch p {
case grpc.ProtoFinish:
if white {
whitelist.Printf("key: %s receive proto finish\n", ch.Key)
}
if conf.Conf.Debug {
log.Infof("key: %s wakeup exit dispatch goroutine", ch.Key)
}
finish = true
goto failed
case grpc.ProtoReady:
// fetch message from svrbox(client send)
// Drain every pending client proto from the CliProto ring buffer.
for {
if p, err = ch.CliProto.Get(); err != nil {
break
}
if white {
whitelist.Printf("key: %s start write client proto%v\n", ch.Key, p)
}
if p.Op == grpc.OpHeartbeatReply {
// Heartbeat replies also report the room's online count.
if ch.Room != nil {
online = ch.Room.OnlineNum()
}
if err = p.WriteTCPHeart(wr, online); err != nil {
goto failed
}
} else {
if err = p.WriteTCP(wr); err != nil {
goto failed
}
}
if white {
whitelist.Printf("key: %s write client proto%v\n", ch.Key, p)
}
p.Body = nil // avoid memory leak
// Advance the ring buffer's read index past the consumed proto.
ch.CliProto.GetAdv()
}
default:
if white {
whitelist.Printf("key: %s start write server proto%v\n", ch.Key, p)
}
// server send
if err = p.WriteTCP(wr); err != nil {
goto failed
}
if white {
whitelist.Printf("key: %s write server proto%v\n", ch.Key, p)
}
if conf.Conf.Debug {
log.Infof("tcp sent a message key:%s mid:%d proto:%+v", ch.Key, ch.Mid, p)
}
}
if white {
whitelist.Printf("key: %s start flush \n", ch.Key)
}
// only hungry flush response
if err = wr.Flush(); err != nil {
break
}
if white {
whitelist.Printf("key: %s flush\n", ch.Key)
}
}
failed:
if white {
whitelist.Printf("key: %s dispatch tcp error(%v)\n", ch.Key, err)
}
if err != nil {
log.Errorf("key: %s dispatch tcp error(%v)", ch.Key, err)
}
conn.Close()
wp.Put(wb)
// must ensure all channel message discard, for reader won't blocking Signal
for !finish {
finish = (ch.Ready() == grpc.ProtoFinish)
}
if conf.Conf.Debug {
log.Infof("key: %s dispatch goroutine exit", ch.Key)
}
}
dispatchTCP代码比较长,它的主要工作就是派发消息,也是一个单独的协程,调用如下:
// hanshake ok start dispatch goroutine
go s.dispatchTCP(conn, wr, wp, wb, ch)
dispatchTcp中发送消息的主要代码如下:
var p = ch.Ready()
Ready的实现为:
// Ready blocks until the next value arrives on the signal channel: a
// server-pushed proto, or one of the control sentinels
// (grpc.ProtoReady / grpc.ProtoFinish) consumed by dispatchTCP.
func (c *Channel) Ready() *grpc.Proto {
	p := <-c.signal
	return p
}
很明显,此处会造成协程阻塞,直到另一个协程调用了Push,dispatchTCP所在的协程才会继续执行,Ready的返回值p中保存了要发给客户端的消息,类型为grpc.Proto,跟client向comet发送的消息结构是一样的。
这里还要注意dispatchTcp中的switch语句,如下:
switch p {
case grpc.ProtoFinish:
// 此处省略
// ......
case grpc.ProtoReady:
// fetch message from svrbox(client send)
// 此处省略
// ......
default:
// 此处省略
// ......
}
default中的代码是comet向client写入消息,而grpc.ProtoReady是指接收到客户端的消息,触发的地方是在ServeTCP中的ch.Signal()代码,实现如下:
// Signal send signal to the channel, protocol ready.
// Wakes the dispatch goroutine with the ProtoReady sentinel, telling it
// to drain client protos from CliProto. Unlike Push, this send blocks
// when the signal channel is full.
func (c *Channel) Signal() {
c.signal <- grpc.ProtoReady
}
也就是说,case grpc.ProtoReady:是处理client上行消息的地方,只不过现在默认的处理是把收到的消息再返回给client,网上很多人的一个问题是goim为什么不支持全双工,其实要支持也很容易,只要改这里就可以。