在介绍FQ入队列操作之前,先看一下流量的识别部分。
对于一些协议报文,比如HSR(High-availability Seamless Redundancy)、IGMP和HDLC等,其将priority字段设置为了TC_PRIO_CONTROL,对于此类报文,FQ使用一个内部特定的流处理(q->internal)。
static struct fq_flow *fq_classify(struct sk_buff *skb, struct fq_sched_data *q)
{
struct rb_node **p, *parent;
struct sock *sk = skb->sk;
struct rb_root *root;
struct fq_flow *f;
/* warning: no starvation prevention... */
if (unlikely((skb->priority & TC_PRIO_MAX) == TC_PRIO_CONTROL))
return &q->internal;
对于SYNACK报文,其关联在一个状态为TCP_NEW_SYN_RECV的请求套接口(request socket)上,或者,在SYNCOOKIE模式下,其关联在监听套接口(listen socket)上。前一种情况下,请求套接口并不是完整的套接口,没有sk_pacing_rate成员字段;对于后一种情况,如果监听套接口没有设置SO_MAX_PACING_RATE选项,也不会对报文进行限速。
在这两种情况下,报文都还不是某个流的一部分,内核中按照orphaned报文进行处理。以下将报文的哈希值与上命令行设置的orphan_mask,并且在最末尾设置1,这样得到的地址将不同于正常的套接口(sock)地址,因为正常套接口地址是按照word对其的,最后两个bit位为零。
注意这里得到的sk并不是真正的套接口,不能当做套接口使用。
/* SYNACK messages are attached to a TCP_NEW_SYN_RECV request socket
* or a listener (SYNCOOKIE mode)
* 1) request sockets are not full blown,
* they do not contain sk_pacing_rate
* 2) They are not part of a 'flow' yet
* 3) We do not want to rate limit them (eg SYNFLOOD attack),
* especially if the listener set SO_MAX_PACING_RATE
* 4) We pretend they are orphaned
*/
if (!sk || sk_listener(sk)) {
unsigned long hash = skb_get_hash(skb) & q->orphan_mask;
/* By forcing low order bit to 1, we make sure to not
* collide with a local flow (socket pointers are word aligned)
*/
sk = (struct sock *)((hash << 1) | 1UL);
skb_orphan(skb);
}
以下根据套接口(sk)地址做哈希运算,定位到bucket,即红黑树的根root。如果流的数量大于等于2倍的bucket数量,并且不活动的流数量超过总数量的一半,进行回收操作。
root = &q->fq_root[hash_ptr(sk, q->fq_trees_log)];
if (q->flows >= (2U << q->fq_trees_log) &&
q->inactive_flows > q->flows/2)
fq_gc(q, root, sk);
接下来,遍历红黑树,如果发现树中已经有与以上键值sk相同的流结构,返回此结构。但是在返回之前,需要检查报文是否具有真实的套接口结构sk,并且sk中的哈希成员sk_hash与流结构中保存的套接口哈希socket_hash是否相同?如果hash不同,表示套接口结构可能经过了重新分配,流结构中保存的sk值已经不是原来的套接口了,此时,重新为流赋予初始的initial_quantum,更新哈希值。并且,如果此流之前处于限速throttled状态,解除此状态。
p = &root->rb_node;
parent = NULL;
while (*p) {
parent = *p;
f = rb_entry(parent, struct fq_flow, fq_node);
if (f->sk == sk) {
/* socket might have been reallocated, so check if its sk_hash is the same.
* It not, we need to refill credit with initial quantum
*/
if (unlikely(skb->sk && f->socket_hash != sk->sk_hash)) {
f->credit = q->initial_quantum;
f->socket_hash = sk->sk_hash;
if (fq_flow_is_throttled(f))
fq_flow_unset_throttled(q, f);
f->time_next_packet = 0ULL;
}
return f;
}
if (f->sk > sk)
p = &parent->rb_right;
else
p = &parent->rb_left;
}
在以上没有找到对应的流结构之后,需要重新创建。如果报文skb具有真实的套接口,使用套接口结构中的哈希值sk_hash来初始化流结构中的套接口哈希值socket_hash,对于新的流,赋予初始的quantum值,即initial_quantum。此新分配的流,状态为detached,并且标记为不活动流,递增inactive_flows。
将新流结构插入到对应的fq_root[]红黑树中。在下一节的fq_enqueue函数中,将会看到,条件合适的话,新创建的流被链接到new_flows链表中。
f = kmem_cache_zalloc(fq_flow_cachep, GFP_ATOMIC | __GFP_NOWARN);
if (unlikely(!f)) {
q->stat_allocation_errors++;
return &q->internal;
}
fq_flow_set_detached(f);
f->sk = sk;
if (skb->sk)
f->socket_hash = sk->sk_hash;
f->credit = q->initial_quantum;
rb_link_node(&f->fq_node, parent, p);
rb_insert_color(&f->fq_node, root);
q->flows++;
q->inactive_flows++;
return f;
FQ入队函数fq_enqueue如下,当队列长度qlen大于设定的限值时,丢弃报文,不进行入队处理。
static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch, struct sk_buff **to_free)
{
struct fq_sched_data *q = qdisc_priv(sch);
struct fq_flow *f;
if (unlikely(sch->q.qlen >= sch->limit))
return qdisc_drop(skb, sch, to_free);
调用上一节函数fq_classify获取相应的流结构(fq_flow),如果此流结构中的队列长度超过了TC命令的限定值(flow_plimit),对报文进行丢弃处理。否则,递增流队列计数(f->qlen++)。
f = fq_classify(skb, q);
if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) {
q->stat_flows_plimit++;
return qdisc_drop(skb, sch, to_free);
}
f->qlen++;
qdisc_qstats_backlog_inc(sch, skb);
如果流处于detached状态,比如刚刚创建等,将其添加到new_flows链表的末尾。如果此流的时间戳age,距离当下已经超过refill_delay时长,重新为其增加quantum值,但是其信用不超过其当前的信用credit。这种情况是针对老的流,即old_flows链表中的流由于没有报文,被设置为detached状态,其可能还有未耗尽的credit值。
另外,对于真实的套接口,如果开启了pacing功能(rate_enable),为套接口设置SK_PACING_FQ标志。由于将流添加到了new_flows链表中,递减不活动流的数量。
if (fq_flow_is_detached(f)) {
struct sock *sk = skb->sk;
fq_flow_add_tail(&q->new_flows, f);
if (time_after(jiffies, f->age + q->flow_refill_delay))
f->credit = max_t(u32, f->credit, q->quantum);
if (sk && q->rate_enable) {
if (unlikely(smp_load_acquire(&sk->sk_pacing_status) != SK_PACING_FQ))
smp_store_release(&sk->sk_pacing_status, SK_PACING_FQ);
}
q->inactive_flows--;
}
以下,将报文添加到流队列中,递增相应的计数。
/* Note: this overwrites f->age */
flow_queue_add(f, skb);
if (unlikely(f == &q->internal)) {
q->stat_internal_packets++;
}
sch->q.qlen++;
return NET_XMIT_SUCCESS;
首先,处理高优先级的内部流结构(internal),如果其中没有可处理报文,进行以下处理。
static struct sk_buff *fq_dequeue(struct Qdisc *sch)
{
struct fq_sched_data *q = qdisc_priv(sch);
struct fq_flow_head *head;
struct fq_flow *f;
if (!sch->q.qlen)
return NULL;
skb = fq_dequeue_head(sch, &q->internal);
if (skb)
goto out;
以下检查throttled状态的流结构,查看是否有可解除throttled状态的流。
now = ktime_get_ns();
fq_check_throttled(q, now);
之后,如果new_flows和old_flows两个链表都为空,判断是否有被延迟的流,启动watchdog定时器,处理下一个到达发送时间的流,下一个发送时间为time_next_delayed_flow。定时处理函数为qdisc_watchdog。
begin:
head = &q->new_flows;
if (!head->first) {
head = &q->old_flows;
if (!head->first) {
if (q->time_next_delayed_flow != ~0ULL)
qdisc_watchdog_schedule_ns(&q->watchdog, q->time_next_delayed_flow);
return NULL;
}
}
如果new_flows或者old_flows链表有一个不为空,取出首部的流结构,如果其信用credit已经用完,重新给与其quantum数量的信用,但是将其添加到old_flows链表的末尾,即下一次轮询才会处理。调回到开始begin处,处理链表中的下一个流结构。
f = head->first;
if (f->credit <= 0) {
f->credit += q->quantum;
head->first = f->next;
fq_flow_add_tail(&q->old_flows, f);
goto begin;
}
找到一个信用值大于零的流结构,取出其中头部的报文,由报文的时间戳和流结构中的下一个报文发送时间戳中的较大值来确定报文的发送时间,如果还没有到达其发送时间,不发送此报文。否则,如果当前时间已经超过发送时间,并且超过了ce_threshold定义的时长,表明发生了拥塞,导致报文未能及时发送,设置ECN标志。
skb = f->head;
if (skb) {
u64 time_next_packet = max_t(u64, ktime_to_ns(skb->tstamp),
f->time_next_packet);
if (now < time_next_packet) {
head->first = f->next;
f->time_next_packet = time_next_packet;
fq_flow_set_throttled(q, f);
goto begin;
}
if (time_next_packet &&
(s64)(now - time_next_packet - q->ce_threshold) > 0) {
INET_ECN_set_ce(skb);
q->stat_ce_mark++;
}
}
此时,正式将报文由流结构队列头部取出,如果skb为空,表明流队列已空,需要处理下一个流结构。此时,如果空的流结构处于new_flows链表上,并且old_flows链表不为空,将此空的流结构添加到old_flows链表尾部。
否则,如果空的流结构处在old_flows链表,或者old_flows链表为空,直接将空的流结构设置为detached状态,将非活动流的数量递增一。返回开头处理下一个流结构。
skb = fq_dequeue_head(sch, f);
if (!skb) {
head->first = f->next;
/* force a pass through old_flows to prevent starvation */
if ((head == &q->new_flows) && q->old_flows.first) {
fq_flow_add_tail(&q->old_flows, f);
} else {
fq_flow_set_detached(f);
q->inactive_flows++;
}
goto begin;
}
可见,流结构的移动为: new_flows --> old_flows --> detached。另外,以上已经看到,old_flows链表上的流在credit耗尽之后,还是会重新添加到old_flows链表的末尾。但是当old_flows链表上的流credit有值,而没有报文时,将流设置为detached状态。
以下代码由流结构的信用值中减去要发送的报文的长度值,如果没有开启pacing功能,结束处理,返回要发送的报文。
prefetch(&skb->end);
plen = qdisc_pkt_len(skb);
f->credit -= plen;
if (!q->rate_enable) goto out;
否则,进行限速处理,如果报文中没有给出发送时间(tstamp为零),速率限制将选取TC命令设定的最大速率flow_max_rate和套接口设置的sk_pacing_rate速率,两者之间的较小值。如果此速率小于TC命令设置的low_rate_threshold值,清空流结构的信用值,制造一定的延时。否则,这里修改限速处理中使用的报文长度,即如果plen小于quantum,使用quantum。如果流结构的信用值此时大于零,直接返回发送报文。
rate = q->flow_max_rate;
/* If EDT time was provided for this skb, we need to
* update f->time_next_packet only if this qdisc enforces a flow max rate.
*/
if (!skb->tstamp) {
if (skb->sk)
rate = min(skb->sk->sk_pacing_rate, rate);
if (rate <= q->low_rate_threshold) {
f->credit = 0;
} else {
plen = max(plen, q->quantum);
if (f->credit > 0)
goto out;
}
}
在速率值为合法值的情况下,计算发送报文需要的时长,如果此时长超过一秒钟,将其限定在一秒,并且记录下这个超长的报文。下一个报文的发送时刻time_next_packet,等于当前时间,加上修正之后的当前报文发送时长。
if (rate != ~0UL) {
u64 len = (u64)plen * NSEC_PER_SEC;
if (likely(rate))
len = div64_ul(len, rate);
/* Since socket rate can change later, clamp the delay to 1 second.
* Really, providers of too big packets should be fixed !
*/
if (unlikely(len > NSEC_PER_SEC)) {
len = NSEC_PER_SEC;
q->stat_pkts_too_long++;
}
/* Account for schedule/timers drifts.
* f->time_next_packet was set when prior packet was sent,
* and current time (@now) can be too late by tens of us.
*/
if (f->time_next_packet)
len -= min(len/2, now - f->time_next_packet);
f->time_next_packet = now + len;
}
out:
qdisc_bstats_update(sch, skb);
return skb;
内核版本 5.0