licode源码分析-线程模型

服务器一般都会服务于大量的用户,所以服务端程序的性能往往决定服务用户的多少。现在服务器上的CPU都是多核的,服务端程序为了充分发挥CPU的性能,会使用多进程或多线程。而使用多线程会造成资源的竞争,一般情况下都会使用锁了解决资源竞争。在使用锁时,应该尽可能的减小临界区,以提高程序的并发性能。

在流媒体服务器中,往往要处理数据量是巨大的,若锁使用不当,会造成服务器性能低下。MediaSoup使用的是多进程单线程模型,进程间通过管道和socket进行通信,并不需要使用锁。licode通过编程的方式的方式,使得临界区非常小,尽可能的减小由于锁带来的性能降低。

说明:由于我对javascript不熟,并没有阅读licode js部分的代码,仅仅阅读了C++。至于licode js层是以何种逻辑调用C++代码的,我不太清楚。以下的分析,全部是基于对C++代码的理解,若和licode js层调用逻辑不同,请见谅。

使用任务队列减小临界区

当多个线程访问同一个类对象时,往往会造成数据竞争。一般的方式是,在类对象修改函数内使用锁,这样就能保证程序的正确性,但这样性能往往是低下的。

class A{public:void add(){// lock_guard<mutex> guard(m_);count_ += 1;}void print(){cout<<"count = "<<count_<<endl;}
private:int count_ = 0;//std::mutex m_;
};void func(A & a){for(int i=0;i<10000000;i++){a.add();}
}int main(){A a;thread t1(func,std::ref(a));thread t2(func,std::ref(a));t1.join();t2.join();a.print();return 0;
}
root@learner:~/vscode/tmp# time ./a.out     #不使用锁
count = 17688684real   0m0.061s
user    0m0.060s
sys 0m0.000sroot@learner:~/vscode/tmp# time ./a.out      #使用锁
count = 20000000real   0m0.784s
user    0m0.730s
sys 0m0.004s

使用两个线程同时对同一个a对象调用一千万次加一,可以看出不使用锁时,结果是不正确的。使用了锁,结果是正确的,但消耗的时间更长了。

假设有线程x、线程y和线程z和一个类A的对象a,现在存在x、y、z线程同时访问a的情况。为了避免a对象内部出现竞态,通常会在修改a对象的代码处,加上锁,同一个时刻只有一个线程可以访问,其他线程会被阻塞住。

换一个思路,能不能让a对象只属于线程x,只有线程x才能修改a对象,其他线程不能直接修改a对象。线程y在需要修改a对象的地方,向线程x的任务队列投递一个修改的任务,这个任务由线程x处理。此时,其他的线程都不会直接的修改a对象,仅有线程x,所以在修改a对象的时候,并不需要上锁。

现在用到锁的地方是,在线程y向线程x投递任务时,会修改线程x的任务队列,此时是需要上锁的。同理,线程x从任务队列取任务时,也是需要锁的。现在临界区变为了任务队列,在修改任务队列一般很快能就能完成,这样的临界区很小。

licode就是使用的这种方式,每一个实例对象都有其所属的线程,当其他线程想修改该实例对象时,需要向实例对象所属线程的任务队列投递任务,由其所属的线程进行修改。

licode中有两种线程,一种是Worker线程,另一种是IOWorker线程。IOWoker用于连接的建立和网络数据的收发,Worker线程用于数据流的处理。

类在创建对象的时候,需要指定其所属的线程对象。在licode中有四个重要的类,其中NicerConnection属于IOworker线程管理,WebRtcConnection、MediaStream、DtlsTransport属于Worker线程管理。

这些类在创建的时候,需要指定所属的IOworker线程对象或Worker线程对象。

NicerConnection(std::shared_ptr<IOWorker> io_worker, ...);
DtlsTransport(..., std::shared_ptr<IOWorker> io_worker);
MediaStream(std::shared_ptr<Worker> worker, ...);
WebRtcConnection(std::shared_ptr<Worker> worker, ...);

以上是这些类的构造器,在构造类对象的时候,需要指定线程对象。

例如WebRtcConnection在创建对象wc时,指定了worker线程对象,那么之后wc对象会在worker线程能执行,当其他线程想要修改wc对新时,需要向worker线程内投递任务,用于修改wc对象。

Worker线程

Worker类使用了ASIO库处理异步的任务,使用ThreadPool提供的Scheduler处理定时事件。在默认的参数下,ThreadPool会开启两个线程运行Scheduler,当Worker遇到定时任务时,需要将定时任务抛到Scheduler中。Scheduler使用std::multimap<std::chrono::system_clock::time_point, Function>存放定时任务,当任务定时到期后,Scheduler会重新将这个定时任务投递到原来的Worker线程中,交由原来的Worker线程处理。所以Scheduler并不处理任何任务,只负责任务的定时。如果在Scheduler中处理任务,也会造成数据竞争。例如WebRtcConnection对象向Worker线程投递了一个定时任务,Worker线程将这个任务投递到Scheduler线程,当该任务定时到期后,若在Scheduler线程中执行,则可能和Worker线程之间发生数据竞争。

IOWorker线程

IOWorker线程用于连接的建立和网络数据的收发。nICEr库就在这个线程内运行。

void IOWorker::start(std::shared_ptr<std::promise<void>> start_promise)
{if (started_.exchange(true)){return;}thread_ = std::unique_ptr<std::thread>(new std::thread([this, start_promise] {start_promise->set_value();   /*告知调用者,线程已启动。*/while (!closed_) {int events;struct timeval towait = {0, 100000};  struct timeval tv;int r = NR_async_event_wait2(&events, &towait);if (r == R_EOD)   /*没有注册事件时*/{std::this_thread::sleep_for(std::chrono::milliseconds(10));   /*睡眠10ms*/}/*更新当前时间*/gettimeofday(&tv, 0);NR_async_timer_update_time(&tv);/*一次取出所有的任务*/std::vector<Task> tasks; {std::unique_lock<std::mutex> lock(task_mutex_);    tasks.swap(tasks_);  /*尽可能的减小临界区*/}/*执行从任务队列取消的所有任务*/for (Task &task : tasks) {task();}}}));
}

向IOWorker投递的任务,都保存在std::vector<Task> tasks_中,线程在处理任务队列中的任务时,并不是加上锁,让后将所有任务处理完毕后,再释放锁,这样临界区很大。该线程在处理队列中任务时,通过swap一次vector,一次性将所有任务全部取出,然后立即释放锁,离开临界区。在临界区之外,没有线程竞争的地方再去依次的处理每个任务。

交换vector仅仅是交换三个指针,所以速度很快。

线程池

licode的提供了两个线程池,一个是IOThreadPool,另一个是ThreadPool。

std::shared_ptr<Worker> ThreadPool::getLessUsedWorker();
std::shared_ptr<IOWorker> IOThreadPool::getLessUsedIOWorker();

这两个线程池在提供线程时,总是返回被使用次数最少的线程。

现在我们假设,在创建IOThreadPool时,指定创建两个IOWorker线程。在创建ThreadPool时,指定创建两个Worker线程。应用处理层有一个线程。licode连接一个发送客户端和两个接收客户端,先与发送客户端相连,之后再和接收客户端相连。并且同一个客户端内WebRtcConnection和MediaStream使用同一个Worker线程。

此时licode的内部,各个线程和各个类的关系如下图:

线程间传递数据,是通过投递任务实现的。以下是NicerConnection给Transport传递数据时,通过将数据封装成任务投递到Transport所在线程任务队列.

void Transport::onPacketReceived(packetPtr packet)
{std::weak_ptr<Transport> weak_transport = Transport::shared_from_this();/*将packet封装成为一个任务,投递到Transport所在线程任务队列。*/worker_->task([weak_transport, packet]() {if (auto this_ptr = weak_transport.lock()){if (packet->length > 0) {this_ptr->onIceData(packet);   }if (packet->length == -1){this_ptr->running_ = false;return;}}});}

这个函数会在IOWorker线程内被调用。

使用future和promise实现线程同步

future和promise是在C++11提供的新的功能,可以用于线程间同步。

void foo(shared_ptr<promise<void>> pp,int i)
{while(i--){cout<<"sleep "<<i<<" ..."<<endl;this_thread::sleep_for(chrono::seconds(1));}pp->set_value();this_thread::sleep_for(chrono::seconds(5));cout<<"over ..."<<endl;
}int main()
{shared_ptr<promise<void>> pp = make_shared<promise<void>>();thread t(foo,pp,3);future<void> f = pp->get_future();cout<<"waiting ..."<<endl;f.get();     //阻塞,等待子线程中pp调用set_value()。cout<<"waitint over ..."<<endl;t.join();cout<<"finish ..."<<endl;return 0;
}
waiting ...
sleep 2 ...
sleep 1 ...
sleep 0 ...
waitint over ...
over ...
finish ...
boost::future<void> WebRtcConnection::asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f)
{/*创建promise*/auto task_promise = std::make_shared<boost::promise<void>>();std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();/*封装任务,并将任务投递到worker_线程。*/worker_->task([weak_this, f, task_promise] {if (auto this_ptr = weak_this.lock()) {     f(this_ptr);       /*执行任务*/}/*任务处理后,通知发送任务的线程。*/task_promise->set_value();});/*返回promise对应的future,用于任务的同步执行。*/return task_promise->get_future();
}

