当前位置: 首页 > 工具软件 > X-MSG-IM > 使用案例 >

im_service--im模块

瞿博学
2023-12-01

前言

该IM模块是使用golang开发,性能优秀,而且提供全套的客户端,服务端程序,就是说如果要快速拥有一套完整的IM系统,这个IM是很不错的选择.

IM分为三个模块,分别为:

im模块---用于消息分发,可以起多个模块做分布式,通过LVS实现负载均衡.

imr模块---用于做im模块的路由,多个im之间通过imr来搜索各个用户的存在.

ims模块---用于做消息的顺序存储,不停的存储在message_x文件,x代表数字,每个文件存满默认128M之后,会存到下一个文件,x数字递增.





im模块

--模块目录

带小箭头的文件为几个模块共用文件,解析放在im模块;benchmark是测试文件;Makefile是用来编译的文件.



im消息处理:

用户A->B:

1.SaveMessage:保存消息到目标用户B存储队列 (rpc->SavePeerMessage)

2.SaveMessage:保存消息到发送用户A存储队列(供多点登录同步消息)

3.PushMessage:外部推送消息给目标用户B(由IMR寻路由)MSG_IM

4.SendMessage:发送同步消息给目标用户B (外部推送+本地寻址发送) MSG_SYNC_NOTIFY

5.SendMessage:发送同步消息给发送用户A(多点登录)MSG_SYNC_NOTIFY

6.EnqueueMessage:给本连接回复MSG_ACK消息


1.SaveMessage:保存消息,将消息发送给ims去保存

rpc库: github.com/valyala/gorpc
函数:  SaveMessage(appid int64, uid int64, device_id int64, m *Message) (int64, error)复制代码

      (1)因为ims支持分布式,所以ims可以有多个.所以通过对uid进行取模(%),来获取对应的ims对象,达到负载均衡的效果,这样每一个uid就跟每一个ims一一对应了,这也是为什么ims无法动态添加的原因.

      (2)通过rpc函数SavePeerMessage,将msg发送给ims.

      (3)最后rpc响应成功后,会返回一个msgid,也就是消息存储在文件的偏移量.


2.PushMessage:外部推送消息给目标用户(由IMR寻路由)

函数:  func PublishMessage(appid int64, uid int64, m *Message) 复制代码

    (1)imr跟ims一样支持分布式,一样无法动态获取,用uid来取模获取imr的对象,uid与imr一一对应.获取一个imr对象(channel).

    (2)调用PublishMessage,将Message封装成AppMessage,然后传给chanel.Publish(amsg).

    (3)channel.Publish将消息又封装成Message,并传入到channel队列wt中.

    (4)channel内部有个for循环,wt不停将消息发给SendMessage.


  • SendMessage:发送同步消息给目标用户B (外部推送+本地寻址发送)
    函数:  func SendMessage(conn io.Writer, msg *Message) error复制代码

          (1)将字节流通过WriteMessage(buffer,msg),转成成如下格式的buf

包:header(12)|body

header:len(4),seq(4),cmd(1),version(1),空(2)复制代码

         (2)将buf写入conn.Write

        

  • cliengt.EnqueueMessage:给本连接回复MSG_ACK消息

        (1)将msg传给Connection 的wt队列

        (2)Connection的for,将wt中的msg,重新封装成Message对象,调用Connection.send(msg)

        (3)判断是tcp就发给SendMessages,websocket就发给SendEngineIOBinaryMessage(conn,msg)

  

-------------------------------------------------------------------

(1)im.go

功能:im模块的启动入口

main:

1.初始化配置参数.

2.设置RedisPool连接池.

3.设置ims连接及其几个RPC方法,根据配置给的多个ims地址实现分布式,但是不能动态添加,后面加入的ims模块需要再重启im模块才能使用.两种地址:

storage_rpc_addrs用来保存 个人消息/普通群消息/客服消息 
group_storage_rpc_addrs用来保存 超级群消息
(可以让他们分开存放数据,若group_storage_rpc_addrs不存在时,直接使用storage_rpc_addrs,
我们公司就是没有分开存放,一个原因也是没有使用超级群.) 

tips: 超级群消息和普通群消息的区别
一般都是用普通群,当有一个群特别多人的情况,可以使用超级群,但是千万不要滥用.

普通群消息:
当用户在群里发送一条消息后,会对群上的成员遍历一次,有多少个成员就将消息存多少条,
分别存到群成员各自的消息队列上面.在发送消息的时候,性能消耗会多点.


超级群消息: 
当用户在群里发送一条消息后,只会存一条消息到该群的超级群队列上面.但是客户端每次获取消息的时候,
除了需要自己队列上获取消息,还要去超级群队列获取,会增加获取消息的耗时.

 复制代码


4.设置imr连接及其几个RPC方法,两种地址: route_addrs和group_route_addrs,情况如上.

DispatchAppMessage, DispatchGroupMessage, DispatchRoomMessage
分发 普通消息,群消息,房间消息复制代码

5.设置关键词字典


6.启动群管理: group_manager.Start() [group_manager.go文件] 创建多个临时文件夹.
 普通群消息首先保存到临时文件中,之后按照保存到文件中的顺序依次派发,通过配置group_deliver_count的数量可以设置多少创建个临时文件夹,多少个协程来操作.


7.开启订阅redis服务

8.开启SyncKey服务,SyncKey是每条消息存在文件中的偏移量,如一个文件上限设置128M,当偏移量超过128M,则取余后存入第二个文件,以此类推,第三,第四...

9.开启TCP和websocket的连接服务端

10.开启API



(2)app_route.go

功能:以appid划分多个集合,每个集合存放route.及其针对AppRoute的Route增删查改.

type AppRoute struct{

         mutex     sunc.Mutex

         apps       map[int64]*Route

}


(3)route.go

功能:以uid划分多个集合,每个集合存放每个用户的信息.及其针对Route的Clients增删查改.

初始化Client使用了set是一个空struct,用来存放任意类型的类.

type Route struct{

         appid    int64

         mutex   sunc.Mutex

         clients  map[int64]ClientSet

          room_clients  map[int64]ClientSet

}


(4)channel.go

功能:记录用户在线状态,跟route的route_addrs,group_route_addrs数量对应.


(5)client.go

功能:客户端连接的对象,负责与客户端的通讯(write和read).发送messages等待队列中的消息.

type Client struct {   
     Connection //必须放在结构体首部   
     *PeerClient   
     *GroupClient   
     *RoomClient   
     *CustomerClient   
     public_ip int32
}复制代码

read()获取到消息,到client.HandleMessage(msg)去解析.

首次连接需要通过token验证,im获取token后,拼接上access_token_%s到redis寻找字段,若字段存在则通过验证,否则连接失败.

client.HandleAuthToken(login *AuthenticationToken,version int)

---验证token,失败则直接返回status=1,成功往下走,并返回status=0.

---通过platform_id判断客户端平台,iOS和安卓属于在线状态,修改online状态,停止用户推送消息.

---为client添加各种初始化信息.

---client.PeerClient.Login(),发送通知Sunbscribe函数,并通知调用Sunbscribe到所有group_route_channels.

---CountDAU按天划分key,添加进HyperLogLog


(5)config.go

功能:配置初始参数变量


(6)connection.go

功能:处理消息队列,通过uid寻找接收客户端的对象,将消息塞到client对象的messages等待队列

type Connection struct {   
    conn   interface{}   
    closed int32      
    forbidden int32 //是否被禁言   
    notification_on bool //桌面在线时是否通知手机端   
    online bool      
    tc     int32 //write channel timeout count   
    wt     chan *Message   
    lwt    chan int      //客户端协议版本号   
    version int   
    tm     time.Time   
    appid  int64   
    uid    int64   
    device_id string   
    device_ID int64 //generated by device_id + platform_id   
    platform_id int8      
    messages *list.List //待发送的消息队列 FIFO   
    mutex  sync.Mutex
}复制代码



(7)customer_client.go  customer_service.go

功能:处理客服消息


(8)device.go

就一个方法,获取设备的DeviceID,如果没有就去redis申请一个最新的ID


(9)dummy_grpc.go grpc.go(废弃)





 类似资料: