当前位置: 首页 > 工具软件 > KCP > 使用案例 >

KCP

蔺弘
2023-12-01

什么是KCP?为什么要使用KCP?
KCP是一个快速可靠协议。它主要的设计目的是为了解决在网络拥堵的情况下TCP协议网络速度慢的问题,增大网络传输速率,但相当于TCP而言,会相应的牺牲一部分带宽。
kcp没有规定下层传输协议,一般用UDP作为下层传输协议。kcp层协议的数据包在UDP数据报文的基础上增加控制头。当用户数据很大,大于一个UDP包能承担的范围时(大于MSS),kcp会将用户数据分片存储在多个kcp包中。因此每个kcp包称为一个分片。

首先我们先复习一下网络协议的一些基本的概念,这对我们理解KCP有很大的帮助。

【超时与重传】
超时重传指的是,发送数据包在一定的时间内没有收到相应的ACK,等待一定的时间,超时之后就认为这个数据包丢失,就会重新发送。这个等待时间被称为RTO,即重传超时时间。

【滑动窗口】
TCP通过确认机制来保证数据传输的可靠性。在早期的时候,发送数据方在发送数据之后会启动定时器,在一定时间内,如果没有收到发送数据包的ACK报文,就会重新发送数据,直到发送成功为止。但是这种停等的重传机制必须等待确认之后才能发送下一个包,传输速度比较慢。
为了提高传输速度,发送方不必在每发送一个包之后就进行等待确认,而是可以发送多个包出去,然后等待接收方一 一确认。但是接收方不可能同时处理无限多的数据,因此需要限制发送方往网络中发送的数据数量。接收方在未收到确认之前,发送方在只能发送wnd大小的数据,这个机制叫做滑动窗口机制。TCP的每一端都可以收发数据。每个TCP活连接的两端都维护一个发送窗口和接收窗口结构。

kcp结构体字段含义
snd_una:第一个未确认的包;
snd_nxt:下一个待分配的包的序号;

KCP通过以下方式提高速率:
(1)RTO。
TCP的RTO是以2倍的方式来计算的。当丢包的次数多的时候,重传超时时间RTO就非常非常的大了,重传就非常的慢,效率低,性能差。而KCP的RTO可以以1.5倍的速度增长,相对于TCP来说,有更短的重传超时时间。
(2)快速重传机制—无延迟ACK回复模式
假如开启KCP的快速重传机制,并且设置了当重复的ACK个数大于resend时候,直接进行重传。 当发送端发送了1,2,3,4,5五个包,然后收到远端的ACK:1,3,4,5。当收到ACK3时,KCP知道2被跳过1次,当收到ACK4的时候,KCP知道2被跳过2次,当次数大于等于设置的resend的值的时候,不用等到超时,可直接重传2号包。这就是KCP的快速重传机制。
下面是设置快速重传机制的源码:

//nodelay::0 不启用,1启用快速重传模式
//interval:  内部flush刷新时间
//resend:   0(默认)表示关闭。可以自己设置值,若设置为2(则2次ACK跨越将会直接重传)
//nc:         是否关闭拥塞控制,0(默认)代表不关闭,1代表关闭
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
{
    if (nodelay >= 0)              //大于0表示启用快速重传模式
    {             
        kcp->nodelay = nodelay;
        if (nodelay) {
            kcp->rx_minrto = IKCP_RTO_NDL;    //最小重传超时时间(如果需要可以设置更小)
        } else{
            kcp->rx_minrto = IKCP_RTO_MIN;  
        }
    }
    if (interval >= 0) {
        if (interval > 5000) 
            interval = 5000;
        else if (interval < 10) 
            interval = 10;
        kcp->interval = interval;           //内部flush刷新时间
    }
    if (resend >= 0) {                     // ACK被跳过resend次数后直接重传该包, 而不等待超时
        kcp->fastresend = resend           // fastresend : 触发快速重传的重复ack个数
    }
    if (nc >= 0) {
        kcp->nocwnd = nc;
    }
    return 0;
}

(3)选择重传
KCP采用滑动窗口机制来提高发送速度。由于UDP是不可靠的传输方式,会存在丢包和乱序。而KCP是可靠的且保证数据有序的协议。为了保证包的顺序,接收方会维护一个接收窗口,接收窗口有一个起始序号 rcv_nxt(待接收消息序号)以及尾序号 rcv_nxt + rcv_wnd(接收窗口大小)。如果接收窗口收到序号为 rcv_nxt 的分片(属于接收窗口的消息序号),那么 rcv_nxt 就加1,也就是滑动窗口右移,并把该数据放入接收队列供应用层取用。如果收到的数据在窗口范围内但不是 rcv_nxt ,那么就把数据缓存起来,等收到rcv_nxt序号的分片时再一并放入接收队列供应用层取用。
当丢包发生的时候,假设第n个包丢失了,但是第n+1,n+2个包都已经传输成功了,此时只重传第n个包,而不重传成功传输的n+1,n+2号包,这就是选择重传。为了能够做到选择重传,接收方需要告诉发送方哪些包它收到了。比如在返回的ACK中包含rcv_nxt和sn,rcv_nxt的含义是接收方已经成功按顺序接收了rcv_nxt序号之前的所有包,大于rcv_nxt的序号sn表示的是在接收窗口内的不连续的包。那么根据这两个参数就可以计算出哪些包没有收到了。发送方接收到接收方发过来的数据时,首先解析rcv_nxt,把所有小于rcv_nxt序号的包从发送缓存队列中移除。然后再解析sn(大于rcv_nxt),遍历发送缓存队列,找到所有序号小于sn的包,根据我们设置的快速重传的门限,对每个分片维护一个快速重传的计数,每收到一个ack解析sn后找到了一个分片,就把该分片的快速重传的计数加一,如果该计数达到了快速重传门限,那么就认为该分片已经丢失,可以触发快速重传,该门限值在kcp中可以设置。
(4)拥塞窗口
当网络状态不好的时候,KCP会限制发送端发送的数据量,这就是拥塞控制。拥塞窗口(cwnd)会随着网络状态的变化而变化。这里采用了慢启动机制,慢启动也就是控制拥塞窗口从0开始增长,在每收到一个报文段确认后,把拥塞窗口加1,多增加一个MSS的数值。但是为了防止拥塞窗口过大引起网络阻塞,还需要设置一个慢机制的的门限(ssthresh即拥塞窗口的阈值)。当拥塞窗口增长到阈值以后,就减慢增长速度,缓慢增长。
但是当网络很拥堵的情况下,导致发送数据出现重传时,这时说明网络中消息太多了,用户应该减少发送的数据,也就是拥塞窗口应该减小。怎么减小呢,在快速重传的情况下,有包丢失了但是有后续的包收到了,说明网络还是通的,这时采取拥塞窗口的退半避让,拥塞窗口减半,拥塞门限减半。减小网络流量,缓解拥堵。当出现超时重传的时候,说明网络很可能死掉了,因为超时重传会出现,原因是有包丢失了,并且该包之后的包也没有收到,这很有可能是网络死了,这时候,拥塞窗口直接变为1,不再发送新的数据,直到丢失的包传输成功。

KCP主要工作过程:
1、把要发送的buffer分片成KCP的数据包格式,插入待发送队列中;
当用户的数据超过一个MSS(最大分片大小)的时候,会对发送的数据进行分片处理。KCP采用的是流的方式进行分片处理。通过frg进行排序区分,frg即message中的segment分片ID,在message中的索引,由大到小,0表示最后一个分片。比如3,2,1,0。即把message分成了四个分片,分片的ID分别是4,3,2,1;

分片方式共有两种:
消息方式:将用户数据分片,为每个分片设置ID,将分片后的数据一个一个地存入发送队列,接收方通过id解析原来的包,消息方式一个分片的数据量可能不能达到MSS;
流方式:检测每个发送队列里的分片是否达到最大MSS,如果没有达到就会用新的数据填充分片;
网络速度:流方式 > 消息方式
接收数据:流方式一个分片一个分片的的接收。消息方式kcp的接收函数会把自己原本属于一个数据的分片重组。

int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
    IKCPSEG *seg;
    int count, i;

    assert(kcp->mss > 0);
    if (len < 0) return -1;

    //根据len计算出需要多少个分片
    if (len <= (int)kcp->mss) 
        count = 1;
    else 
        count = (len + kcp->mss - 1) / kcp->mss;   

    if (count > 255) 
        return -2;

    if (count == 0) 
        count = 1;

    // fragment
    for (i = 0; i < count; i++) {
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;   //获取当前分片的长度,存放到size中
        seg = ikcp_segment_new(kcp, size);      
        assert(seg);
        if (seg == NULL) {
            return -2;
        }
        if (buffer && len > 0) {
            memcpy(seg->data, buffer, size);
        }
        seg->len = size;
        seg->frg = count - i - 1;    //frg用来表示被分片的序号,从大到小递减
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue);   //把segment分片插入到发送队列中
        kcp->nsnd_que++;
        if (buffer) {
            buffer += size;
        }
        len -= size;
    }

    return 0;
}

2、将发送队列中的数据通过下层协议UDP进行发送
void ikcp_flush(ikcpcb *kcp)主要处理一下四种情况:
(1)发送ack

// flush acknowledges
count = kcp->ackcount;
for (i = 0; i < count; i++) {
    size = (int)(ptr - buffer);
    if (size + IKCP_OVERHEAD > IKCP_OVERHEAD) {
        ikcp_output(kcp, buffer, size);
        ptr = buffer;
    }
    ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);   //sn:message分片segment的序号,ts:message发送时刻的时间戳
    ptr = ikcp_encode_seg(ptr, &seg);
}

kcp->ackcount = 0;

(2)发送探测窗口消息

// probe window size (if remote window size equals zero)
if (kcp->rmt_wnd == 0) {                                //远端接收窗口大小为0的时候
    if (kcp->probe_wait == 0) {                         //探查窗口需要等待的时间为0
        kcp->probe_wait = IKCP_PROBE_INIT;              //设置探查窗口需要等待的时间
        kcp->ts_probe = kcp->current + kcp->probe_wait; //设置下次探查窗口的时间戳 = 当前时间 + 探查窗口等待时间间隔
    }    
    else {
        if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { //当前时间 > 下一次探查窗口的时间
            if (kcp->probe_wait < IKCP_PROBE_INIT) 
                kcp->probe_wait = IKCP_PROBE_INIT;
            kcp->probe_wait += kcp->probe_wait / 2;   //等待时间变为之前的1.5倍
            if (kcp->probe_wait > IKCP_PROBE_LIMIT)
                kcp->probe_wait = IKCP_PROBE_LIMIT;   //若超过上限,设置为上限值
            kcp->ts_probe = kcp->current + kcp->probe_wait;  //计算下次探查窗口的时间戳
            kcp->probe |= IKCP_ASK_SEND;         //设置探查变量。IKCP_ASK_TELL表示告知远端窗口大小。IKCP_ASK_SEND表示请求远端告知窗口大小
        }
    }
}    else {
    kcp->ts_probe = 0;
    kcp->probe_wait = 0;
}

// flush window probing commands。IKCP_ASK_SEND表示请求远端告知窗口大小
if (kcp->probe & IKCP_ASK_SEND) {
    seg.cmd = IKCP_CMD_WASK;
    size = (int)(ptr - buffer);
    if (size + IKCP_OVERHEAD > IKCP_OVERHEAD) {
        ikcp_output(kcp, buffer, size);     //KCP的下层输出协议,通过设置回调函数来实现
        ptr = buffer;
    }
    ptr = ikcp_encode_seg(ptr, &seg);
}

// flush window probing commands。IKCP_ASK_TELL表示告知远端窗口大小
if (kcp->probe & IKCP_ASK_TELL) {
    seg.cmd = IKCP_CMD_WINS;
    size = (int)(ptr - buffer);
    if (size + IKCP_OVERHEAD > IKCP_OVERHEAD) {
        ikcp_output(kcp, buffer, size);
        ptr = buffer;
    }
    ptr = ikcp_encode_seg(ptr, &seg);
}

// flash remain no data segments
size = (int)(ptr - buffer);
if (size > 0) {
    ikcp_output(kcp, buffer, size);
    ptr = buffer;
}

kcp->probe = 0;

(3)计算拥塞窗口大小

// calculate window size
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);    //cwnd = 发送窗口大小 和 远端接收窗口大小的最小值
if (kcp->nocwnd == 0)                         //不取消拥塞控制
    cwnd = _imin_(kcp->cwnd, cwnd);           //拥塞窗口 = 当前拥塞窗口和cwnd的最小值(也就是取当前拥塞窗口、发送窗口、接收窗口的最小值)

(4)将发送队列中的消息存入发送缓存队列(发送缓存队列就是发送窗口)


while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
    IKCPSEG *newseg;
    if (iqueue_is_empty(&kcp->snd_queue)) 
        break;

    newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);  //snd_queue:发送消息的队列

    iqueue_del(&newseg->node);                      //从发送消息队列中,删除节点
    iqueue_add_tail(&newseg->node, &kcp->snd_buf);  //然后把删除的节点,加入到kcp的发送缓存队列中
    kcp->nsnd_que--; 
    kcp->nsnd_buf++;

    newseg->conv = kcp->conv;     //会话id
    newseg->cmd = IKCP_CMD_PUSH;  //cmd:用来区分分片的作用。IKCP_CMD_PUSH:数据分片,IKCP_CMD_ACK:ack分片,IKCP_CMD_WASK:请求告知窗口大小,IKCP_CMD_WINS:告知窗口大小
    newseg->wnd = seg.wnd;  
    newseg->ts = current;           
    newseg->sn = kcp->snd_nxt++;  //下一个待发报的序号
    newseg->una = kcp->rcv_nxt;   //待收消息序号
    newseg->resendts = current;   //下次超时重传的时间戳
    newseg->rto = kcp->rx_rto;    //由ack接收延迟计算出来的重传超时时间
    newseg->fastack = 0;          //收到ack时计算的该分片被跳过的累计次数
    newseg->xmit = 0;             //发送分片的次数,每发送一次加一
}

(5)检查缓存队列中当前需要发送的数据(包括新传数据和重传数据)


// flush data segments
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
    IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
    int needsend = 0;
    if (segment->xmit == 0) {
        needsend = 1;
        segment->xmit++;                 //发送分片的次数
        segment->rto = kcp->rx_rto;     //该分片超时重传的时间戳
        segment->resendts = current + segment->rto + rtomini;  //下次超时重传的时间戳
    }
    else if (_itimediff(current, segment->resendts) >= 0) {   //当前时间>下次重传时间。说明没有重传,即丢包了?
        needsend = 1;
        segment->xmit++;
        kcp->xmit++;
        if (kcp->nodelay == 0) {        //0:表示不启动快速重传模式
            segment->rto += kcp->rx_rto;    //不启动快速重传模式,每次重传之后rto的时间就是之前的2倍
        }    else {
            segment->rto += kcp->rx_rto / 2;  //启用快速重传之后,rto变成原来的1.5倍
        }
        segment->resendts = current + segment->rto;
        lost = 1;
    }
    else if (segment->fastack >= resent) {     //fastack:表示收到ack计算的该分片被跳过的累积次数
        needsend = 1;
        segment->xmit++;
        segment->fastack = 0;
        segment->resendts = current + segment->rto;
        change++;
    }

    if (needsend) {
        int size, need;
        segment->ts = current;
        segment->wnd = seg.wnd;       //剩余接收窗口大小。即接收窗口大小-接收队列大小
        segment->una = kcp->rcv_nxt;  //待接收消息序号

        size = (int)(ptr - buffer);
        need = IKCP_OVERHEAD + segment->len;   //segment报文默认大小 + segment的长度

        // 禁止数据包合包
        if (size + need > IKCP_OVERHEAD) {
            ikcp_output(kcp, buffer, size, IKCP_RETRY_FLAG);
            ptr = buffer;
        }

        ptr = ikcp_encode_seg(ptr, segment);

        if (segment->len > 0) {
            memcpy(ptr, segment->data, segment->len);
            ptr += segment->len;
        }

        if (segment->xmit >= kcp->dead_link) {
            kcp->state = -1;
        }

        // 重试次数打日志
        if (segment->xmit > 1)
        {
            ikcp_log(kcp, 0x80000000, "xmit: %d, sn: %d, rto: %u", segment->xmit, segment->sn, segment->rto);
        }
    }
}

// flash remain segments
size = (int)(ptr - buffer);
if (size > 0) {
    ikcp_output(kcp, buffer, size, IKCP_RETRY_FLAG);
}

(6)根据重传数据更新发送窗口大小;

(7)在发生快速重传的时候,会将慢启动阈值调整为当前发送窗口的一半,并把拥塞窗口大小调整为kcp.ssthresh + resent,resent是触发快速重传的丢包的次数,resent的值代表的意思在被弄丢的包后面收到了resent个数的包的ack,也就是我们在ikcp_nodelay方法中设置的resend的值。这样调整后kcp就进入了拥塞控制状态。

if (change) {
    IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;   //下一个要分配的包 - 第一个未确认的包
    kcp->ssthresh = inflight / 2;                     //change=1说明发生过快速重传。当发生快速重传的时候,会将慢启动阈值调整为当前发送窗口的一半
    if (kcp->ssthresh < IKCP_THRESH_MIN)
        kcp->ssthresh = IKCP_THRESH_MIN;   
    kcp->cwnd = kcp->ssthresh + resent;   //并把拥塞窗口大小 = 拥塞窗口阈值 + 触发快速重传的ack大小
    kcp->incr = kcp->cwnd * kcp->mss;
}

(8)如果发生的超时重传,那么就重新进入慢启动状态。

if (lost) {
    kcp->ssthresh = cwnd / 2;   //丢包了。窗口的大小需要减半
    if (kcp->ssthresh < IKCP_THRESH_MIN)
        kcp->ssthresh = IKCP_THRESH_MIN;
    kcp->cwnd = 1;
    kcp->incr = kcp->mss;
}

3、kcp接收到下层协议UDP传进来的数据底层数据buffer转换成kcp的数据包格式;

int ikcp_input(ikcpcb *kcp, const char *data, long size)

KCP报文分为ACK报文、数据报文、探测窗口报文、响应窗口报文四种。
kcp报文的una字段(snd_una:第一个未确认的包)表示对端希望接收的下一个kcp包序号,也就是说明接收端已经收到了所有小于una序号的kcp包。解析una字段后需要把发送缓冲区里面包序号小于una的包全部丢弃掉。

ack报文则包含了对端收到的kcp包的序号,接到ack包后需要删除发送缓冲区中与ack包中的发送包序号(sn)相同的kcp包。

if (cmd == IKCP_CMD_ACK) {
    if ((_itimediff(kcp->current, ts) >= 0) && (_itimediff(sn, kcp->maxsn) >= 0)) {
        ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
    }

    ikcp_parse_ack(kcp, sn);
    ikcp_shrink_buf(kcp);
    if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
        ikcp_log(kcp, IKCP_LOG_IN_DATA, 
            "input ack: sn=%lu rtt=%ld rto=%ld", sn, 
            (long)_itimediff(kcp->current, ts),
            (long)kcp->rx_rto);
    }
}

解析ACK报文:

static void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn)
{
    struct IQUEUEHEAD *p, *next;

    if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
        return;

    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
        IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
        next = p->next;
        if (sn == seg->sn) {
            iqueue_del(p);

            kcp->sumxmit += seg->xmit;
            ++kcp->sumseg;

            ikcp_segment_delete(kcp, seg);
            kcp->nsnd_buf--;
            break;
        }
        else {
            // 序号为sn的被跳过了
            seg->fastack++;
        }
    }
}

收到数据报文时,需要判断数据报文是否在接收窗口内,如果是则保存ack,如果数据报文的sn正好是待接收的第一个报文rcv_nxt,那么就更新rcv_nxt(加1)。如果配置了ackNodelay模式(无延迟ack)或者远端窗口为0(代表暂时不能发送用户数据),那么这里会立刻fulsh()发送ack。

else if (cmd == IKCP_CMD_PUSH) {    //数据报文
    if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
        ikcp_log(kcp, IKCP_LOG_IN_DATA, 
            "input psh: sn=%lu ts=%lu", sn, ts);
    }
    if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
        ikcp_ack_push(kcp, sn, ts);     //sn:message分片segment的序号,ts:message发送时刻的时间戳
        if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
            seg = ikcp_segment_new(kcp, len);
            seg->conv = conv;
            seg->cmd = cmd;
            seg->frg = frg;
            seg->wnd = wnd;
            seg->ts = ts;
            seg->sn = sn;
            seg->una = una;
            seg->len = len;

            if (len > 0) {
                memcpy(seg->data, data, len);
            }

            ikcp_parse_data(kcp, seg);
        }
    }
}

如果snd_una增加了那么就说明对端正常收到且回应了发送方发送缓冲区第一个待确认的包,此时需要更新cwnd(拥塞窗口)

 if (_itimediff(kcp->snd_una, una) > 0) {     //如果第一个未确认的包的序号>待接收消息序号
    if (kcp->cwnd < kcp->rmt_wnd) {          //用拥塞口大小 < 远端接收窗口大小
       IUINT32 mss = kcp->mss;
        if (kcp->cwnd < kcp->ssthresh) {     //拥塞窗口大小 < 拥塞窗口阈值
            kcp->cwnd++;                     //拥塞窗口+1
             kcp->incr += mss;                //可发送最大数据量增加最大分片个大小
         }   else {
           if (_itimediff(kcp->snd_una, una) > 0) {
        if (kcp->cwnd < kcp->rmt_wnd) {
            IUINT32 mss = kcp->mss;
            if (kcp->cwnd < kcp->ssthresh) {
                kcp->cwnd++;
                kcp->incr += mss;
            }    else {
                if (kcp->incr < mss) kcp->incr = mss;
                kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
                if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                    kcp->cwnd++;
                }
            }
            if (kcp->cwnd > kcp->rmt_wnd) {
                kcp->cwnd = kcp->rmt_wnd;
                kcp->incr = kcp->rmt_wnd * mss;
            }
        }
    }

4、kcp将接收到的kcp数据包还原成之前kcp发送的buffer数据。
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
如果有分片,则执行merge操作。

 类似资料:

相关阅读

相关文章

相关问答