future和promise可以使,线程投递任务后,阻塞的等待任务被处理后,再接着往下执行。

boost::future<void> WebRtcConnection::addRemoteCandidate(std::string mid, int mLineIndex, CandidateInfo candidate)
{return asyncTask([mid, mLineIndex, candidate] (std::shared_ptr<WebRtcConnection> connection) {connection->addRemoteCandidateSync(mid, mLineIndex, candidate);});
}

应用层线程收到远端的Candidate后,调用WebRtcConnection::addRemoteCandidate()函数,这个函数通过asyncTask()函数向WebRtcConnection对象所在的Worker线程投递一个任务,同时返回future对象。应用层线程拿到future对象,然后调用future::get()函数,该线程就会阻塞。直到投递的任务被Worker线程处理后,Worker线程会调用任务中的task_promise->set_value();语句,此时阻塞在future::get()上的应用线程,将被唤醒,并接着往下执行。

使用weak_ptr判断对象是否还活着

投递到任务队列中的任务,在执行的时候,可能任务中的对象已经被销毁了。若继续处理这个任务,则会导致内存崩溃。还需要一种方式,在处理任务的时候,可以判断任务中的对象是否还有效,若有效则继续处理,否则丢弃这个任务。

boost::future<void> WebRtcConnection::asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f)
{auto task_promise = std::make_shared<boost::promise<void>>();std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();worker_->task([weak_this, f, task_promise] {/*判断WebRtcConnection对象是否还存在,若已经被销毁,则丢弃任务。*/if (auto this_ptr = weak_this.lock()) {f(this_ptr);    }task_promise->set_value();});return task_promise->get_future();
}

weak_ptr是对象的一种弱引用,引用对象是并不会增加引用计数。

在处理任务时,如果weak_ptr指向的对象依然存在,则lock()将返回shared_ptr,否则返回一个空shared_ptr。

licode源码分析-线程模型相关推荐

  1. brpc源码分析——线程模型

    brpc线程模型 从一个server的启动过程谈起,我们这里以echo server为例: int main(int argc, char* argv[]) {// gflags介绍:https:// ...

  2. Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求

    Elasticsearch源码分析-线程池(十一) 转自:https://www.felayman.com/articles/2017/11/10/1510291570687.html 线程池 每个节 ...

  3. JUC源码分析-线程池篇(五):ForkJoinPool - 2

    通过上一篇(JUC源码分析-线程池篇(四):ForkJoinPool - 1)的讲解,相信同学们对 ForkJoinPool 已经有了一个大概的认识,本篇我们将通过分析源码的方式来深入了解 ForkJ ...

  4. v21.07 鸿蒙内核源码分析(线程概念) | 是谁在不断的折腾CPU | 百篇博客分析OpenHarmony源码

    子曰:"若圣与仁,则吾岂敢.抑为之不厌,诲人不倦,则可谓云尔已矣." <论语>:述而篇 百篇博客系列篇.本篇为: v21.xx 鸿蒙内核源码分析(线程概念篇) | 是谁 ...

  5. ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析 线程利用(转)...

    前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理.而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecuto ...

  6. 从源码分析线程池(池化技术)的实现原理

    线程池是一个非常重要的知识点,也是池化技术的一个典型应用,相信很多人都有使用线程池的经历,但是对于线程池的实现原理大家都了解吗?本篇文章我们将深入线程池源码来一探究竟. 线程池的起源 背景: 随着计算 ...

  7. licode源码分析-媒体数据的处理流程

    本文主要分析licode C++部分对视频流的处理流程.主要介绍licode从发送客户端接收视频流,然后经过内部的处理,再将视频流发送到接收客户端. licode虽然是MCU模型,但提供的主要功能还是 ...

  8. 第一次作业:深入Linux源码分析进程模型

    一.进程的概念 第一,进程是一个实体.每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region).数据区域(data region)和堆栈(stack region).文本区域 ...

  9. 第一次作业:深入源码分析进程模型

    本文针对linux罗列一些个人的理解或看法 这是本人第一次写博客,有写得不好的地方请见谅. 进程是什么 当我们打开任务管理器的时候,我们可以看到这样的画面: 从这里我们可以看到进程的页面,当然这样的理 ...

