1、发送数据片的结构体定义。
typedef struct tagRUDPSendSegment
{
uint64_t seq_;
uint64_t push_ts_; //进入发送队列的时刻
uint64_t last_send_ts_; //最后一次发送的时刻
uint16_t send_count_; //发送的次数
uint8_t data_[MAX_SEGMENT_SIZE];
uint16_t data_size_;
tagRUDPSendSegment()
{
reset();
}
void reset()
{
seq_ = 0;
push_ts_ = 0;
last_send_ts_ = 0;
send_count_ = 0;
data_size_ = 0;
}
}RUDPSendSegment;
typedef list<RUDPSendSegment*> SendDataList;
丢失的数据包的序号组成的集合
typedef set<uint64_t> LossIDSet;
滑动窗口结构体定义:
typedef map<uint64_t, RUDPSendSegment*> SendWindowMap;
key是数据片对应的ID,value是指向数据片的指针。数据片的ID从一个随机数(0或者1)开始,并依次增大,每个数据片的ID唯一。
待发送的数据片的集合定义。
typedef list<RUDPSendSegment*> SendDataList;
发送数据片时,是往上述list集合里面插入数据片。尝试发送数据片的时候,会依据list的大小从其中依次取出与发送滑动窗口尺寸相同数量的数据片,对于每一份数据片一份放入发送窗口,一份尝试发送。
【实现. h】
/*************************************************************************************
*filename: rudp_send_buffer.h
*
*to do: 定义RUDP的发送缓冲BUFFER和发送窗口
*Create on: 2013-04
*Author: zerok
*check list:
*************************************************************************************/
#ifndef __RUDP_SEND_BUFFER_H_
#define __RUDP_SEND_BUFFER_H_
#include "rudp/rudp_channel_interface.h"
#include "rudp/rudp_segment.h"
#include <map>
#include <list>
BASE_NAMESPACE_BEGIN_DECL
typedef map<uint64_t, RUDPSendSegment*> SendWindowMap;
typedef list<RUDPSendSegment*> SendDataList;
class RUDPCCCObject;
class RUDPSendBuffer
{
public:
RUDPSendBuffer();
virtual ~RUDPSendBuffer();
void reset();
//发送数据接口
int32_t send(const uint8_t* data, int32_t data_size);
//ACK处理
void on_ack(uint64_t ack_seq);
//NACK处理
void on_nack(uint64_t base_seq, const LossIDArray& loss_ids);
//定时器接口
void on_timer(uint64_t now_ts);
void check_buffer();
public:
uint64_t get_buffer_seq() {return buffer_seq_;}
//设置NAGLE算法
void set_nagle(bool nagle = true){nagle_ = nagle;}
bool get_nagle() const {return nagle_;}
//设置发送缓冲区的大小
void set_buffer_size(int32_t buffer_size){buffer_size_ = buffer_size;}
int32_t get_buffer_size() const {return buffer_size_;}
void set_net_channel(IRUDPNetChannel *channel) {net_channel_ = channel;}
void set_ccc(RUDPCCCObject* ccc) {ccc_ = ccc;}
uint32_t get_bandwidth();
int32_t get_buffer_data_size() const {return buffer_data_size_;}
void clear_loss();
protected:
//试图发送
void attempt_send(uint64_t now_timer);
uint32_t calculate_snd_size(uint64_t now_timer);
uint32_t get_threshold(uint32_t rtt);
protected:
IRUDPNetChannel* net_channel_;
//正在发送的数据片
SendWindowMap send_window_;
//正在发送的报文的丢包集合
LossIDSet loss_set_;
//等待发送的数据片
SendDataList send_data_;
//发送缓冲区的大小
int32_t buffer_size_;
//当前缓冲数据的大小
int32_t buffer_data_size_;
//当前BUFFER中最大的SEQ
uint64_t buffer_seq_;
//当前WINDOW中最大的SEQ
uint64_t cwnd_max_seq_;
//接收端最大的SEQ
uint64_t dest_max_seq_;
//接收端最大丢失的报文SEQ
uint64_t max_loss_seq_;
uint64_t send_ts_;
//速度控制器
RUDPCCCObject* ccc_;
//是否启动NAGLE算法
bool nagle_;
uint32_t bandwidth_;
uint64_t bandwidth_ts_;
};
BASE_NAMESPACE_END_DECL
#endif
/************************************************************************************/
.cpp
#include "rudp/rudp_send_buffer.h"
#include "revolver/base_timer_value.h"
#include "rudp/rudp_log_macro.h"
#include "rudp/rudp_ccc.h"
#include <math.h>
BASE_NAMESPACE_BEGIN_DECL
#define DEFAULT_RUDP_SEND_BUFFSIZE 4096
#define NAGLE_DELAY 100
RUDPSendBuffer::RUDPSendBuffer()
: net_channel_(NULL)
, buffer_seq_(0)
, dest_max_seq_(0)
, cwnd_max_seq_(0)
, max_loss_seq_(0)
, buffer_size_(DEFAULT_RUDP_SEND_BUFFSIZE)
, buffer_data_size_(0)
, nagle_(false)
, send_ts_(0)
, ccc_(NULL)
{
reset();
}
RUDPSendBuffer::~RUDPSendBuffer()
{
reset();
}
void RUDPSendBuffer::reset()
{
buffer_seq_ = rand() + 1;
dest_max_seq_ = buffer_seq_ - 1;
cwnd_max_seq_ = buffer_seq_;
max_loss_seq_ = buffer_seq_;
buffer_data_size_ = 0;
buffer_size_ = DEFAULT_RUDP_SEND_BUFFSIZE;
nagle_ = false;
send_ts_ = CBaseTimeValue::get_time_value().msec();
loss_set_.clear();
for(SendWindowMap::iterator it = send_window_.begin(); it != send_window_.end(); ++ it)
RETURN_SEND_SEG(it->second);
send_window_.clear();
for(SendDataList::iterator it = send_data_.begin(); it != send_data_.end(); ++ it)
RETURN_SEND_SEG(*it);
send_data_.clear();
bandwidth_ = 0;
bandwidth_ts_ = CBaseTimeValue::get_time_value().msec();
}
int32_t RUDPSendBuffer::send(const uint8_t* data, int32_t data_size)
{
int32_t copy_pos = 0;
int32_t copy_size = 0;
uint8_t* pos = (uint8_t *)data;
uint64_t now_timer = CBaseTimeValue::get_time_value().msec();
if(!send_data_.empty()) //粘包
{
RUDPSendSegment* last_seg = send_data_.back();
if(last_seg != NULL && last_seg->data_size_ < MAX_SEGMENT_SIZE)
{
copy_size = MAX_SEGMENT_SIZE - last_seg->data_size_;
if( copy_size > data_size)
copy_size = data_size;
memcpy(last_seg->data_ + last_seg->data_size_, pos, copy_size);
copy_pos += copy_size;
pos += copy_size;
last_seg->data_size_ += copy_size;
}
}
//分片
while(copy_pos < data_size)
{
GAIN_SEND_SEG(last_seg);
//设置初始化的的时刻
last_seg->push_ts_ = now_timer;
last_seg->seq_ = buffer_seq_;
buffer_seq_ ++;
//确定拷贝的块长度
copy_size = (data_size - copy_pos);
if(copy_size > MAX_SEGMENT_SIZE)
copy_size = MAX_SEGMENT_SIZE;
memcpy(last_seg->data_, pos, copy_size);
copy_pos += copy_size;
pos += copy_size;
last_seg->data_size_ = copy_size;
send_data_.push_back(last_seg);
if (buffer_data_size_ + copy_pos > buffer_size_)
break;
}
buffer_data_size_ += copy_pos;
//尝试发送,立即发送
//if(!nagle_)
attempt_send(now_timer);
return copy_pos;
}
void RUDPSendBuffer::on_ack(uint64_t seq)
{
//ID错误
if(cwnd_max_seq_ < seq)
return;
if(!send_window_.empty())
{
//删除窗口的数据片
SendWindowMap::iterator it = send_window_.begin();
while(it != send_window_.end() && it->first <= seq)
{
//更新数据缓冲的大小
if(buffer_data_size_ > it->second->data_size_)
buffer_data_size_ = - it->second->data_size_;
else
buffer_data_size_ = 0;
bandwidth_ += it->second->data_size_;
RETURN_SEND_SEG(it->second);
send_window_.erase(it);
it = send_window_.begin();
ccc_->add_recv(1);
}
//重发发送窗口中仅有的一个未确认的数据片
if(send_window_.size() == 1)
{
it = send_window_.begin();
uint64_t now_timer = CBaseTimeValue::get_time_value().msec();
net_channel_->send_data(0, it->second->seq_, it->second->data_, it->second->data_size_, now_timer);
it->second->last_send_ts_ = now_timer;
it->second->send_count_++;
ccc_->add_resend();
}
}
if (dest_max_seq_ < seq)
dest_max_seq_ = seq;
check_buffer();
//尝试发送
/*attempt_send(CBaseTimeValue::get_time_value().msec());*/
}
void RUDPSendBuffer::on_nack(uint64_t base_seq, const LossIDArray& loss_ids)
{
uint64_t now_timer = CBaseTimeValue::get_time_value().msec();
SendWindowMap::iterator it;
uint64_t seq = base_seq;
uint32_t sz = loss_ids.size();
for (size_t i = 0; i < sz; ++i)
{
/*设置成立即重发*/
seq = loss_ids[i] + base_seq;
it = send_window_.find(seq);
if (it != send_window_.end())
{
net_channel_->send_data(0, it->second->seq_, it->second->data_, it->second->data_size_, now_timer);
it->second->last_send_ts_ = now_timer;
it->second->send_count_++;
ccc_->add_resend();
}
}
if (loss_ids.size() > 0)
ccc_->on_loss(base_seq, loss_ids);
on_ack(base_seq);
}
void RUDPSendBuffer::on_timer(uint64_t now_timer)
{
attempt_send(now_timer);
check_buffer();
}
void RUDPSendBuffer::check_buffer()
{
buffer_size_ = core_max(buffer_size_, (ccc_->get_send_window_size() * MAX_SEGMENT_SIZE));
//检查是否可以写
if(buffer_data_size_ < buffer_size_ && net_channel_ != NULL)
net_channel_->on_write();
}
//CCC控制清除接口
void RUDPSendBuffer::clear_loss()
{
loss_set_.clear();
}
uint32_t RUDPSendBuffer::get_threshold(uint32_t rtt)
{
uint32_t rtt_threshold = 10;
uint32_t var_rtt = core_max(ccc_->get_rtt_var(), rtt / 16);
if (rtt < 10)
rtt_threshold = 10;
else
rtt_threshold = rtt + var_rtt;
if(rtt < 80)
rtt_threshold = rtt + var_rtt + 3;
else if (rtt < 100)
rtt_threshold = rtt + var_rtt + 10;
else
rtt_threshold = rtt + var_rtt;
return rtt_threshold;
}
uint32_t RUDPSendBuffer::calculate_snd_size(uint64_t now_timer)
{
uint32_t ret;
uint32_t timer;
if (now_timer < send_ts_ + 10)
ret = 0;
timer = now_timer - send_ts_;
ret = ccc_->get_send_window_size() * timer / get_threshold(ccc_->get_rtt());
send_ts_ = now_timer;
return core_min(ret, ccc_->get_send_window_size());
}
void RUDPSendBuffer::attempt_send(uint64_t now_timer)
{
uint32_t cwnd_size;
uint32_t ccc_cwnd_size = ccc_->get_send_window_size();
RUDPSendSegment* seg = NULL;
SendWindowMap::iterator map_it;
cwnd_size = 0;
//判断是否可以发送新的报文
while (!send_data_.empty() && send_window_.size() < ccc_cwnd_size)
{
RUDPSendSegment* seg = send_data_.front();
//判断NAGLE算法,NAGLE最少需要在100MS凑1024个字节报文
if(nagle_ && seg->push_ts_ + NAGLE_DELAY > now_timer && seg->data_size_ < MAX_SEGMENT_SIZE - 256)
break;
//判断发送窗口
send_data_.pop_front();
send_window_.insert(SendWindowMap::value_type(seg->seq_, seg));
cwnd_size += seg->data_size_;
now_timer = CBaseTimeValue::get_time_value().msec();
seg->push_ts_ = now_timer;
seg->last_send_ts_ = now_timer;
seg->send_count_ = 1;
net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);
if(cwnd_max_seq_ < seg->seq_)
cwnd_max_seq_ = seg->seq_;
}
}
//计算带宽
//单位 b/s
uint32_t RUDPSendBuffer::get_bandwidth()
{
uint32_t ret = 0;
uint64_t cur_ts = CBaseTimeValue::get_time_value().msec();
if(cur_ts > bandwidth_ts_)
ret = static_cast<uint32_t>(bandwidth_ * 1000 / (cur_ts - bandwidth_ts_));
else
ret = bandwidth_ * 1000;
bandwidth_ts_ = cur_ts;
bandwidth_ = 0;
return ret;
}
BASE_NAMESPACE_END_DECL
【实现解析】
[1].int32_t send(const uint8_t* data,int32_t data_size)。将数据片插入待发送数据片链表中。首先判断待发送的链表是否为空,若不为空,则从链表中取出最后一个数据片。若数据片数据部分的长度小于数据片的最大数据长度。计算最后一个数据片的剩余空间,若剩余空间可以放下当前数据,则将当前数据追加到最后一个数据片的后面;若最后一个数据片的长度不小于数据片的最大数据长度或者最后一个数据片的剩余空间不足以放下当前数据,则要分片将当前数据放入待发送数据片链表。
[2].void attempt_send(uint64_t now_timer)。尝试发送滑动窗口里面的数据片。调用当前函数时,首先判断,首先判断当前待发送数据片组成的链表是否为空且当前滑动窗口的尺寸是否小于速率控制器计算出的滑动窗口尺寸。若前面两个条件均满足,则依次从待发送数据链表首部中取出数据片,一份送入滑动窗口。一份从从网络通道接口发送数据。同时更新当前滑动窗口中的已发送的最大数据片ID。
[3].void on_ack(uint64_t seq)。根据当前的回复的ack的ID,处理ACK。首先,判断ACK的ID,若ACK的ID大于当前发送窗口最大的数据片的ID,则说明当前的ACK是错误的ACK,直接结束函数的调用;若ACK的ID是正常的,则要判断滑动窗口是否为空,若滑动窗口为空,则说明当前已经发送出去的数据片均已得到了确认,若滑动窗口不为空,则遍历滑动窗口中的每一个数据片,对于ID小于ACK的ID的数据片,将其从滑动窗口中删除,在此期间,要更新数据缓冲的大小,计算带宽的字节数,以及速率控制器中的接收数据片的数量。若上一步结束之后,滑动窗口中仅有一个数据片,则重新发送这最后一个数据片并将速率控制器中的重发次数加一。然后,更新接收端已接收的最大的数据片的ID。最后,检查是否可以写。
[4].void on_nack(uint64_t base_seq,const LossIDArray& loss_ids)。根据nack包来重发丢失的包,其中参数base_seq表示接收端接收到的连续的数据片的最大的ID,LossIDArray是下面这种结构:
typedef vector<uint16_t> LossIDArray;
loss_ids实际上是丢失的片的ID相对base_seq的偏移量。因此,知道base_seq与loss_ids,就知道了接收端未成功接收的数据片的ID。因此,对于接收端尚未接收到的数据片,全部重新发送。要更新每一个数据片发送时的发送时间与发送次数,速率控制器中的重发次数也需要更新。
[4].void on_timer(uint64_t now_timer)。定时调用。尝试从滑动窗口中发送数据与检查网络通道接口是否可写。
[5].void check_buffer()。更新发送缓冲区的尺寸,并检查网络通道接口是否可写。
[6].计算带宽。单位为b/s。注:只有接受到了ACK之后数据包才算接收到,这个数据包的尺寸才可用于计算带宽。