在介绍FQ入队列操作之前,先看一下流量的识别部分。

1 流量识别

对于一些协议报文,比如HSR(High-availability Seamless Redundancy)、IGMP和HDLC等,其将priority字段设置为了TC_PRIO_CONTROL,对于此类报文,FQ使用一个内部特定的流处理(q->internal)。

static struct fq_flow *fq_classify(struct sk_buff *skb, struct fq_sched_data *q)
{struct rb_node **p, *parent;struct sock *sk = skb->sk;struct rb_root *root;struct fq_flow *f;/* warning: no starvation prevention... */if (unlikely((skb->priority & TC_PRIO_MAX) == TC_PRIO_CONTROL))return &q->internal;

对于SYNACK报文,其关联在一个状态为TCP_NEW_SYN_RECV的请求套接口(request socket)上,或者,在SYNCOOKIE模式下,其关联在监听套接口(listen socket)上。前一种情况下,请求套接口并不是完整的套接口,没有sk_pacing_rate成员字段;对于后一种情况,如果监听套接口没有设置SO_MAX_PACING_RATE选项,也不会对报文进行限速。

在这两种情况下,报文都还不是某个流的一部分,内核中按照orphaned报文进行处理。以下将报文的哈希值与上命令行设置的orphan_mask,并且在最末尾设置1,这样得到的地址将不同于正常的套接口(sock)地址,因为正常套接口地址是按照word对其的,最后两个bit位为零。

注意这里得到的sk并不是真正的套接口,不能当做套接口使用。

    /* SYNACK messages are attached to a TCP_NEW_SYN_RECV request socket* or a listener (SYNCOOKIE mode)* 1) request sockets are not full blown,*    they do not contain sk_pacing_rate* 2) They are not part of a 'flow' yet* 3) We do not want to rate limit them (eg SYNFLOOD attack),*    especially if the listener set SO_MAX_PACING_RATE* 4) We pretend they are orphaned*/if (!sk || sk_listener(sk)) {unsigned long hash = skb_get_hash(skb) & q->orphan_mask;/* By forcing low order bit to 1, we make sure to not* collide with a local flow (socket pointers are word aligned)*/sk = (struct sock *)((hash << 1) | 1UL);skb_orphan(skb);}

以下根据套接口(sk)地址做哈希运算,定位到bucket,即红黑树的根root。如果流的数量大于等于2倍的bucket数量,并且不活动的流数量超过总数量的一半,进行回收操作。

    root = &q->fq_root[hash_ptr(sk, q->fq_trees_log)];if (q->flows >= (2U << q->fq_trees_log) &&q->inactive_flows > q->flows/2)fq_gc(q, root, sk);

接下来,遍历红黑树,如果发现树中已经有与以上键值sk相同的流结构,返回此结构。但是在返回之前,需要检查报文是否具有真实的套接口结构sk,并且sk中的哈希成员sk_hash与流结构中保存的套接口哈希socket_hash是否相同?如果hash不同,表示套接口结构可能经过了重新分配,流结构中保存的sk值已经不是原来的套接口了,此时,重新为流赋予初始的initial_quantum,更新哈希值。并且,如果此流之前处于限速throttled状态,解除此状态。

    p = &root->rb_node;parent = NULL;while (*p) {parent = *p;f = rb_entry(parent, struct fq_flow, fq_node);if (f->sk == sk) {/* socket might have been reallocated, so check if its sk_hash is the same.* It not, we need to refill credit with initial quantum*/if (unlikely(skb->sk && f->socket_hash != sk->sk_hash)) {f->credit = q->initial_quantum;f->socket_hash = sk->sk_hash;if (fq_flow_is_throttled(f))fq_flow_unset_throttled(q, f);f->time_next_packet = 0ULL;}return f;}if (f->sk > sk)p = &parent->rb_right;elsep = &parent->rb_left;}

在以上没有找到对应的流结构之后,需要重新创建。如果报文skb具有真实的套接口,使用套接口结构中的哈希值sk_hash来初始化流结构中的套接口哈希值socket_hash,对于新的流,赋予初始的quantum值,即initial_quantum。此新分配的流,状态为detached,并且标记为不活动流,递增inactive_flows。

将新流结构插入到对应的fq_root[]红黑树中。在下一节的fq_enqueue函数中,将会看到,条件合适的话,新创建的流被链接到new_flows链表中。

    f = kmem_cache_zalloc(fq_flow_cachep, GFP_ATOMIC | __GFP_NOWARN);if (unlikely(!f)) {q->stat_allocation_errors++;return &q->internal;}fq_flow_set_detached(f);f->sk = sk;if (skb->sk)f->socket_hash = sk->sk_hash;f->credit = q->initial_quantum;rb_link_node(&f->fq_node, parent, p);rb_insert_color(&f->fq_node, root);q->flows++;q->inactive_flows++;return f;

2 FQ入队列处理

FQ入队函数fq_enqueue如下,当队列长度qlen大于设定的限值时,丢弃报文,不进行入队处理。

static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch, struct sk_buff **to_free)
{struct fq_sched_data *q = qdisc_priv(sch);struct fq_flow *f;if (unlikely(sch->q.qlen >= sch->limit))return qdisc_drop(skb, sch, to_free);

调用上一节函数fq_classify获取相应的流结构(fq_flow),如果此流结构中的队列长度超过了TC命令的限定值(flow_plimit),对报文进行丢弃处理。否则,递增流队列计数(f->qlen++)。

    f = fq_classify(skb, q);if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) {q->stat_flows_plimit++;return qdisc_drop(skb, sch, to_free);}f->qlen++;qdisc_qstats_backlog_inc(sch, skb);

如果流处于detached状态,比如刚刚创建等,将其添加到new_flows链表的末尾。如果此流的时间戳age,距离当下已经超过refill_delay时长,重新为其增加quantum值,但是其信用不超过其当前的信用credit。这种情况是针对老的流,即old_flows链表中的流由于没有报文,被设置为detached状态,其可能还有未耗尽的credit值。

另外,对于真实的套接口,如果开启了pacing功能(rate_enable),为套接口设置SK_PACING_FQ标志。由于将流添加到了new_flows链表中,递减不活动流的数量。

    if (fq_flow_is_detached(f)) {struct sock *sk = skb->sk;fq_flow_add_tail(&q->new_flows, f);if (time_after(jiffies, f->age + q->flow_refill_delay))f->credit = max_t(u32, f->credit, q->quantum);if (sk && q->rate_enable) {if (unlikely(smp_load_acquire(&sk->sk_pacing_status) != SK_PACING_FQ))smp_store_release(&sk->sk_pacing_status, SK_PACING_FQ);}q->inactive_flows--;}

以下,将报文添加到流队列中,递增相应的计数。

    /* Note: this overwrites f->age */flow_queue_add(f, skb);if (unlikely(f == &q->internal)) {q->stat_internal_packets++;}sch->q.qlen++;return NET_XMIT_SUCCESS;

3 FQ出队列

首先,处理高优先级的内部流结构(internal),如果其中没有可处理报文,进行以下处理。

static struct sk_buff *fq_dequeue(struct Qdisc *sch)
{struct fq_sched_data *q = qdisc_priv(sch);struct fq_flow_head *head;struct fq_flow *f;if (!sch->q.qlen)return NULL;skb = fq_dequeue_head(sch, &q->internal);if (skb)goto out;

以下检查throttled状态的流结构,查看是否有可解除throttled状态的流。

    now = ktime_get_ns();fq_check_throttled(q, now);

之后,如果new_flows和old_flows两个链表都为空,判断是否有被延迟的流,启动watchdog定时器,处理下一个到达发送时间的流,下一个发送时间为time_next_delayed_flow。定时处理函数为qdisc_watchdog。

begin:head = &q->new_flows;if (!head->first) {head = &q->old_flows;if (!head->first) {if (q->time_next_delayed_flow != ~0ULL)qdisc_watchdog_schedule_ns(&q->watchdog, q->time_next_delayed_flow);return NULL;}}

如果new_flows或者old_flows链表有一个不为空,取出首部的流结构,如果其信用credit已经用完,重新给与其quantum数量的信用,但是将其添加到old_flows链表的末尾,即下一次轮询才会处理。调回到开始begin处,处理链表中的下一个流结构。

