FQ队列中对发送时间未到的流结构单独存放在delayed红黑树中,这样的流结构即不在new_flow链表,也不在old_flow链表,其next指针执行一个特定的值:throttled。内核函数通过检查流结构的next指针,来判断流是否处于throttled状态。
new_flow链表中的流结构的优先级高于old_flows中的流。
/* special value to mark a detached flow (not on old/new list) */
static struct fq_flow detached, throttled;
static bool fq_flow_is_throttled(const struct fq_flow *f)
{
return f->next == &throttled;
}
当流结构中的报文到达发送时间之后,函数fq_flow_unset_throttled将其由红黑树中删除,并将其添加到old_flows链表中。函数fq_flow_add_tail会更新流结构的next指针。
static void fq_flow_unset_throttled(struct fq_sched_data *q, struct fq_flow *f)
{
rb_erase(&f->rate_node, &q->delayed);
q->throttled_flows--;
fq_flow_add_tail(&q->old_flows, f);
}
如果新接收的报文所属的套接口与FQ中对应的套接口相等,但是两者的hash值不同,表明内核的套接口经过了重新分配(reallocated),此时,如果流处于throttled状态,将其退出throttled,并且设置time_next_packet为零0,重新开始发送此流结构的报文。
static struct fq_flow *fq_classify(struct sk_buff *skb, struct fq_sched_data *q)
{
...
p = &root->rb_node;
parent = NULL;
while (*p) {
parent = *p;
f = rb_entry(parent, struct fq_flow, fq_node);
if (f->sk == sk) {
/* socket might have been reallocated, so check if its sk_hash is the same.
* It not, we need to refill credit with initial quantum
*/
if (unlikely(skb->sk && f->socket_hash != sk->sk_hash)) {
f->credit = q->initial_quantum;
f->socket_hash = sk->sk_hash;
if (fq_flow_is_throttled(f))
fq_flow_unset_throttled(q, f);
f->time_next_packet = 0ULL;
}
return f;
在fq_dequeue中,调用fq_check_throttled检查delayed树中是否有可发送报文的流结构。变量time_next_delayed_flow大于当前时刻,表明整个delayed树中没有到期可发送报文。否则,遍历delayed树,如果当前遍历的流结构的应发送时间戳小于当前时刻,将其退出throttled状态,否则,更新time_next_delayed_flow记录的delayed树中最早应发送报文的时间戳,并退出遍历。
另外,统计值unthrottle_latency_ns使用EWMA算法生成,最近一次采样的占比为1/8。
static void fq_check_throttled(struct fq_sched_data *q, u64 now)
{
unsigned long sample;
struct rb_node *p;
if (q->time_next_delayed_flow > now) return;
/* Update unthrottle latency EWMA.
* This is cheap and can help diagnosing timer/latency problems.
*/
sample = (unsigned long)(now - q->time_next_delayed_flow);
q->unthrottle_latency_ns -= q->unthrottle_latency_ns >> 3;
q->unthrottle_latency_ns += sample >> 3;
q->time_next_delayed_flow = ~0ULL;
while ((p = rb_first(&q->delayed)) != NULL) {
struct fq_flow *f = rb_entry(p, struct fq_flow, rate_node);
if (f->time_next_packet > now) {
q->time_next_delayed_flow = f->time_next_packet;
break;
}
fq_flow_unset_throttled(q, f);
在每次fq_dequeue执行时,首先检查fq中throttled状态的流结构,由以上函数fq_check_throttled实现。接下来,如果new_flows和old_flows都为空,并且delayed树中有待发送的流结构,此时启动watchdog定时器。
static struct sk_buff *fq_dequeue(struct Qdisc *sch)
{
struct fq_sched_data *q = qdisc_priv(sch);
struct fq_flow_head *head;
struct fq_flow *f;
if (!sch->q.qlen) return NULL;
skb = fq_dequeue_head(sch, &q->internal);
if (skb) goto out;
now = ktime_get_ns();
fq_check_throttled(q, now);
begin:
head = &q->new_flows;
if (!head->first) {
head = &q->old_flows;
if (!head->first) {
if (q->time_next_delayed_flow != ~0ULL)
qdisc_watchdog_schedule_ns(&q->watchdog,
q->time_next_delayed_flow);
return NULL;
}
}
f = head->first;
if (f->credit <= 0) {
f->credit += q->quantum;
head->first = f->next;
fq_flow_add_tail(&q->old_flows, f);
goto begin;
}
对于当前处理的流结构,如果其credit有值,但是报文的下一次发送时间还没有到,将此流结构由链表(new或者old链表)中移除,并将此流结构设置为节流状态(throttled)。链表(new或者old)的头部指向下一个流结构。
skb = f->head;
if (skb) {
u64 time_next_packet = max_t(u64, ktime_to_ns(skb->tstamp),
f->time_next_packet);
if (now < time_next_packet) {
head->first = f->next;
f->time_next_packet = time_next_packet;
fq_flow_set_throttled(q, f);
goto begin;
}
所有throttled状态的流结构链接在delayed表示的红黑树中,以下一个报文发送时间作为键值。由于流结构的next指针还是指向new_flow或者old_flow链表中的元素,这里将next执行全局静态变量throttled,用于表示此流结构的throttled状态,函数fq_flow_is_throttled即通过判断流结构的next决定其是否处于throttled状态。
变量time_next_delayed_flow保存了整个delayed红黑树中,最早要发送的报文的发送时间。
static void fq_flow_set_throttled(struct fq_sched_data *q, struct fq_flow *f)
{
struct rb_node **p = &q->delayed.rb_node, *parent = NULL;
while (*p) {
struct fq_flow *aux;
parent = *p;
aux = rb_entry(parent, struct fq_flow, rate_node);
if (f->time_next_packet >= aux->time_next_packet)
p = &parent->rb_right;
else
p = &parent->rb_left;
}
rb_link_node(&f->rate_node, parent, p);
rb_insert_color(&f->rate_node, &q->delayed);
q->throttled_flows++;
q->stat_throttled++;
f->next = &throttled;
if (q->time_next_delayed_flow > f->time_next_packet)
q->time_next_delayed_flow = f->time_next_packet;
}
内核版本 5.0