当前位置: 首页 > 工具软件 > TeamTalk > 使用案例 >

TeamTalk服务端分析 三、服务端以及客户端流程

公孙成仁
2023-12-01

前言

在上一篇中,简单的分析了下各个服务端的配置,在这一篇中,简单的分析下TeamTalk服务端的整个运作流程。

服务端流程

服务端的启动没有严格的先后流程,因为各端在启动后会去主动连接其所依赖的服务端。不过在此,如果是线上环境,还是建议按照如下的启动顺序去启动(也不是唯一的顺序):
1、启动db_proxy。
2、启动route_server,file_server,msfs
3、启动login_server
4、启动msg_server

那么我就按照服务端的启动顺序去讲解服务端的一个流程概述。
第一步:启动db_proxy后,db_proxy会去根据配置文件连接相应的mysql实例,以及redis实例。
第二步:启动route_server,file_server,msfs后,各个服务端都会开始监听相应的端口。
第三步:启动login_server,login_server就开始监听相应的端口,等待客户端的连接,而分配一个负载相对较小的msg_server给客户端。
第四步:启动msg_server,msg_server启动后,会去主动连接route_server,login_server,db_proxy,会将自己的监听的端口信息注册到login_server去,同时在用户上线,下线的时候会将自己的负载情况汇报给login_server.
下面我会分析route_server,login_server以及msg_server的一些逻辑,这里只是简单的分析,后面会有针对各个端的具体的分析。
首先讲解下各个端的一些结构体。

route_server

route_server 在整个tt中的作用是一个消息转发的地方,其在内存中维护了全局用户信息。当有多个msg_server的时候,route_server的作用就是在多个msg_server之间中转消息。
在RouteConn.cpp中定义了如下结构体用来保存用户状态:

typedef map<CRouteConn*, uint32_t> RouteConnMap_t;
typedef struct {
    uint32_t    status;
    RouteConnMap_t      conns;
} UserStat_t;

typedef hash_map<uint32_t, UserStat_t> UserStatMap_t;

static UserStatMap_t g_rs_user_map;

g_rs_user_map是一个hash_map,保存了全局用户信息,其定义如下,key是用户id,value是一个结构体:

typedef struct {
    uint32_t    status;
    RouteConnMap_t      conns;
} UserStat_t;

该结构体中status标识用户的状态,conns也是一个map,其保存了对应的msg_server连接,以及该连接上用户的是pc端还是移动端。
当又用户上线的时候,msg_server会将该用户的状态发送到route_server,route_server就会在g_rs_user_map里面插入一条记录。

login_server

login_server在整个TT的架构中,可以简单的理解为一个负载均衡的作用,在login_server中,同样在内存中维护了所有的msg_server的地址以及其目前的负载情况。
在LoginConn.cpp中定义了如下结构体来保存msg_server的状态机负载:

typedef hash_map<uint32_t, uint32_t> UserConnCntMap_t;
typedef struct  {
    string      ip_addr1;   // 网通IP
    string      ip_addr2;   // 电信IP
    uint16_t    port;
    uint32_t    max_conn_cnt;
    uint32_t    cur_conn_cnt;
    uint32_t    cur_user_cnt;   // 当前的用户数- 由于允许用户多点登陆,cur_user_cnt != cur_conn_cnt
    string      hostname;   // 消息服务器的主机名
    uint32_t    server_type;
    UserConnCntMap_t user_cnt_map;
} msg_serv_info_t;

static map<uint32_t, msg_serv_info_t*> g_msg_serv_info;

g_msg_serv_info是一个hash_map用来保存msg_server。其key是一个socket描述符,value保存的是一个结构体,包含了对应msg_server的具体信息:

typedef struct  {
    string      ip_addr1;   // 网通IP
    string      ip_addr2;   // 电信IP
    uint16_t    port;
    uint32_t    max_conn_cnt;
    uint32_t    cur_conn_cnt;
    uint32_t    cur_user_cnt;   // 当前的用户数- 由于允许用户多点登陆,cur_user_cnt != cur_conn_cnt
    string      hostname;   // 消息服务器的主机名
    uint32_t    server_type;
    UserConnCntMap_t user_cnt_map;
} msg_serv_info_t;

ip_addr1与ip_addr2分别存储对应msg_server的网通和电信的IP(在这里由于我们的服务器是双线所以这么设计的),port保存了对应msg_server的端口,max_conn_cnt保存的是msg_server中配置的最大连接数,当msg_server的连接数超过该值后,就不在分配用户过去,cur_conn_cnt,对应msg_server当前的连接数,cur_user_cnt保存的是对应msg_server登陆的用户数,就像注释说的那样,由于可以多点登陆,所以当前用户数<=当前连接数,hostname保存的是msg_server的hostname,server_type标记msg_server网络类型,目前只有一个定义Tcp Server,在ImPduServer.h中定义:

enum {
    MSG_SERVER_TYPE_TCP     = 1,
};

