licode源码分析-线程模型
licode源码分析-线程模型
服务器一般都会服务于大量的用户,所以服务端程序的性能往往决定服务用户的多少。现在服务器上的CPU都是多核的,服务端程序为了充分发挥CPU的性能,会使用多进程或多线程。而使用多线程会造成资源的竞争,一般情况下都会使用锁了解决资源竞争。在使用锁时,应该尽可能的减小临界区,以提高程序的并发性能。
在流媒体服务器中,往往要处理数据量是巨大的,若锁使用不当,会造成服务器性能低下。MediaSoup使用的是多进程单线程模型,进程间通过管道和socket进行通信,并不需要使用锁。licode通过编程的方式的方式,使得临界区非常小,尽可能的减小由于锁带来的性能降低。
说明
:由于我对javascript不熟,并没有阅读licode js部分的代码,仅仅阅读了C++。至于licode js层是以何种逻辑调用C++代码的,我不太清楚。以下的分析,全部是基于对C++代码的理解,若和licode js层调用逻辑不同,请见谅。
使用任务队列减小临界区
当多个线程访问同一个类对象时,往往会造成数据竞争。一般的方式是,在类对象修改函数内使用锁,这样就能保证程序的正确性,但这样性能往往是低下的。
class A{public:void add(){// lock_guard<mutex> guard(m_);count_ += 1;}void print(){cout<<"count = "<<count_<<endl;}
private:int count_ = 0;//std::mutex m_;
};void func(A & a){for(int i=0;i<10000000;i++){a.add();}
}int main(){A a;thread t1(func,std::ref(a));thread t2(func,std::ref(a));t1.join();t2.join();a.print();return 0;
}
root@learner:~/vscode/tmp# time ./a.out #不使用锁
count = 17688684real 0m0.061s
user 0m0.060s
sys 0m0.000sroot@learner:~/vscode/tmp# time ./a.out #使用锁
count = 20000000real 0m0.784s
user 0m0.730s
sys 0m0.004s
使用两个线程同时对同一个a对象调用一千万次加一,可以看出不使用锁时,结果是不正确的。使用了锁,结果是正确的,但消耗的时间更长了。
假设有线程x、线程y和线程z和一个类A的对象a,现在存在x、y、z线程同时访问a的情况。为了避免a对象内部出现竞态,通常会在修改a对象的代码处,加上锁,同一个时刻只有一个线程可以访问,其他线程会被阻塞住。
换一个思路,能不能让a对象只属于线程x,只有线程x才能修改a对象,其他线程不能直接修改a对象。线程y在需要修改a对象的地方,向线程x的任务队列投递一个修改的任务,这个任务由线程x处理。此时,其他的线程都不会直接的修改a对象,仅有线程x,所以在修改a对象的时候,并不需要上锁。
现在用到锁的地方是,在线程y向线程x投递任务时,会修改线程x的任务队列,此时是需要上锁的。同理,线程x从任务队列取任务时,也是需要锁的。现在临界区变为了任务队列,在修改任务队列一般很快能就能完成,这样的临界区很小。
licode就是使用的这种方式,每一个实例对象都有其所属的线程,当其他线程想修改该实例对象时,需要向实例对象所属线程的任务队列投递任务,由其所属的线程进行修改。
licode中有两种线程,一种是Worker线程,另一种是IOWorker线程。IOWoker用于连接的建立和网络数据的收发,Worker线程用于数据流的处理。
类在创建对象的时候,需要指定其所属的线程对象。在licode中有四个重要的类,其中NicerConnection属于IOworker线程管理,WebRtcConnection、MediaStream、DtlsTransport属于Worker线程管理。
这些类在创建的时候,需要指定所属的IOworker线程对象或Worker线程对象。
NicerConnection(std::shared_ptr<IOWorker> io_worker, ...);
DtlsTransport(..., std::shared_ptr<IOWorker> io_worker);
MediaStream(std::shared_ptr<Worker> worker, ...);
WebRtcConnection(std::shared_ptr<Worker> worker, ...);
以上是这些类的构造器,在构造类对象的时候,需要指定线程对象。
例如WebRtcConnection在创建对象wc时,指定了worker线程对象,那么之后wc对象会在worker线程能执行,当其他线程想要修改wc对新时,需要向worker线程内投递任务,用于修改wc对象。
Worker线程
Worker类使用了ASIO库处理异步的任务,使用ThreadPool提供的Scheduler处理定时事件。在默认的参数下,ThreadPool会开启两个线程运行Scheduler,当Worker遇到定时任务时,需要将定时任务抛到Scheduler中。Scheduler使用std::multimap<std::chrono::system_clock::time_point, Function>
存放定时任务,当任务定时到期后,Scheduler会重新将这个定时任务投递到原来的Worker线程中,交由原来的Worker线程处理。所以Scheduler并不处理任何任务,只负责任务的定时。如果在Scheduler中处理任务,也会造成数据竞争。例如WebRtcConnection对象向Worker线程投递了一个定时任务,Worker线程将这个任务投递到Scheduler线程,当该任务定时到期后,若在Scheduler线程中执行,则可能和Worker线程之间发生数据竞争。
IOWorker线程
IOWorker线程用于连接的建立和网络数据的收发。nICEr库就在这个线程内运行。
void IOWorker::start(std::shared_ptr<std::promise<void>> start_promise)
{if (started_.exchange(true)){return;}thread_ = std::unique_ptr<std::thread>(new std::thread([this, start_promise] {start_promise->set_value(); /*告知调用者,线程已启动。*/while (!closed_) {int events;struct timeval towait = {0, 100000}; struct timeval tv;int r = NR_async_event_wait2(&events, &towait);if (r == R_EOD) /*没有注册事件时*/{std::this_thread::sleep_for(std::chrono::milliseconds(10)); /*睡眠10ms*/}/*更新当前时间*/gettimeofday(&tv, 0);NR_async_timer_update_time(&tv);/*一次取出所有的任务*/std::vector<Task> tasks; {std::unique_lock<std::mutex> lock(task_mutex_); tasks.swap(tasks_); /*尽可能的减小临界区*/}/*执行从任务队列取消的所有任务*/for (Task &task : tasks) {task();}}}));
}
向IOWorker投递的任务,都保存在std::vector<Task> tasks_中,线程在处理任务队列中的任务时,并不是加上锁,让后将所有任务处理完毕后,再释放锁,这样临界区很大。该线程在处理队列中任务时,通过swap一次vector,一次性将所有任务全部取出,然后立即释放锁,离开临界区。在临界区之外,没有线程竞争的地方再去依次的处理每个任务。
交换vector仅仅是交换三个指针,所以速度很快。
线程池
licode的提供了两个线程池,一个是IOThreadPool,另一个是ThreadPool。
std::shared_ptr<Worker> ThreadPool::getLessUsedWorker();
std::shared_ptr<IOWorker> IOThreadPool::getLessUsedIOWorker();
这两个线程池在提供线程时,总是返回被使用次数最少的线程。
现在我们假设,在创建IOThreadPool时,指定创建两个IOWorker线程。在创建ThreadPool时,指定创建两个Worker线程。应用处理层有一个线程。licode连接一个发送客户端和两个接收客户端,先与发送客户端相连,之后再和接收客户端相连。并且同一个客户端内WebRtcConnection和MediaStream使用同一个Worker线程。
此时licode的内部,各个线程和各个类的关系如下图:
线程间传递数据,是通过投递任务实现的。以下是NicerConnection给Transport传递数据时,通过将数据封装成任务投递到Transport所在线程任务队列.
void Transport::onPacketReceived(packetPtr packet)
{std::weak_ptr<Transport> weak_transport = Transport::shared_from_this();/*将packet封装成为一个任务,投递到Transport所在线程任务队列。*/worker_->task([weak_transport, packet]() {if (auto this_ptr = weak_transport.lock()){if (packet->length > 0) {this_ptr->onIceData(packet); }if (packet->length == -1){this_ptr->running_ = false;return;}}});}
这个函数会在IOWorker线程内被调用。
使用future和promise实现线程同步
future和promise是在C++11提供的新的功能,可以用于线程间同步。
void foo(shared_ptr<promise<void>> pp,int i)
{while(i--){cout<<"sleep "<<i<<" ..."<<endl;this_thread::sleep_for(chrono::seconds(1));}pp->set_value();this_thread::sleep_for(chrono::seconds(5));cout<<"over ..."<<endl;
}int main()
{shared_ptr<promise<void>> pp = make_shared<promise<void>>();thread t(foo,pp,3);future<void> f = pp->get_future();cout<<"waiting ..."<<endl;f.get(); //阻塞,等待子线程中pp调用set_value()。cout<<"waitint over ..."<<endl;t.join();cout<<"finish ..."<<endl;return 0;
}
waiting ...
sleep 2 ...
sleep 1 ...
sleep 0 ...
waitint over ...
over ...
finish ...
boost::future<void> WebRtcConnection::asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f)
{/*创建promise*/auto task_promise = std::make_shared<boost::promise<void>>();std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();/*封装任务,并将任务投递到worker_线程。*/worker_->task([weak_this, f, task_promise] {if (auto this_ptr = weak_this.lock()) { f(this_ptr); /*执行任务*/}/*任务处理后,通知发送任务的线程。*/task_promise->set_value();});/*返回promise对应的future,用于任务的同步执行。*/return task_promise->get_future();
}
future和promise可以使,线程投递任务后,阻塞的等待任务被处理后,再接着往下执行。
boost::future<void> WebRtcConnection::addRemoteCandidate(std::string mid, int mLineIndex, CandidateInfo candidate)
{return asyncTask([mid, mLineIndex, candidate] (std::shared_ptr<WebRtcConnection> connection) {connection->addRemoteCandidateSync(mid, mLineIndex, candidate);});
}
应用层线程收到远端的Candidate后,调用WebRtcConnection::addRemoteCandidate()函数,这个函数通过asyncTask()函数向WebRtcConnection对象所在的Worker线程投递一个任务,同时返回future对象。应用层线程拿到future对象,然后调用future::get()函数,该线程就会阻塞。直到投递的任务被Worker线程处理后,Worker线程会调用任务中的task_promise->set_value();语句,此时阻塞在future::get()上的应用线程,将被唤醒,并接着往下执行。
使用weak_ptr判断对象是否还活着
投递到任务队列中的任务,在执行的时候,可能任务中的对象已经被销毁了。若继续处理这个任务,则会导致内存崩溃。还需要一种方式,在处理任务的时候,可以判断任务中的对象是否还有效,若有效则继续处理,否则丢弃这个任务。
boost::future<void> WebRtcConnection::asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f)
{auto task_promise = std::make_shared<boost::promise<void>>();std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();worker_->task([weak_this, f, task_promise] {/*判断WebRtcConnection对象是否还存在,若已经被销毁,则丢弃任务。*/if (auto this_ptr = weak_this.lock()) {f(this_ptr); }task_promise->set_value();});return task_promise->get_future();
}
weak_ptr是对象的一种弱引用,引用对象是并不会增加引用计数。
在处理任务时,如果weak_ptr指向的对象依然存在,则lock()将返回shared_ptr,否则返回一个空shared_ptr。
licode源码分析-线程模型相关推荐
- brpc源码分析——线程模型
brpc线程模型 从一个server的启动过程谈起,我们这里以echo server为例: int main(int argc, char* argv[]) {// gflags介绍:https:// ...
- Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求
Elasticsearch源码分析-线程池(十一) 转自:https://www.felayman.com/articles/2017/11/10/1510291570687.html 线程池 每个节 ...
- JUC源码分析-线程池篇(五):ForkJoinPool - 2
通过上一篇(JUC源码分析-线程池篇(四):ForkJoinPool - 1)的讲解,相信同学们对 ForkJoinPool 已经有了一个大概的认识,本篇我们将通过分析源码的方式来深入了解 ForkJ ...
- v21.07 鸿蒙内核源码分析(线程概念) | 是谁在不断的折腾CPU | 百篇博客分析OpenHarmony源码
子曰:"若圣与仁,则吾岂敢.抑为之不厌,诲人不倦,则可谓云尔已矣." <论语>:述而篇 百篇博客系列篇.本篇为: v21.xx 鸿蒙内核源码分析(线程概念篇) | 是谁 ...
- ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析 线程利用(转)...
前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理.而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecuto ...
- 从源码分析线程池(池化技术)的实现原理
线程池是一个非常重要的知识点,也是池化技术的一个典型应用,相信很多人都有使用线程池的经历,但是对于线程池的实现原理大家都了解吗?本篇文章我们将深入线程池源码来一探究竟. 线程池的起源 背景: 随着计算 ...
- licode源码分析-媒体数据的处理流程
本文主要分析licode C++部分对视频流的处理流程.主要介绍licode从发送客户端接收视频流,然后经过内部的处理,再将视频流发送到接收客户端. licode虽然是MCU模型,但提供的主要功能还是 ...
- 第一次作业:深入Linux源码分析进程模型
一.进程的概念 第一,进程是一个实体.每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region).数据区域(data region)和堆栈(stack region).文本区域 ...
- 第一次作业:深入源码分析进程模型
本文针对linux罗列一些个人的理解或看法 这是本人第一次写博客,有写得不好的地方请见谅. 进程是什么 当我们打开任务管理器的时候,我们可以看到这样的画面: 从这里我们可以看到进程的页面,当然这样的理 ...
最新文章
- redhat6.4中手动创建oracle11g数据库
- Android 自定义viewpager 三张图片在同一屏幕轮播的效果
- Centos环境下部署游戏服务器-软件安装
- 怎么通过id渲染页面_「快页面」动态配置化页面渲染器原理介绍
- grub2从usb启动
- Vue中使用axios的响应拦截器处理请求失败的情况(处理token过期问题)以及 登录成功跳转回原来页面问题
- JS的typeof力所能及已经力所不及
- 不要62(HDU-2089)
- Python数据清理之数据质量
- 程序员的自我修养(2)——计算机网络
- (84)多周期路径约束基础
- 短命的 CentOS 8 将停止维护
- 几种web报表打印方案的比较
- python中进制转换函数_Python内置函数进制转换的用法
- VS2015密钥 VS2017密钥
- 什么是软件开发生命周期?
- Android微信如何退版本,微信7.0降级教程 微信版本回退教程
- IO流文件指针(移动和获取文件读指针)
- 排查 java 程序CPU飙升问题
- C语言时间库操作-->协调时转本地时