前言
该IM模块是使用golang开发,性能优秀,而且提供全套的客户端,服务端程序,就是说如果要快速拥有一套完整的IM系统,这个IM是很不错的选择.
IM分为三个模块,分别为:
im模块---用于消息分发,可以起多个模块做分布式,通过LVS实现负载均衡.
imr模块---用于做im模块的路由,多个im之间通过imr来搜索各个用户的存在.
ims模块---用于做消息的顺序存储,不停的存储在message_x文件,x代表数字,每个文件存满默认128M之后,会存到下一个文件,x数字递增.
im模块
--模块目录
带小箭头的文件为几个模块共用文件,解析放在im模块;benchmark是测试文件;Makefile是用来编译的文件.
im消息处理:
用户A->B:
1.SaveMessage:保存消息到目标用户B存储队列 (rpc->SavePeerMessage)
2.SaveMessage:保存消息到发送用户A存储队列(供多点登录同步消息)
3.PushMessage:外部推送消息给目标用户B(由IMR寻路由)MSG_IM
4.SendMessage:发送同步消息给目标用户B (外部推送+本地寻址发送) MSG_SYNC_NOTIFY
5.SendMessage:发送同步消息给发送用户A(多点登录)MSG_SYNC_NOTIFY
6.EnqueueMessage:给本连接回复MSG_ACK消息
1.SaveMessage:保存消息,将消息发送给ims去保存
rpc库: github.com/valyala/gorpc
函数: SaveMessage(appid int64, uid int64, device_id int64, m *Message) (int64, error)复制代码
(1)因为ims支持分布式,所以ims可以有多个.所以通过对uid进行取模(%),来获取对应的ims对象,达到负载均衡的效果,这样每一个uid就跟每一个ims一一对应了,这也是为什么ims无法动态添加的原因.
(2)通过rpc函数SavePeerMessage,将msg发送给ims.
(3)最后rpc响应成功后,会返回一个msgid,也就是消息存储在文件的偏移量.
2.PushMessage:外部推送消息给目标用户(由IMR寻路由)
函数: func PublishMessage(appid int64, uid int64, m *Message) 复制代码
(1)imr跟ims一样支持分布式,一样无法动态获取,用uid来取模获取imr的对象,uid与imr一一对应.获取一个imr对象(channel).
(2)调用PublishMessage,将Message封装成AppMessage,然后传给chanel.Publish(amsg).
(3)channel.Publish将消息又封装成Message,并传入到channel队列wt中.
(4)channel内部有个for循环,wt不停将消息发给SendMessage.
- SendMessage:发送同步消息给目标用户B (外部推送+本地寻址发送)
函数: func SendMessage(conn io.Writer, msg *Message) error复制代码
(1)将字节流通过WriteMessage(buffer,msg),转成成如下格式的buf
包:header(12)|body
header:len(4),seq(4),cmd(1),version(1),空(2)复制代码
(2)将buf写入conn.Write
- cliengt.EnqueueMessage:给本连接回复MSG_ACK消息
(1)将msg传给Connection 的wt队列
(2)Connection的for,将wt中的msg,重新封装成Message对象,调用Connection.send(msg)
(3)判断是tcp就发给SendMessages,websocket就发给SendEngineIOBinaryMessage(conn,msg)
-------------------------------------------------------------------
功能:im模块的启动入口
main:
1.初始化配置参数.
2.设置RedisPool连接池.
3.设置ims连接及其几个RPC方法,根据配置给的多个ims地址实现分布式,但是不能动态添加,后面加入的ims模块需要再重启im模块才能使用.两种地址:
storage_rpc_addrs用来保存 个人消息/普通群消息/客服消息
group_storage_rpc_addrs用来保存 超级群消息
(可以让他们分开存放数据,若group_storage_rpc_addrs不存在时,直接使用storage_rpc_addrs,
我们公司就是没有分开存放,一个原因也是没有使用超级群.)
tips: 超级群消息和普通群消息的区别
一般都是用普通群,当有一个群特别多人的情况,可以使用超级群,但是千万不要滥用.
普通群消息:
当用户在群里发送一条消息后,会对群上的成员遍历一次,有多少个成员就将消息存多少条,
分别存到群成员各自的消息队列上面.在发送消息的时候,性能消耗会多点.
超级群消息:
当用户在群里发送一条消息后,只会存一条消息到该群的超级群队列上面.但是客户端每次获取消息的时候,
除了需要自己队列上获取消息,还要去超级群队列获取,会增加获取消息的耗时.
复制代码
4.设置imr连接及其几个RPC方法,两种地址: route_addrs和group_route_addrs,情况如上.
DispatchAppMessage, DispatchGroupMessage, DispatchRoomMessage
分发 普通消息,群消息,房间消息复制代码
5.设置关键词字典
6.启动群管理: group_manager.Start() [group_manager.go文件] 创建多个临时文件夹.
普通群消息首先保存到临时文件中,之后按照保存到文件中的顺序依次派发,通过配置group_deliver_count的数量可以设置多少创建个临时文件夹,多少个协程来操作.
7.开启订阅redis服务
8.开启SyncKey服务,SyncKey是每条消息存在文件中的偏移量,如一个文件上限设置128M,当偏移量超过128M,则取余后存入第二个文件,以此类推,第三,第四...
9.开启TCP和websocket的连接服务端
10.开启API
(2)app_route.go
功能:以appid划分多个集合,每个集合存放route.及其针对AppRoute的Route增删查改.
type AppRoute struct{
mutex sunc.Mutex
apps map[int64]*Route
}
(3)route.go
功能:以uid划分多个集合,每个集合存放每个用户的信息.及其针对Route的Clients增删查改.
初始化Client使用了set是一个空struct,用来存放任意类型的类.
type Route struct{
appid int64
mutex sunc.Mutex
clients map[int64]ClientSet
room_clients map[int64]ClientSet
}
(4)channel.go
功能:记录用户在线状态,跟route的route_addrs,group_route_addrs数量对应.
(5)client.go
功能:客户端连接的对象,负责与客户端的通讯(write和read).发送messages等待队列中的消息.
type Client struct {
Connection //必须放在结构体首部
*PeerClient
*GroupClient
*RoomClient
*CustomerClient
public_ip int32
}复制代码
read()获取到消息,到client.HandleMessage(msg)去解析.
首次连接需要通过token验证,im获取token后,拼接上access_token_%s到redis寻找字段,若字段存在则通过验证,否则连接失败.
client.HandleAuthToken(login *AuthenticationToken,version int)
---验证token,失败则直接返回status=1,成功往下走,并返回status=0.
---通过platform_id判断客户端平台,iOS和安卓属于在线状态,修改online状态,停止用户推送消息.
---为client添加各种初始化信息.
---client.PeerClient.Login(),发送通知Sunbscribe函数,并通知调用Sunbscribe到所有group_route_channels.
---CountDAU按天划分key,添加进HyperLogLog
(5)config.go
功能:配置初始参数变量
(6)connection.go
功能:处理消息队列,通过uid寻找接收客户端的对象,将消息塞到client对象的messages等待队列
type Connection struct {
conn interface{}
closed int32
forbidden int32 //是否被禁言
notification_on bool //桌面在线时是否通知手机端
online bool
tc int32 //write channel timeout count
wt chan *Message
lwt chan int //客户端协议版本号
version int
tm time.Time
appid int64
uid int64
device_id string
device_ID int64 //generated by device_id + platform_id
platform_id int8
messages *list.List //待发送的消息队列 FIFO
mutex sync.Mutex
}复制代码
(7)customer_client.go customer_service.go
功能:处理客服消息
(8)device.go
就一个方法,获取设备的DeviceID,如果没有就去redis申请一个最新的ID
(9)dummy_grpc.go grpc.go(废弃)