目录

线程初始化

流管理逻辑处理函数

主循环

获取超时的流

处理超时的流

其他超时

线程退出


流管理线程的创建与注册的slot 和 TmModule 对应的关系参见我的之前的一遍文章中创建非工作线程的子线程部分

suricata 各个线程干的事情 -- 主线程_xuwaiwai的博客-CSDN博客suricata 各个线程的作用 -- 主线程https://blog.csdn.net/xuwaiwai/article/details/120086508?spm=1001.2014.3001.5501

线程函数:tv->tm_func()对应的函数,TmThreadsManagement。

线程初始化

设置线程名字和绑定cpu。

注册的模块的初始化,管理线程只注册了一个slot(即 FlowManager 模块),

初始化函数为 s->SlotThreadInit 在此对应为

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)

函数中执行:

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));if (ftd == NULL)return TM_ECODE_FAILED;ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);SCLogDebug("flow manager instance %u", ftd->instance);/* set the min and max value used for hash row walking* each thread has it's own section of the flow hash */uint32_t range = flow_config.hash_size / flowmgr_number;if (ftd->instance == 0)ftd->max = range;else if ((ftd->instance + 1) == flowmgr_number) {ftd->min = (range * ftd->instance) + 1;ftd->max = flow_config.hash_size;} else {ftd->min = (range * ftd->instance) + 1;ftd->max = (range * (ftd->instance + 1));}BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);/* pass thread data back to caller */*data = ftd;FlowCountersInit(t, &ftd->cnt);PacketPoolInit();return TM_ECODE_OK;
}

可能会有多个管理线程,所以将flow_config.hash_size 根据管理线程数等分,并根据每等份设置ftd->min 和 ftd->max,相当于把 FlowBucket *flow_hash 这个hash桶数组等分,这样管理线程循环中根据这个区间去遍历 FlowBucket *flow_hash 数组中各个区间中流。

初始化当前管理线程的packetpool。

状态统计数据初始化。

设置tv的flag为THV_INIT_DONE,以便主线程将此 tv->flag 的暂停标记去除。

流管理逻辑处理函数

r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));

由于slot对应的是FlowManager 模块,则此函数为

static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)

此函数主要作用是管理流表并使流超时和流紧急时间处理,处理的数据来源是各个FlowBucket中的 Flow *evicted 和 Flow *head,主要在线程循环中执行。

流超时处理:

  • 在紧急模式下,对哈希表进行完整遍历
  • 非紧急模式下,只扫描部分哈希表,只是进一步缩小遍历的范围

其都最终在 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, struct timeval *ts, const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters)函数中检测超时,在FlowBucket *flow_hash中 检测此区间【uint32_t hash_min, const uint32_t hash_max】的流。

主循环

获取超时的流

一上来就搞了一个看不懂的东西。先是将上面的检测区间按照每个32或64分组(操作系统位数),这里就认为是64位。

#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

1、先设置一个 uint64_t check_bits 的位图,每1位表示一个bucket是否有超时的flow,方法是每次循环64次检查t每个bucket的next_ts与最新时间,有超时的话在check_bits中对对应的第i为设为1。
2、循环完成,再循环这个整数,检查每一位同时检查next_ts,都符合超时条件则进行回收。

    for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {TYPE check_bits = 0;const uint32_t check = MIN(BITS, (hash_max - idx));for (uint32_t i = 0; i < check; i++) {FlowBucket *fb = &flow_hash[idx+i];check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;}if (check_bits == 0)continue;for (uint32_t i = 0; i < check; i++) {FlowBucket *fb = &flow_hash[idx+i];if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {if (FMTryLockBucket(fb) == 0) {Flow *evicted = NULL;if (fb->evicted != NULL || fb->head != NULL) {/* if evicted is set, we only process that list right now.* Since its set we've had traffic that touched this row* very recently, and there is a good chance more of it will* come in in the near future. So unlock the row asap and leave* the possible eviction of flows to the packet lookup path. */if (fb->evicted != NULL) {/* transfer out of bucket so we can do additional work outside* of the bucket lock */evicted = fb->evicted;fb->evicted = NULL;} else if (fb->head != NULL) {int32_t next_ts = 0;FlowManagerHashRowTimeout(td,fb->head, ts, emergency, counters, &next_ts);if (SC_ATOMIC_GET(fb->next_ts) != next_ts)SC_ATOMIC_SET(fb->next_ts, next_ts);}if (fb->evicted == NULL && fb->head == NULL) {SC_ATOMIC_SET(fb->next_ts, INT_MAX);} else if (fb->evicted != NULL && fb->head == NULL) {SC_ATOMIC_SET(fb->next_ts, 0);}} else {SC_ATOMIC_SET(fb->next_ts, INT_MAX);rows_empty++;}FBLOCK_UNLOCK(fb);/* processed evicted list */if (evicted) {FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);}} else {rows_busy++;}} else {rows_skipped++;}}if (td->aside_queue.len) {cnt += ProcessAsideQueue(td, counters);}}
  • 如果fb->evicted 不为NULL,则里面所有的流都要回收,暂时保存在此函数的 Flow *evicted的临时队列中。再在后续的流程中调用 static void FlowManagerHashRowClearEvictedList (FlowManagerTimeoutThread *td, Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)函数处理,将出evicted 中的所有flow 放入td->aside_queue 这个队列。

    static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td,Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)
    {do {FLOWLOCK_WRLOCK(f);Flow *next_flow = f->next;f->next = NULL;f->fb = NULL;DEBUG_VALIDATE_BUG_ON(f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters));FlowQueuePrivateAppendFlow(&td->aside_queue, f);/* flow is still locked in the queue */f = next_flow;} while (f != NULL);
    }
    
  • 如果 fb->head 不为NULL,调用FlowManagerHashRowTimeout 进行超时处理, 超时的流放到了 td->aside_queue 这个队列;
    static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,Flow *f, struct timeval *ts,int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
    {uint32_t checked = 0;Flow *prev_f = NULL;do {checked++;/* check flow timeout based on lastts and state. Both can be* accessed w/o Flow lock as we do have the hash row lock (so flow* can't disappear) and flow_state is atomic. lastts can only* be modified when we have both the flow and hash row lock *//* timeout logic goes here */if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {counters->flows_notimeout++;prev_f = f;f = f->next;continue;}FMFlowLock(f); //FLOWLOCK_WRLOCK(f);Flow *next_flow = f->next;counters->flows_timeout++;/* never prune a flow that is used by a packet we* are currently processing in one of the threads */if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {FLOWLOCK_UNLOCK(f);prev_f = f;counters->flows_timeout_inuse++;f = f->next;continue;}RemoveFromHash(f, prev_f);FlowQueuePrivateAppendFlow(&td->aside_queue, f);/* flow is still locked in the queue */f = next_flow;} while (f != NULL);counters->flows_checked += checked;if (checked > counters->rows_maxlen)counters->rows_maxlen = checked;
    }

处理超时的流

处理上面获得的 td->aside_queue 中的超时的流,调用static uint32_t ProcessAsideQueue (FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters) 函数:

static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{FlowQueuePrivate recycle = { NULL, NULL, 0 };counters->flows_aside += td->aside_queue.len;uint32_t cnt = 0;Flow *f;while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {/* flow is still locked */if (f->proto == IPPROTO_TCP &&!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOADf->flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
#endiff->flow_state != FLOW_STATE_LOCAL_BYPASSED &&FlowForceReassemblyNeedReassembly(f) == 1){FlowForceReassemblyForFlow(f);/* flow ownership is passed to the worker thread *//* flow remains locked */counters->flows_aside_needs_work++;continue;}FLOWLOCK_UNLOCK(f);FlowQueuePrivateAppendFlow(&recycle, f);if (recycle.len == 100) {FlowQueueAppendPrivate(&flow_recycle_q, &recycle);}cnt++;}if (recycle.len) {FlowQueueAppendPrivate(&flow_recycle_q, &recycle);}return cnt;
}

主要是放入td->aside_queue队列的超时flow放入回收线程的回收队列flow_recycle_q,每100个放一次,等待recycle线程处理。

如果这里的超时流需要重组,则是又打回原来处理它的线程,由原线程的FlowWorker中的函数FlowWorkerProcessInjectedFlows 完成重组回收,FlowForceReassemblyForFlow函数把flow放入原线程的tv->flow_queue队列中,放入队列后,这里有两种处理方式:

  1. outqh_name是 "flow" (TMQH_FLOW)或者 "simple"(TMQH_SIMPLE) 时:对应的工作线程首先从tv->flow_queue这个队列中获取数据并处理,如果没有将等待管理线程发送信号激活。  管理线程就在此给flow所属线程发送信号(SCCondSignal(&tv->inq->pq->cond_q);),唤醒那个线程处理这个flow的重组回收。如果tv->flow_queue有多余的数据,则对饮的子线程直接获取处理。这种方式不深究,{ RUNMODE_AFP_DEV + "workers" }使用下面的第二种方式。

    /* same as 'simple' */
    Packet *TmqhInputFlow(ThreadVars *tv)
    {PacketQueue *q = tv->inq->pq;StatsSyncCountersIfSignalled(tv);SCMutexLock(&q->mutex_q);if (q->len == 0) {/* if we have no packets in queue, wait... */SCCondWait(&q->cond_q, &q->mutex_q);}if (q->len > 0) {Packet *p = PacketDequeue(q);SCMutexUnlock(&q->mutex_q);return p;} else {/* return NULL if we have no pkt. Should only happen on signals. */SCMutexUnlock(&q->mutex_q);return NULL;}
    }
  2. outqh_name是 "packetpool" (TMQH_PACKETPOOL)时:{ RUNMODE_AFP_DEV + "workers" } ,放入tv->flow_queue队列后不在做任何处理。这个tv线程 在FlowWorker 函数中执行FlowWorkerProcessInjectedFlows时自然会处理。

其他超时

        if (ftd->instance == 0 &&(other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {DefragTimeoutHash(&ts);//uint32_t hosts_pruned =HostTimeoutHash(&ts);IPPairTimeoutHash(&ts);other_last_sec = (uint32_t)ts.tv_sec;}

线程退出

执行线程退出函数,然后退出。


凡是过往,即为序章

suricata 各个线程干的事情 -- FlowManagerThread相关推荐

  1. suricata 各个线程干的事情 -- WorkerThread

    目录 线程初始化 packet pool线程池初始化 主要模块的初始化: 线程进入循环 捕获循环 每个模块的数据处理 DecodeAFP FlowWorker RespondReject 拓展内容 W ...

  2. suricata 各个线程干的事情 -- 主线程

    目录 线程类型 0. 主线程 注册runmodes 根据配置初始化 注册应用协议检测器 注册应用层协议解析器 加载规则关键字 tag初始化 几种存储结构的初始化与注册 注册所有模块的函数 检测引擎初始 ...

  3. Windows 能干而 Linux 干不了的事情,那就是不需要干的事情(转贴)

    完全用 GNU/Linux 工作理解 GNU/Linux "UNIX 是简单的,你不需要成为天才也能理解这种简单." 由于GNU/Linux这个词太长,下面如果没有特别指明,&qu ...

  4. 新电脑到手要干的事情

    记录了本人一些常用的软件的下载地址和要干的事情 一些软件设置 Windows Linux 一些软件设置 Vscode 增加 80 vertical rulers, 以及字体: "editor ...

  5. 如何学会垂直搜索,让想干的事情能干成

    如何学会垂直搜索,让想干的事情能干成 很多人搜索的时候,会想着用一个通用搜索引擎解决所有问题,大多数人最常用的应该就是百度了.但是事实上百度能搜到的信息虽然比较全面,但搜索的结果往往会面临信息量过大. ...

  6. 外贸版ChatGPT,每天节省5小时开发客户时间,可以用来干这些事情

    最近很多外贸朋友跟我说订单下降了很多,外贸市场行情不好,客户越来越难谈,获客成本也越来越高了等等 但是我还是会经常提醒:有尝试过升级转型吗,比如启用高效开发客户方法.提高跟进技巧等等 最近火爆出圈的C ...

  7. 有些星座天生不适合干有些事情,如果一定要为之恐怕就会落到饿死的地步了!你知道自己这个星座有哪些事情是容易让自己饿死的吗?

    白羊座--警方谈判专家 羊儿对生命有着极浓的热情,最看不得别人动不动就寻死觅活,即使作为警方谈判专家,首要职责是让谈判对象放弃自杀念头,羊儿也很难抑止住自己激动的情绪.估计前两句还像模像样,要是企图自 ...

  8. 工科生安装Ubuntu18.04后干的事情

    目录 一.安装拼音输入法 1.安装fcitx 2.Language support调整为fcitx 3.安装输入法 4.重启电脑 5.或进入fictx config tool input method ...

  9. 学习了爬虫之后总想干些事情,这是一些常用的API接口,希望对你有用

    下面列举了100多个国内常用API接口,并按照 笔记.出行.词典.电商.地图.电影.即时通讯.开发者网站.快递查询.旅游.社交.视频.天气.团队协作.图片与图像处理.外卖.消息推送.音乐.云.语义识别 ...

最新文章

  1. linux驱动模块编译入内核,Linux内核驱动模块编译
  2. LeetCode实战:2的幂
  3. LuaLuaMemorySnapshotDump-master
  4. python有道翻译接口-Python通过调用有道翻译api实现翻译功能示例
  5. Windows Embedded CE 6.0开发初体验(二)CE开发环境 收藏
  6. 计算机应用基础(本)实训任务1,计算机应用基础(本)实训任务1-2.pdf
  7. Intel:Larrabee浮点运算能力2TFlops
  8. tab键怎么关闭_C/C++应用无障碍化如何支持Tab键浏览
  9. 《微课实战:Camtasia Studio入门精要》——第2章 录制视频 2.1 录制视频基本常识...
  10. mysql 无缓冲的查询_MySQL缓冲和无缓冲查询对比
  11. Dialog高仿Toast实现
  12. wordpress入门基本操作,网站安全防护及常用插件(建站必看教程)
  13. 和秋叶一起学Excel 阿里云盘
  14. 软件评测师考试内容纲要
  15. 计算机水平cet2是什么等级,英语cet2等级考试试题
  16. 只用一个div制作太极图
  17. vue 富文本编辑器提取纯文字
  18. c语言怎么字体加粗,html 字体加粗
  19. 基于Jsp的手机应用商店的设计与实现mysql
  20. c语言 字符金字塔问题

热门文章

  1. Linux网络编程小项目sqlite,嵌入式数据库sqlite
  2. Win10 端口被占用解决方案
  3. C++匿名函数lambda详解
  4. linux如何ubuntu解压tar.gz格式的文件
  5. Word文档怎么转换为PDF格式?介绍两种方式
  6. python神经网络编程 豆瓣,python训练神经网络模型
  7. http协议的状态码(常见网页错误代码)
  8. Google Magenta简介, 安装,使用简例
  9. linux grep 多个文件,Linux grep 多个关键字-Fun言
  10. 企业搜索领域专业名词翻译