    f = head->first;if (f->credit <= 0) {f->credit += q->quantum;head->first = f->next;fq_flow_add_tail(&q->old_flows, f);goto begin;}

找到一个信用值大于零的流结构,取出其中头部的报文,由报文的时间戳和流结构中的下一个报文发送时间戳中的较大值来确定报文的发送时间,如果还没有到达其发送时间,不发送此报文。否则,如果当前时间已经超过发送时间,并且超过了ce_threshold定义的时长,表明发生了拥塞,导致报文未能及时发送,设置ECN标志。

    skb = f->head;if (skb) {u64 time_next_packet = max_t(u64, ktime_to_ns(skb->tstamp),f->time_next_packet);if (now < time_next_packet) {head->first = f->next;f->time_next_packet = time_next_packet;fq_flow_set_throttled(q, f);goto begin;}if (time_next_packet &&(s64)(now - time_next_packet - q->ce_threshold) > 0) {INET_ECN_set_ce(skb);q->stat_ce_mark++;}}

此时,正式将报文由流结构队列头部取出,如果skb为空,表明流队列已空,需要处理下一个流结构。此时,如果空的流结构处于new_flows链表上,并且old_flows链表不为空,将此空的流结构添加到old_flows链表尾部。

否则,如果空的流结构处在old_flows链表,或者old_flows链表为空,直接将空的流结构设置为detached状态,将非活动流的数量递增一。返回开头处理下一个流结构。

    skb = fq_dequeue_head(sch, f);if (!skb) {head->first = f->next;/* force a pass through old_flows to prevent starvation */if ((head == &q->new_flows) && q->old_flows.first) {fq_flow_add_tail(&q->old_flows, f);} else {fq_flow_set_detached(f);q->inactive_flows++;}goto begin;}

可见,流结构的移动为: new_flows --> old_flows --> detached。另外,以上已经看到,old_flows链表上的流在credit耗尽之后,还是会重新添加到old_flows链表的末尾。但是当old_flows链表上的流credit有值,而没有报文时,将流设置为detached状态。

以下代码由流结构的信用值中减去要发送的报文的长度值,如果没有开启pacing功能,结束处理,返回要发送的报文。

    prefetch(&skb->end);plen = qdisc_pkt_len(skb);f->credit -= plen;if (!q->rate_enable)  goto out;

否则,进行限速处理,如果报文中没有给出发送时间(tstamp为零),速率限制将选取TC命令设定的最大速率flow_max_rate和套接口设置的sk_pacing_rate速率,两者之间的较小值。如果此速率小于TC命令设置的low_rate_threshold值,清空流结构的信用值,制造一定的延时。否则,这里修改限速处理中使用的报文长度,即如果plen小于quantum,使用quantum。如果流结构的信用值此时大于零,直接返回发送报文。

    rate = q->flow_max_rate;/* If EDT time was provided for this skb, we need to* update f->time_next_packet only if this qdisc enforces a flow max rate.*/if (!skb->tstamp) {if (skb->sk)rate = min(skb->sk->sk_pacing_rate, rate);if (rate <= q->low_rate_threshold) {f->credit = 0;} else {plen = max(plen, q->quantum);if (f->credit > 0)goto out;}}

在速率值为合法值的情况下,计算发送报文需要的时长,如果此时长超过一秒钟,将其限定在一秒,并且记录下这个超长的报文。下一个报文的发送时刻time_next_packet,等于当前时间,加上修正之后的当前报文发送时长。

    if (rate != ~0UL) {u64 len = (u64)plen * NSEC_PER_SEC;if (likely(rate))len = div64_ul(len, rate);/* Since socket rate can change later, clamp the delay to 1 second.* Really, providers of too big packets should be fixed !*/if (unlikely(len > NSEC_PER_SEC)) {len = NSEC_PER_SEC;q->stat_pkts_too_long++;}/* Account for schedule/timers drifts.* f->time_next_packet was set when prior packet was sent,* and current time (@now) can be too late by tens of us.*/if (f->time_next_packet)len -= min(len/2, now - f->time_next_packet);f->time_next_packet = now + len;}
out:qdisc_bstats_update(sch, skb);return skb;

内核版本 5.0

Linux FQ队列操作相关推荐

  1. Linux FQ 队列实现原理浅析

    2018/04/21 又到了周六早上,依然是后半夜属于自己的时间.起床,读了几篇古文,准备梳理一下Linux FQ和RPS方面的原理,正好昨天也有人问到我.本文先从FQ开始. enqueue入队过程 ...

  2. linux消息队列操作

    对消息队列的操作无非有以下三种类型: 1. 打开或创建消息队列 消息队列的内核持续性要求每一个消息队列都在系统范围内相应唯一的键值,所以,要获得一个消息队列的描写叙述字,仅仅需提供该消息队列的键值就可 ...

  3. linux fq队列,理解fq_codel之概述

    fq_codel这个新的机制进入内核已经有一段时间了,主要是在Linux的Wi-Fi子系统中使用.但是目前好像并没有像样的中文的介绍.正好之前参与开源组织合作开发Airtime fairness(AT ...

  4. linux fq队列,QOS各种队列详解(FIFO,FQ,CBWFQ,PQ).doc

    QOS各种队列详解(FIFO,FQ,CBWFQ,PQ) QOS各种队列详解(FIFO,FQ,CBWFQ,PQ) 对于拥塞管理,一般采用队列技术,使用一个队列算法对流量进行分类,之后用某种优先级别算法将 ...

  5. PHP下操作Linux消息队列完成进程间通信的方法

    2019独角兽企业重金招聘Python工程师标准>>> 来源:http://www.jb51.net/article/24353.htm 关于Linux系统进程通信的概念及实现可查看 ...

  6. Linux公平队列FQ接口实现

    用户层面的tc配置命令如下: $ sudo tc qdisc add dev ens38 root fq $ $ sudo tc -s qdisc show dev ens38 qdisc fq 80 ...

  7. linux消息队列非亲缘,linux进程

    linux进程Tag内容描述: 1.linux消息队列进程通信 一.消息队列的基本概念消息队列(也叫做报文队列)是Unix系统V版本中3种进程间通信机制之一.另外两种是信号灯和共享内存.这些IPC机制 ...

  8. linux.调整收发队列,linux消息队列通信

    程序目的:学习linux消息队列通信 所用主要函数:msgget(),msgsnd(),msgrcv(),msgctl() 首先介绍每个函数的用法: (1)msgget 使用格式: #include ...

  9. linux消息队列总结

    1.消息队列简介 实现linux进程通信的方式有5种: --信号(Singal) --管道(Pipe) --消息队列(Message) --信号量(Semaphore) 每种进程通信方式实现方式和功能 ...

最新文章

  1. Linux 忘记登录密码?破解系统登陆密码
  2. PHP中魔术方法的用法
  3. 手机-字符串替换显示
  4. 人工智能免费公开课一网打尽!14个类别、230门课程,GitHub标星6000+
  5. 究竟什么样的简历才能拿到面试?
  6. 各类最新Asp .Net Core 项目和示例源码
  7. 在word中使用notepad++实现代码的语法高亮
  8. 谁天生就是干程序员的料?
  9. saltstack python3安装_如何在linux下升级python以及saltstack安装
  10. 【QA】哈工大张伟男:任务型对话系统
  11. 第五章平稳过程(1)
  12. 通过tomcat插件启动Maven工程
  13. 几款经典css框架下载
  14. 快逸报表为报表添加无数据的日期(二)
  15. 机器学习 | 数学基础
  16. 财会法规与职业道德【2】
  17. android ibeacon service,android – iBeacon后台扫描
  18. java-php-python-ssm学生学籍信息管理系统计算机毕业设计
  19. 惠普台式机EliteDesk TWR安装双系统
  20. mysql按经纬度排序_mysql根据经纬度查找排序

热门文章

  1. 人生少走弯路的十条忠告
  2. 时延、丢包、抖动是什么?
  3. SYN Flood攻击防范技术白皮书
  4. 面对疫情防控的多摄像头社交距离检测方案
  5. 【KDD2022教程】图算法公平性:方法与趋势
  6. 学习java第二周——面向对象、堆栈方法区、代码块、抽象类、接口和异常
  7. 锂电回收行业硫酸镍溶液除硅
  8. 一文带你深入浅出C语言数据
  9. 在网易实习是种什么体验?
  10. 北京市政交通一卡通系统的应用情况研究