最新文章

  1. redhat6.4中手动创建oracle11g数据库
  2. Android 自定义viewpager 三张图片在同一屏幕轮播的效果
  3. Centos环境下部署游戏服务器-软件安装
  4. 怎么通过id渲染页面_「快页面」动态配置化页面渲染器原理介绍
  5. grub2从usb启动
  6. Vue中使用axios的响应拦截器处理请求失败的情况(处理token过期问题)以及 登录成功跳转回原来页面问题
  7. JS的typeof力所能及已经力所不及
  8. 不要62(HDU-2089)
  9. Python数据清理之数据质量
  10. 程序员的自我修养(2)——计算机网络
  11. (84)多周期路径约束基础
  12. 短命的 CentOS 8 将停止维护
  13. 几种web报表打印方案的比较
  14. python中进制转换函数_Python内置函数进制转换的用法
  15. VS2015密钥 VS2017密钥
  16. 什么是软件开发生命周期?
  17. Android微信如何退版本,微信7.0降级教程 微信版本回退教程
  18. IO流文件指针(移动和获取文件读指针)
  19. 排查 java 程序CPU飙升问题
  20. C语言时间库操作-->协调时转本地时

热门文章

  1. 公网私网IP地址划分
  2. 学习python数据分析的30个练手数据+4个数据集网站
  3. 10年测开经验面试35K公司后,吐血整理出高频面试题和答案!
  4. python数据分析可视化大作业——对地铁数据的简单数据分析
  5. hosts文件中添加地址映射
  6. idea启动tomcat时日志中文乱码
  7. 部署Docker容器虚拟化平台
  8. 在公司写代码时,我总想回家带娃。
  9. 荧光纳米/多肽/聚合物纳米AIE微球/正电荷/pH响应性AIE荧光纳米微球的相关研究
  10. 哈希表的应用--vijos 毒药?解药?