本人原本从事C++的学习的,现在进行即时通讯的网络开发学习,涉及到了muduo库顾muduo库进行深入学习,muduo库是给予reactor模型的并发处理的网络库,其广泛的利用了回调函数的特性。

EventLoop的事件循环主要两个大部分:第一个部分,Poller监听socketfd、timerfd封装的Channel事件(网络事件、定时器事件),并执行对应IO事件的回调函数;第二个部分,在IO线程空闲时执行一些计算任务,充分利用线程资源。由于Poller是阻塞等待IO事件的,当有计算任务时需要被唤醒,使用eventfd的Channel的来实现。

针对EventLoop 的源码分析

  //该loop吃鱼的状态bool looping_; /* atomic */// 是否进行停止std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;// 运行此线程的线程ID号const pid_t threadId_;Timestamp pollReturnTime_;std::unique_ptr<Poller> poller_;std::unique_ptr<TimerQueue> timerQueue_;int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variables// 维护一个维护活跃的Channel对象ChannelList activeChannels_;// Channel* currentActiveChannel_;// 互斥锁mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);

该对象的构造函数如下所示:

EventLoop::EventLoop(): looping_(false),               //该对象是否处于进行IO事件进行处理          quit_(false),                  //是否进行退出eventHandling_(false),         //是否正在处理IO事件callingPendingFunctors_(false),//是否正在进行计算任务iteration_(0),                 //处理的IO事件的次数                 threadId_(CurrentThread::tid()),                    //当前线程的tidpoller_(Poller::newDefaultPoller(this)),             //Poller对象的timerQueue_(new TimerQueue(this)),                   //定时器队列wakeupFd_(createEventfd()),                          //eventfd用于唤醒阻塞的pollwakeupChannel_(new Channel(this, wakeupFd_)),        //eventfd 封装的channel对象currentActiveChannel_(NULL)                          //临时变量挡墙的就绪IO事件的channel数组
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}//设置唤醒挡墙的Channel的可读事件的回调,并注册到Poller中wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfdwakeupChannel_->enableReading();
}

析构函数中,需要从显示地调用注销在Pollereventfd事件,并且关闭文件描述符。由于成员poller_和timerQueue_使用std::unique_ptr管理,不用手动处理资源释放的问题,在各自的析构函数中进行了资源释放处理。

EventLoop::~EventLoop()
{LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_<< " destructs in thread " << CurrentThread::tid();wakeupChannel_->disableAll(); wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = NULL;
}

进行事件的循环

函数EventLoop::loop()实现了事件循环流程,是EventLoop的核心:调用Poller->poll函数,返回就绪的有IO事件的channels,依次执行channels中的用户回调函数,接着处理IO线程中需要进行的额外计算任务。当没有IO事件时poll函数最多阻塞10s,会执行一次计算任务的处理,但是10s时间太长,因此使用eventfd来唤醒poll及时处理计算任务