user_cnt_map是一个hash_map用来保存用户id以及对应的用户数量:

hash_map<uint32_t, uint32_t> UserConnCntMap_t;

msg_server

额……msg_server真的是太复杂了,这个留到后面代码分析的时候再说吧。

启动

msg_server<----->login_server

msg_server 启动后会去主动连接login_server,并通过数据包CImPduMsgServInfo向login_server注册自己的信息。其处理逻辑在LoginServConn.cpp中:

void CLoginServConn::OnConfirm()
{
    log("connect to login server success\n");
    m_bOpen = true;
    g_login_server_list[m_serv_idx].reconnect_cnt = MIN_RECONNECT_CNT / 2;

    uint32_t cur_conn_cnt = 0;
    list<user_conn_t> user_conn_list;
    CImUserManager::GetInstance()->GetUserConnCnt(&user_conn_list, cur_conn_cnt);
    char hostname[256] = {0};
    gethostname(hostname, 256);
    CImPduMsgServInfo pdu(g_msg_server_ip_addr1.c_str(), g_msg_server_ip_addr2.c_str(),
        g_msg_server_port, g_max_conn_cnt, cur_conn_cnt, hostname, MSG_SERVER_TYPE_TCP);
    SendPdu(&pdu);

    if (!user_conn_list.empty()) {
        CImPduUserConnInfo pdu2(&user_conn_list);
        SendPdu(&pdu2);
    }
}

当有用户连接上msg_server并登录成功或者用户断开连接的时候,会向login_server发送一个CImPduUserCntUpdate数据包通知login_server。其处理逻辑在LoginServConn.cpp:

void send_to_all_login_server(CImPdu* pPdu)
{
    CLoginServConn* pConn = NULL;

    for (uint32_t i = 0; i < g_login_server_count; i++) {
        pConn = (CLoginServConn*)g_login_server_list[i].serv_conn;
        if (pConn && pConn->IsOpen()) {
            pConn->SendPdu(pPdu);
        }
    }
}

msg_server<---->route_server

msg_server 启动后会去主动连接route_server,并通过数据包CImPduOnlineUserInfo向route_server报告自己当前在线的用户情况。其处理逻辑在LoginServConn.cpp中:

void CRouteServConn::OnConfirm()
{
    log("connect to route server success\n");
    m_bOpen = true;
    m_connect_time = get_tick_count();
    g_route_server_list[m_serv_idx].reconnect_cnt = MIN_RECONNECT_CNT / 2;

    if (g_master_rs_conn == NULL) {
        update_master_route_serv_conn();
    }

    list<user_conn_stat_t> online_user_list;
    CImUserManager::GetInstance()->GetOnlineUserInfo(&online_user_list);
    CImPduOnlineUserInfo pdu(&online_user_list);
    SendPdu(&pdu);
}

当有用户连接上msg_server并登录成功或者用户断开连接的时候,会向route_server发送一个CImPduUserStatusUpdate数据包通知route_server。其处理逻辑在RouteServConn.cpp:

void send_to_all_route_server(CImPdu* pPdu)
{
    CRouteServConn* pConn = NULL;

    for (uint32_t i = 0; i < g_route_server_count; i++) {
        pConn = (CRouteServConn*)g_route_server_list[i].serv_conn;
        if (pConn && pConn->IsOpen()) {
            pConn->SendPdu(pPdu);
        }
    }
}

调用处

向route_server与login_server汇报本机情况的调用在MsgConn.cpp中,在用户上线,下线的时候会调用:

void CMsgConn::SendUserStatusUpdate(uint32_t user_status)
{

    if (!m_bOpen) {
        return;
    }
    CImUser* pImUser = CImUserManager::GetInstance()->GetImUserByName(m_user_name);
    if (!pImUser) {
        return;
    }

    // 只有上下线通知才通知LoginServer
    if (user_status == USER_STATUS_ONLINE) {
        CImPduUserCntUpdate pdu(USER_CNT_INC, pImUser->GetUserId());
        send_to_all_login_server(&pdu);
        
        //if (pImUser->IsMsgConnEmpty()) {
    
        CImPduUserStatusUpdate pdu2(USER_STATUS_ONLINE, pImUser->GetUserId(), GetClientTypeFlag());
        send_to_all_route_server(&pdu2);
        //}
    } else if (user_status == USER_STATUS_OFFLINE) {
        CImPduUserCntUpdate pdu(USER_CNT_DEC, pImUser->GetUserId());
        send_to_all_login_server(&pdu);
    
        //if (pImUser->IsMsgConnEmpty()) {
        CImPduUserStatusUpdate pdu2(USER_STATUS_OFFLINE, pImUser->GetUserId(), GetClientTypeFlag());
        send_to_all_route_server(&pdu2);
        //}
    }
}

下篇预告:下一篇我会分析下login_server的逻辑,后续篇章会偏重代码级别的分析

 类似资料: