当前位置: 首页 > 工具软件 > fq > 使用案例 >

FQ队列throttled流管理

诸葛利
2023-12-01

FQ队列中对发送时间未到的流结构单独存放在delayed红黑树中,这样的流结构即不在new_flow链表,也不在old_flow链表,其next指针执行一个特定的值:throttled。内核函数通过检查流结构的next指针,来判断流是否处于throttled状态。

new_flow链表中的流结构的优先级高于old_flows中的流。

/* special value to mark a detached flow (not on old/new list) */
static struct fq_flow detached, throttled;

static bool fq_flow_is_throttled(const struct fq_flow *f)
{
    return f->next == &throttled;
}

当流结构中的报文到达发送时间之后,函数fq_flow_unset_throttled将其由红黑树中删除,并将其添加到old_flows链表中。函数fq_flow_add_tail会更新流结构的next指针。

static void fq_flow_unset_throttled(struct fq_sched_data *q, struct fq_flow *f)
{
    rb_erase(&f->rate_node, &q->delayed);
    q->throttled_flows--;
    fq_flow_add_tail(&q->old_flows, f);
}

1. 流分类时退出throttled

如果新接收的报文所属的套接口与FQ中对应的套接口相等,但是两者的hash值不同,表明内核的套接口经过了重新分配(reallocated),此时,如果流处于throttled状态,将其退出throttled,并且设置time_next_packet为零0,重新开始发送此流结构的报文。

static struct fq_flow *fq_classify(struct sk_buff *skb, struct fq_sched_data *q)
{
    ...
    p = &root->rb_node;
    parent = NULL;
    while (*p) {
        parent = *p;

        f = rb_entry(parent, struct fq_flow, fq_node);
        if (f->sk == sk) {
            /* socket might have been reallocated, so check if its sk_hash is the same.
             * It not, we need to refill credit with initial quantum
             */
            if (unlikely(skb->sk && f->socket_hash != sk->sk_hash)) {
                f->credit = q->initial_quantum;
                f->socket_hash = sk->sk_hash;
                if (fq_flow_is_throttled(f))
                    fq_flow_unset_throttled(q, f);
                f->time_next_packet = 0ULL;
            }
            return f;

2. 出队列中throttled流处理

在fq_dequeue中,调用fq_check_throttled检查delayed树中是否有可发送报文的流结构。变量time_next_delayed_flow大于当前时刻,表明整个delayed树中没有到期可发送报文。否则,遍历delayed树,如果当前遍历的流结构的应发送时间戳小于当前时刻,将其退出throttled状态,否则,更新time_next_delayed_flow记录的delayed树中最早应发送报文的时间戳,并退出遍历。

另外,统计值unthrottle_latency_ns使用EWMA算法生成,最近一次采样的占比为1/8。

static void fq_check_throttled(struct fq_sched_data *q, u64 now)
{
    unsigned long sample;
    struct rb_node *p;

    if (q->time_next_delayed_flow > now) return;

    /* Update unthrottle latency EWMA.
     * This is cheap and can help diagnosing timer/latency problems.
     */
    sample = (unsigned long)(now - q->time_next_delayed_flow);
    q->unthrottle_latency_ns -= q->unthrottle_latency_ns >> 3;
    q->unthrottle_latency_ns += sample >> 3;

    q->time_next_delayed_flow = ~0ULL;
    while ((p = rb_first(&q->delayed)) != NULL) {
        struct fq_flow *f = rb_entry(p, struct fq_flow, rate_node);

        if (f->time_next_packet > now) {
            q->time_next_delayed_flow = f->time_next_packet;
            break;
        }
        fq_flow_unset_throttled(q, f);

在每次fq_dequeue执行时,首先检查fq中throttled状态的流结构,由以上函数fq_check_throttled实现。接下来,如果new_flows和old_flows都为空,并且delayed树中有待发送的流结构,此时启动watchdog定时器。

static struct sk_buff *fq_dequeue(struct Qdisc *sch)
{
    struct fq_sched_data *q = qdisc_priv(sch);
    struct fq_flow_head *head;
    struct fq_flow *f;

    if (!sch->q.qlen) return NULL;

    skb = fq_dequeue_head(sch, &q->internal);
    if (skb) goto out;

    now = ktime_get_ns();
    fq_check_throttled(q, now);
begin:
    head = &q->new_flows;
    if (!head->first) {
        head = &q->old_flows;
        if (!head->first) {
            if (q->time_next_delayed_flow != ~0ULL)
                qdisc_watchdog_schedule_ns(&q->watchdog,
                               q->time_next_delayed_flow);
            return NULL;
        }
    }
    f = head->first;

    if (f->credit <= 0) {
        f->credit += q->quantum;
        head->first = f->next;
        fq_flow_add_tail(&q->old_flows, f);
        goto begin;
    }

对于当前处理的流结构,如果其credit有值,但是报文的下一次发送时间还没有到,将此流结构由链表(new或者old链表)中移除,并将此流结构设置为节流状态(throttled)。链表(new或者old)的头部指向下一个流结构。

    skb = f->head;
    if (skb) {
        u64 time_next_packet = max_t(u64, ktime_to_ns(skb->tstamp),
                         f->time_next_packet);

        if (now < time_next_packet) {
            head->first = f->next;
            f->time_next_packet = time_next_packet;
            fq_flow_set_throttled(q, f);
            goto begin;
        }

所有throttled状态的流结构链接在delayed表示的红黑树中,以下一个报文发送时间作为键值。由于流结构的next指针还是指向new_flow或者old_flow链表中的元素,这里将next执行全局静态变量throttled,用于表示此流结构的throttled状态,函数fq_flow_is_throttled即通过判断流结构的next决定其是否处于throttled状态。

变量time_next_delayed_flow保存了整个delayed红黑树中,最早要发送的报文的发送时间。

static void fq_flow_set_throttled(struct fq_sched_data *q, struct fq_flow *f)
{
    struct rb_node **p = &q->delayed.rb_node, *parent = NULL;

    while (*p) {
        struct fq_flow *aux;

        parent = *p;
        aux = rb_entry(parent, struct fq_flow, rate_node);
        if (f->time_next_packet >= aux->time_next_packet)
            p = &parent->rb_right;
        else
            p = &parent->rb_left;
    }
    rb_link_node(&f->rate_node, parent, p);
    rb_insert_color(&f->rate_node, &q->delayed);
    q->throttled_flows++;
    q->stat_throttled++;

    f->next = &throttled;
    if (q->time_next_delayed_flow > f->time_next_packet)
        q->time_next_delayed_flow = f->time_next_packet;
}

内核版本 5.0

 类似资料: