当前位置: 首页 > 工具软件 > KCP > 使用案例 >

基于KCP的可靠UDP封装

陶法
2023-12-01

基于KCP的可靠UDP封装

交互原理

客户端服务端交互过程:
1)客户端与服务端都指定端口和IP,客户端服务端启动后,都创建KCP和socket ;
2)客户端在启动后,首先向服务端发送链接消息,在服务端未回复链接成功的确认标识时,客户端不断向服务端发送链接请求消息;
3)服务端在收到链接请求消息后,重置本地KCP,并向客户端发送链接成功确认消息,并启动服务端心跳,检测客户端消息;
4)客户端在收到服务端链接确认消息后,确认链接,并开始不断的向服务端发送心跳消息,以维持客户端与服务端的链接;
5)当客户端断开连接后,如果在服务端心跳还未超时得情况下,重启客户端,则服务端会收到客户端的连接消息,这时服务端就会重置KCP,并向客户端发送连接成功标识,客户端和服务端连接成功,如果服务端心跳超时,则服务端会关闭KCP,等到客户端链接消息后,启动KCP,建立和客户端的链接;
6)当服务端断开连接后,如果客户端心跳还未超时,服务端就重启了,则服务端在断开状态下收到客户端的心跳后,会向客户端发送链接请求,客户端收到链接请求后,会重置客户端KCP,并与服务端重新建立连接,如果客户端心跳超时,则客户端关闭KCP,并不断的向服务端发送连接请求消息,当服务端启动,收到连接请求消息后,会向客户端回复连接确认消息,以此建立客户端和服务端的连接。

直接上代码
kcp_udp.h


/********************************************
* 最终完成人:
* 最终完成时间:2020-11-25
* 类功能:使用KCP算法的UDP封装基类
*
*
****************************************/


#ifndef KCP_UDP_H
#define KCP_UDP_H

#include "ikcp.h"
#include "celltimestamp.h"
#include "cellthread.h"
#include "data.h"
#ifdef __linux__
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#endif

#ifdef WIN32
#include <winsock2.h>
#define MSG_NOSIGNAL 0
#define socklen_t int
#define MSG_DONTWAIT 0
#endif
#include <stdlib.h>
#include <string.h>
//客户端服务端统一会话标识
#define KCP_CONV 0x11223344
#define ushort unsigned short
#define KCP_RELEASE 0
#define ALL_RELEASE -1
#ifdef __linux__
#define SOCKET_ERROR -1
#endif
class kcp_udp
{
public:
    kcp_udp();

    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-26
    * 函数功能:设置MTU,最大传输单元,默认为1400,最小为50
    *
    * 输入:ushort umtu 最大传输单元
    *
    *
    * 返回值:0 设置成功,-1 设置过小  -2 设置时申请内存失败,-3代表kcp未创建
    *
    *
    ****************************************/

    int setmtu(ushort umtu);
    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-26
    * 函数功能:设置滑动窗口大小
    *
    * 输入:int sndwnd 发送滑动窗口大小 int recvwnd 接收滑动窗口大小
    *
    *
    * 返回值 0 代表成功,-1代表kcp未创建
    *
    *
    ****************************************/
    int setwndsize(int sndwnd,int recvwnd);

    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-26
    * 函数功能:
    *
    * 输入:模式:model 0 默认模式  1 普遍模式,关闭流控 2 快速模式
    *
    *
    * 返回值 0,正常,-1失败
    *
    *
    ****************************************/
    int setnodelay(int model);

    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-06
    * 函数功能:两种功能,参数为 0时,只关闭kcp,参数为 -1时,socket也关闭
    *
    * 输入:
    *
    *
    * 输出:
    *
    *
    ****************************************/
    void Close(int mode = KCP_RELEASE);
    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-26
    * 函数功能:发送UDP数据
    *
    * 输入:const char* buff 发送缓冲区首地址,int len 发送数据长度
    *
    *
    * 返回值: -1 ,-2发送失败,0表示成功把数据加入到发送队列
    *
    *
    ****************************************/
    int kcp_send(const char* buff,int len);

    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-26
    * 函数功能:
    *
    * 输入:char* buff 发送缓冲区首地址,int len 发送数据长度
    *
    *
    * 返回值 接收到的数据长度,小于 0代表无数据
    *
    *
    ****************************************/
    int kcp_recv(char* buff,int len);

    //刷新
    void kcp_flush();

    /********************************************
    * 最终完成人:
    * 最终完成时间:2020-11-26
    * 函数功能:kcp发送回调函数
    *
    * 输入:
    *
    *
    * 输出:
    *
    *
    ****************************************/
    static int udp_output(const char* buf,int len,ikcpcb* kcp,void* user);

    //获取重启状态
    bool getReconnect();
    //获取kcp发送队列的消息量
    IUINT32 getWaitSendNum();

    //发送是否停止,当发送队列大于滑动窗口两倍时不再发送数据,需要停止
    //true 需要停止发送,false 不需要停止
    bool needPaused();
protected:
    //kcp刷新线程执行函数
    void handle_update(CELLThread* pthead);
    //kcp接收线程执行函数,纯虚函数,分别有继承的客户端类和服务类实现
    virtual void recv_work(CELLThread *pthread) = 0;
    //kcp,日志回调函数
    static void write_log(const char *log, struct IKCPCB *kcp, void *user);

    //创建kcp,如果创建失败返回false,创建成功返回true;
    bool create();

    /********************************************
    * 最终完成人:李余同
    * 最终完成时间:2020-11-26
    * 函数功能:启动kcp,初始启动接收线程
    *
    * 输入:
    *
    *
    * 返回值: false 启动失败  true 启动成功
    *
    *
    ****************************************/
    bool start();
protected:
    //kcp结构指针
    ikcpcb *m_kcp = nullptr;
    //kcp跨线程锁
    std::mutex m_kcpMutex;
    //kcp,update频率,单位毫秒,默认10毫秒
    IUINT16 m_updateInterval = 10;
    //socket描述符
    int m_sockfd = SOCKET_ERROR;
    //kcp,update线程
    CELLThread m_updateThread;
    //kcp,接收线程
    CELLThread m_recvThread;

    //kcp,udp_output结构
    struct sockaddr_in ser_addr;

    //kcp监听端口和ip
    struct sockaddr_in client_addr;

    //recv接收到网络结构
    struct sockaddr_in recv_addr;

    //udp client server 心跳标识
    bool m_bconnect = false;

    //重启标志,获取改标志后,改标志变为未重启状态
    //false 未重启,true重启
    bool m_bRConnect = false;
};

#endif // KCP_UDP_H

kcp_udp.cpp

#include "kcp_udp.h"

kcp_udp::kcp_udp()
{

}


void kcp_udp::handle_update(CELLThread *pthead)
{
    time_t old = CELLTime::getNowInMilliSec();
    while(pthead->isRun())
    {
        if(CELLTime::getNowInMilliSec() - old >= m_updateInterval)
        {
            old = CELLTime::getNowInMilliSec();
            std::lock_guard<std::mutex> lock(m_kcpMutex);
            if(!m_kcp)
            {
                continue;
            }
            ikcp_update(m_kcp,old);
        }
    }
}


void kcp_udp::write_log(const char *log, IKCPCB *kcp, void *user)
{

}

bool kcp_udp::create()
{
    {
        std::lock_guard<std::mutex> lock(m_kcpMutex);
        if(m_kcp)
        {
            return true;
        }
        m_kcp = ikcp_create(KCP_CONV,(void*)this);
        m_kcp->output = udp_output;

    }
    {
        setnodelay(1);
        setwndsize(128,128);
    }
    if(m_sockfd == SOCKET_ERROR)
    {
        m_sockfd = socket(AF_INET,SOCK_DGRAM,0);
        int opt=1;
    #ifdef WIN32
        setsockopt(m_sockfd,SOL_SOCKET,SO_REUSEADDR,(char*)&opt,sizeof(opt));//端口复用
    #endif
    #ifdef __linux__
        setsockopt(m_sockfd,SOL_SOCKET,SO_REUSEADDR,(char*)&opt,sizeof(opt));
    #endif
        int ret = bind(m_sockfd,(struct sockaddr*)&client_addr,sizeof(client_addr));
        if(ret < 0)
        {
            return false;
        }
    }


    if(m_kcp && m_sockfd > 0)
    {
        return true;
    }
    else
    {
        Close(ALL_RELEASE);
    }
    return false;
}

bool kcp_udp::start()
{
    if(!create())
    {
         return false;
    }


    if(!m_recvThread.isRun())
    {
        m_recvThread.Start(nullptr,[&](CELLThread* pThread){
            recv_work(pThread);
        });
    }

    if(!m_updateThread.isRun())
    {
        m_updateThread.Start(nullptr,[&](CELLThread* pThread){
            handle_update(pThread);
        });
    }
    return true;
}

int  kcp_udp::setmtu(ushort umtu)
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return -3;
    }
    return ikcp_setmtu(m_kcp,umtu);
}

int kcp_udp::setwndsize(int sndwnd, int recvwnd)
{
     std::lock_guard<std::mutex> lock(m_kcpMutex);
     if(!m_kcp)
     {
         return -1;
     }
     return ikcp_wndsize(m_kcp,sndwnd,recvwnd);
}


void kcp_udp::Close(int mode)
{

    if(m_updateThread.isRun())
    {
        m_updateThread.Close();
    }
    if(m_kcp)
    {
        std::lock_guard<std::mutex> lock(m_kcpMutex);
        ikcp_release(m_kcp);
        m_kcp = nullptr;
    }
    if(mode == ALL_RELEASE)
    {
        if(m_recvThread.isRun())
        {
            m_recvThread.Close();
        }
        if(m_sockfd != SOCKET_ERROR)
        {
    #ifdef __linux__
            close(m_sockfd);
    #endif
    #ifdef WIN32
            closesocket(m_sockfd);
    #endif
            m_sockfd = SOCKET_ERROR;
        }
    }
}

int kcp_udp::kcp_send(const char *buff, int len)
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return -1;
    }
    return ikcp_send(m_kcp,buff,len);
}

int kcp_udp::kcp_recv(char *buff, int len)
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return -1;
    }
    return ikcp_recv(m_kcp,buff,len);
}

int kcp_udp::setnodelay(int model)
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return -1;
    }
    switch(model)
    {
        case 0:
        {

            ikcp_nodelay(m_kcp,0,10,0,0);
            break;
        }
        case 1:
        {
            ikcp_nodelay(m_kcp,0,10,0,1);
            break;
        }
        case 2:
        {
            ikcp_nodelay(m_kcp,2,10,2,1);
            break;
        }
        default:
        {
             ikcp_nodelay(m_kcp,0,10,0,0);
             break;
        }
    }
}

void kcp_udp::kcp_flush()
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return;
    }
    ikcp_flush(m_kcp);
}

int kcp_udp::udp_output(const char *buf, int len, ikcpcb *kcp, void *user)
{
    kcp_udp *pCur = (kcp_udp*)user;
    ssize_t ssend = sendto(pCur->m_sockfd,buf,len,0,(struct sockaddr*)&pCur->ser_addr,sizeof(struct sockaddr_in));
    if(ssend > 0)
    {
        return (int)ssend;
    }
    else
    {
        return -1;
    }
}

bool kcp_udp::getReconnect()
{
    bool ret = m_bRConnect;
    m_bRConnect = false;
    return ret;
}

IUINT32 kcp_udp::getWaitSendNum()
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return 0;
    }
    return m_kcp->nsnd_que;
}




bool kcp_udp::needPaused()
{
    std::lock_guard<std::mutex> lock(m_kcpMutex);
    if(!m_kcp)
    {
        return 0;
    }
    int ret = m_kcp->nsnd_que / m_kcp->snd_wnd;
    if(ret >= 2)
    {
        return true;
    }
    else
    {
        return false;
    }
}

```[技术参考](https://github.com/skywind3000/kcp)
 类似资料: