对于游戏开发,尤其是 MOBA(多人在线竞技)游戏,延迟是需要控制的。但是对于传统的 TCP(网络友好,很棒),并不利于包的实时性传输,因为他的超时重传和拥塞控制都是网络友好,对于我们包的实时性,没有优势。所以一般都是需要基于 UDP 去实现一套自己的网络协议,保证包的实时,以及可靠。其实就是牺牲 TCP 的友好,牺牲带宽,以空间换时间。基于 UDP,网上有一些优秀的协议,比如 KCP。
KCP 只是简单的算法实现,并没有涉及到任何的底层调用。我们只需要在 UDP 系统调用的时候,注册 KCP 回调函数,即可使用。所以可以将它理解为一个应用层协议。对比 TCP:
TCP 的 RTO 翻倍。这个概念是很恐怖的。KCP 为 1.5 倍。
选择性重传,只会传输丢失的数据包。
快速重传,不会等到超时。默认若干次重新传输
TCP 会延时发送 ACK。KCP 可设置
非退让流控。发送窗口可以只取决于发送缓存大小和接收端剩余接收缓存大小。
KCP 为了实现选择性重传(ARQ),会维护一个接收窗口(滑动窗口)。如果收到有序数据会将其放到接收队列,以待应用层消费。如果存在包丢失,会判断。超过设置的次数,会让其选择重传对应的包。其实就是通过一个 rcv_nxt(接收窗口当前偏移)来判断当前需要接受的数据包。如果收到的包在窗口范围,但是不是 rcv_nxt。先保存,等包连续之后才会将连续的数据包放入到接受队列供应用层消费。同样网络不好的情况,KCP 也会实现拥塞控制,限制发送端的包。
本文福利, 免费领取C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,编解码,推拉流,srs)↓↓↓↓↓↓见下面↓↓文章底部点击免费领取↓↓
首先在分析之前我们应该去 github 看一下使用方法。其实很简单,初始化 kcp 对象,然后实现回调函数,其实就是实现自己底层 UDP 系统调用。每次我们通过 KCP 发包的时候,他都会调用这个回调。UDP 收到包之后调用 ikcp_input 函数,即可。我们最终只需要通过 ikcp_send 和 ikcp_recv 收发数据。
在看代码前,先看看 kcp 数据包的结构,Segement
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; //会话编号,两方一致才会通信
IUINT32 cmd; //指令类型,四种下面会说
IUINT32 frg; //分片编号 倒数第几个seg。主要就是用来合并一块被分段的数据。
IUINT32 wnd; //自己可用窗口大小
IUINT32 ts;
IUINT32 sn; //编号 确认编号或者报文编号
IUINT32 una; //代表编号前面的所有报都收到了的标志
IUINT32 len;
IUINT32 resendts; //重传的时间戳。超过当前时间重发这个包
IUINT32 rto; //超时重传时间,根据网络去定
IUINT32 fastack; //快速重传机制,记录被跳过的次数,超过次数进行快速重传
IUINT32 xmit; //重传次数
char data[1]; //数据内容
};
1234567891011121314151617
Kcp 就是通过数据包的这些字段,实现稳定通信,针对不同的点可以去做优化。从上面的字段,也可以看出 kcp 借助 UNA 和 ACK 实现了选择性重传。
首先来看包发送的逻辑,我们会调用 ikcp_send 方法。
这个方法,首先会判断 kcp 流。并尝试将包追加到前一段,如果可能的话。否则进行分片传输。
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;
if (count >= (int)IKCP_WND_RCV) return -2;
if (count == 0) count = 1;
// fragment
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
}
return 0;
上面的代码逻辑中 count 其实就是包的分片数。然后循环,创建 segment,segment 的数据结构主要就是保存了分片的数据包信息。比如 eg->frg 保存当前分片的编号。完事之后调用 iqueue_add_tail 方法将 segment 传入到发送队列。这些方法通过宏定义实现。其实就是链表操作。队列是一个双向链表。逻辑很简单。那么这一步之时将数据分片放入到队列。具体发送逻辑在哪实现呢,继续往下看。
我们看一下回调的逻辑,其实就是 ikcp_output 方法,这个方法会在 ikcp_flush 中调用。也就是 ikcp_output 做的是最终的数据发送。那是如何驱动的呢?我先来看看这个方法。
1.这个方法首先发送 ack。遍历所有 ack。调用 ikcp_output 方法发送。
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
2.判断当前是否需要进行窗口探测,因为如果窗口为 0,是不能发数据,所以必须进行窗口探测才行。探测结束之后,如果需要,设置探测窗口时间。发送探测窗口的请求或者窗口恢复的请求。主要就是请求对端窗口大小,以及告知远端窗口大小。
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
完事之后将结果放入 seg 中。
3.计算本次发送可用的窗口大小,由多个因素决定,KCP 有选择性配置。可以选择不结合流控窗口。
4.将发送队列中的消息放到发送缓冲区,其实就是发送窗口。也就是说所有发送后的数据都会在这个缓存区。发送数据之前,还需要设置对应的重传次数和间隔。
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
这个逻辑就比较简单了,其实就从发送窗口队列拿出一个 seg。然后设置对应的参数。并且更新缓冲队列。以及缓冲队列的大小。如果设置 nodelay,重传时间有*2 变成 1.5
5.遍历发送窗口队列。判断是否有需要发送的数据(包括重新传输的)。其实就是拿到对应的 segment,然后根据信息进行逻辑判断是否需要重新传输。或者需要发送。判断结束之后进行重新传输。
逻辑也很简单
如果包是第一次传输,直接发。
如果到了包的重传时间,再次传输,并且记录丢失标志
如果被跳过的次数超过了 fastack,重新传输。
其实 lost 和 change 是用来更新窗口大小的字段。并且两个更新算法不一样。
if (segment->xmit == 0) {
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
segment->xmit++;
kcp->xmit++;
if (kcp->nodelay == 0) {
segment->rto += kcp->rx_rto;
} else {
segment->rto += kcp->rx_rto / 2;
}
segment->resendts = current + segment->rto;
//记录包丢失
lost = 1;
}
else if (segment->fastack >= resent) {
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
//用来标示发生了快速重传
change++;
}
}
基本上所有的快速重传和超时重传的逻辑都在这个方法中。如果出现超时重传(丢包),就会进入慢启动,拥塞窗口减半,滑动窗口变为 1。如果发生了快速重传,也会更新拥塞窗口。具体算法可看代码。
看完这个 flush 方法,我们基本了解发送数据的逻辑。然后就看他在哪调用的。
其实就是在 ikcp_update 方法中就行调用,这个方法需要应用层反复调用,一般可以为 10ms 和 100ms,时间将决定数据发送的实时性。也就是说他会定时刷新判断发送窗口队列的数据或者需要重传的数据,并通过底层 UDP 进行数据发送。这个方法没有什么逻辑。
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;
kcp->current = current;
if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}
slap = _itimediff(kcp->current, kcp->ts_flush);
if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}
if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}
然后根据字段去调用 ikcp_parse_una 和 ikcp_shrink_buf 方法。前者是解析 una,确定已经发出去的数据包,有哪些对方接收到了。如果收到了直接重接受窗口移除。后者是更新 kcp 的 send_una。send_una 代表之前的包已经确定收到。
2.如果是 ACK 指令,其实就是做了一些处理。
ikcp_update_ack 主要就是更新 kcp 的一些参数,包括 rtt 以及 rto, 首先 ikcp_parse_ack 方法主要就是根据 sn,去移除发送队列中对应的 segment。然后就是更新 maxack 以及时间,并且记录日志
if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
ikcp_parse_ack(kcp, sn);
//根据snd队列去更新una
ikcp_shrink_buf(kcp);
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
//记录最大ACK
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
//打印日志
}
3.如果收到的是数据包,这个逻辑其实很简单,就是检测数据,并将有效的数据放到接受队列,首先就是判断数据包是否有效,如果是,构造一个 segment。将数据放入。,然后调用 ikcp_parse_data 方法。这个方法逻辑也比较简单,其实就是判断是否有效,如果已经被接收过的话,就丢弃,否则根据 sn(编号)插入到接收队列。
else if (cmd == IKCP_CMD_PUSH) {
if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
ikcp_log(kcp, IKCP_LOG_IN_DATA,
"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
}
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
ikcp_ack_push(kcp, sn, ts);
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
ikcp_parse_data(kcp, seg);
}
}
}
如果是询问窗口大小的包。这个其实就做个标记,因为每个 kcp 的 header 都有 win 大小。剩下的操作就是根据网络状况更新拥塞以及窗口大小了。
看了 kcp 的实现,其实发现和传输层的 TCP 差不多,只不过进行一下微调和可控。比如牺牲流控保证数据包的实时传输。或者加速重传等等。还有通过 una 和 ack 实现选择性重传。总的来说用于游戏帧同步或者数据实时传输领域还是有一定的优势。
本文福利, 免费领取C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,编解码,推拉流,srs)↓↓↓↓↓↓见下面↓↓文章底部点击免费领取↓↓