void EventLoop::loop()
{assert(!looping_);// 判断是否为IO线程assertInLoopThread();looping_ = true;quit_ = false;  // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){// 清空当前活跃空间activeChannels_.clear();// 获取最新活跃的Channel添加到活跃的channel表中pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priority//IO事件处理的标志位eventHandling_ = true;// 对其中的活跃的IO事件进行 处理for (Channel* channel : activeChannels_){currentActiveChannel_ = channel;//执行某个事件的用户回调函数currentActiveChannel_->handleEvent(pollReturnTime_);}// currentActiveChannel_ = NULL;eventHandling_ = false;// 执行位于其中的一些计算任务(任务队列中依次执行)doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}

计算任务的执行过程:

不是在临界区中一次调用Functor,而是把回调列表swap到局部变量functors中,这样减小了临界区的长度(不会阻塞其他线程调用queueInLoop(),降低锁竞争),也避免了死锁的发生(Functor中可能再调用queueInLoop()函数


void EventLoop::doPendingFunctors()
{//确地任务队列std::vector<Functor> functors;//标志执行计算任务callingPendingFunctors_ = true;{// 加锁,把回调任务队列swap到临时变量中;降低临界区长度MutexLockGuard lock(mutex_);functors.swap(pendingFunctors_);}//次执行队列中的用户回调任务for (const Functor& functor : functors){functor();}//设置的计算任务结束callingPendingFunctors_ = false;
}

添加计算任务到的IO下称中执行

主要是提升IO线程在空闲时的资源利用率,根据调用所处的线程划分为不同


void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()){cb();//如果是IO线程调用,则直接执行}else   //其他线程调用就放在的任务队列中之后再loop种植箱{queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb)
{{// 加锁,放入到指向的任务队列中MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}// 根据执行是否进行唤醒if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}

如果在IO线程即当前EventLoop所在线程,可以选择runInLoop函数立即执行,也可以选择调用queueInLoop将任务放入队列中。在其他线程中,不论调用runInLoop还是queueInLoop都将先把用户的回调任务放入pendingFunctors_队列中,等待loop中处理。

由于IO线程平时阻塞在时间循环EventLoop::loop()中的poll函数上,为能让IO线程立刻执行用户回调,需要执行唤醒操作,有两种情况。第一种,是在其他线程调用,必须唤醒,否则阻塞直到超时才执行。第二种,在当前IO线程,但处于任务队列处理的过程中,Functor可能再调用queueInLoop,这种情况就必须执行wakeup,否则新加入到pendingFunctors_的cb就不能及时执行。

唤醒IO线程

在构造函数中注册唤醒channel的eventfd的可读事件、事件触发的回调函数。当调用wakeup()时,对eventfd执行write操作,那么poller_->poll将查询到当前eventfd上的可读事件,从而唤醒IO线程。接受事件之后,执行handleRead()回调从eventfd调用read,避免被再次唤醒。

void EventLoop::wakeup()
{  // 通过往eventfd写标志通知,让阻塞poll立马返回并执行回调函数uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";}
}

定时器任务

定时器任务添加通过EventLoop暴露给用户,封装函数更简单易用。TimeQueue::addTimer()内部实际是调用了EventLoop::runInLoop(),作为计算任务放入队列中在loop()函数中执行,保证线程安全;每添加一个定时器,都会更新一次TimerQueue中的timerfd的超时触发时间

// 在某个时刻执行
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{return timerQueue_->addTimer(std::move(cb), time, 0.0);
}
// 在延迟delay之后执行
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb));
}
// 间隔interval重复执行
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval);
}// 取消定时器任务
void EventLoop::cancel(TimerId timerId)
{return timerQueue_->cancel(timerId);
}

Channel的注册、更新、移除

 Channel封装了socketfd、timerfd、eventfd等时间的封装处理,定时器TimerQueue、EventLoop中的唤醒通道、TCP c/s端都封装了channel,将用户的文件描述符事件、事件处理回调注册到了Poller,以供loop监听事件。

Channel对象构造后,需要至少设置一个关注事件及其回调函数,设置关注事件内部会调用EventLoop::updateChannel(this),将其注册到Poller上;更新关注事件也是。当关闭不再使用Channel,需要显示地关闭注册事件,并从Poller中注销,注销内部实现实际调用EventLoop::removeChannel(this)。因此,这三个操作实际都是在IO线程中执行的,不需要加锁。 三个函数的使用可以参考TimerQueue的构造函数和析构函数。

void EventLoop::updateChannel(Channel* channel)
{assert(channel->ownerLoop() == this);assertInLoopThread();poller_->updateChannel(channel);  // 将channel的事件和回调注册到Poller中
}void EventLoop::removeChannel(Channel* channel)
{assert(channel->ownerLoop() == this);assertInLoopThread();if (eventHandling_){assert(currentActiveChannel_ == channel ||std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());}poller_->removeChannel(channel);   // 将channel的事件和回调从Poller中移除
}

原文链接:https://blog.csdn.net/wanggao_1990/article/details/119062060

陈硕Muduo库源码学习:EventLoop相关推荐

  1. 三方库源码学习2-Retrofit

    文章目录 一.参考博客 二.Retrofit的介绍 三.什么是动态代理 四. Retrofit.Create()方法 五. ServiceMethod 六. 初看HttpServiceMethod 七 ...

  2. Python Sklearn库源码学习--kmeans

    前言: 分析体检数据希望不拘泥于Sklearn库中已有的聚类算法,想着改一下Kmeans算法.本着学习的目的,现在开始查看sklearn的源代码.希望能够写成一个通用的包. 有必要先交代一下我使用的p ...

  3. Muduo库源码剖析(三)——获取线程tid方法

    相关知识点 __thread __thread修饰 表示使用 线程局部存储机制(threadlocal 机制) ,即会为修饰的变量在当前线程存储一份copy,别的线程是看不到这个变量的修改 __thr ...

  4. Masonry 源码学习整理

    @(第三方库源码学习) [TOC] Masonry框架的类结构 学习一.Masonry采用了经典的组合设计模式(Composite Pattern). 1.定义 将对象组合成树状结构以表示" ...

  5. android 三方_面试官送你一份Android热门三方库源码面试宝典及学习笔记

    前言 众所周知,优秀源码的阅读与理解是最能提升自身功力的途径,如果想要成为一名优秀的Android工程师,那么Android中优秀三方库源码的分析和理解则是必备技能.就拿比较热门的图片加载框架Glid ...

  6. postgresql源码学习(57)—— pg中的四种动态库加载方法

    一. 基础知识 1. 什么是库 库其实就是一些通用代码,可以在程序中重复使用,比如一些数学函数,可以不需要自己编写,直接调用相关函数即可实现,避免重复造轮子. 在linux中,支持两种类型的库: 1. ...

  7. DotText源码学习——ASP.NET的工作机制

    --本文是<项目驱动学习--DotText源码学习>系列的第一篇文章,在这之后会持续发表相关的文章. 概论 在阅读DotText源码之前,让我们首先了解一下ASP.NET的工作机制,可以使 ...

  8. PostgreSQL源码学习(1)--PG13代码结构

    PostgreSQL源码学习(1)–PG13代码结构 PostgreSQL代码结构 Bootstrap:用于支持Bootstrap运行模式,该模式主要用来创建初始的模板数据库. Main:主程序模块, ...

  9. AFNetworking源码学习

    AFNetworking源码学习 简介 AFNetWorking是使用Objective-c开发iOS程序主流的网络请求开源库. AFNetworking组织结构 AFNetWorking主要分为5个 ...

最新文章

  1. unittest笔记
  2. c++学习笔记之友元函数
  3. Python爬虫开发:requests库的使用--发送带参数get请求
  4. 苏州飘“彩云” 五年规模破百亿元
  5. nvidia控制面板点了没反应win7_为什么没有nvidia控制面板_win7没有nvidia控制面板怎么找回-系统城...
  6. 小程序学习(一):点击爱心变色 -- 最简单的事件实现
  7. Python面试题大全(三):Web开发(Flask、爬虫)
  8. java输出1-100之间的全部素数
  9. 不就是SELECT COUNT语句吗,竟然能被面试官虐的体无完肤
  10. 中国糖化酶行业市场供需与战略研究报告
  11. [转] Bound Service的三种方式(Binder、 Messenger、 AIDL)
  12. mvn clean install时出现 java.lang.ClassCastException
  13. JavaScript 对象 和 函数
  14. 小白用python处理excel文件-python高手之路python处理excel文件(方法汇总)
  15. Tableau:仪表板操作
  16. [gdc17]《守望先锋》的EntityComponent架构
  17. 团队中各成员间相互协作办公用哪一个软件
  18. dns被劫持怎么办,什么是dns劫持,有什么方法处理?
  19. java 学习7.13 正则表达式 Pattern和Matcher类 Math类 Random类 System类 BigDecimal类 Date类 SimpleDateFormat类 Cale
  20. 治理通胀首先要控制货币发行

热门文章

  1. 公司的收益如何预测?时间序列模型轻松搞定
  2. Unity3d Animator 动画倒放
  3. Redis set常用命令
  4. pycharm快捷键设置
  5. 前端自学路线图之前端自学大纲
  6. 大话:行锁 间隙锁 表锁 临键锁
  7. 系统交易策略 hylt
  8. 学会linux需要哪些技术,运维安全需要掌握哪些技术呢?linux基础知识学习
  9. Matlab中cell2mat,num2cell用法
  10. pvcreate 创建物理卷PV