concurrentqueue介绍
由于最近在做一个项目,但是框架本身有个不合理的设计。其中的代码是单线程的,数据的读取和计算都在一个线程里面完成。也就是说,我们的程序有很大的一部分时间在读取文件数据,导致最终的运行速度很慢。这里就可以使用多线程来优化。
这里需要使用最基本的生产者消费者模式。
使用若干个线程作为生产者,负责数据的读取和预处理,这部分任务是IO密集型的,也就是不太占CPU,但是比较占带宽,而且有延时。在处理完数据之后,将数据放到一个队列中。
同时,使用若干个线程充当消费者,从这个队列里面获取数据,然后进行计算。计算的部分是CPU密集型的(其实我这里计算是GPU做的,就只有一个消费者),计算完成之后输出结果。
那么贯穿这一整套方案的,就是我们的队列。
在并发任务中,通常都需要一个队列机制,将并行的任务转化成串行的任务,或者将串行的任务提供给并行工作的线程。这个队列会同时被多个线程读写,因此也必须是线程安全的。
一、线程安全的实现策略
对于线程安全的队列的实现,似乎经常成为企业的面试题,常见的实现方法就是互斥量和条件变量,本质上就是锁的机制。同一时间只有一个线程具有读写的权限。锁的机制在并发量不大情况下,十分的清晰有效。在并发量较大的时候,会因为对锁的竞争而越发不高效。同时,锁本身也需要维护一定的资源,也需要消耗性能。
这时候,大家肯定会想问,不使用锁机制,还可以处理这种并发的情况吗?
答案是肯定的,首先我们知道锁主要有两种,悲观锁和乐观锁。对于悲观锁,它永远会假定最糟糕的情况,就像我们上面说到的互斥机制,每次我们都假定会有其他的线程和我们竞争资源,因此必须要先拿到锁,之后才放心的进行我们的操作,这就使得争夺锁成为了我们每次操作的第一步。乐观锁则不同,乐观锁假定在很多情况下,资源都不需要竞争,因此可以直接进行读写,但是如果碰巧出现了多线程同时操控数据的情况,那么就多试几次,直到成功(也可以设置重试的次数)。
我们生活的时候,总会碰到很多的不顺心的事情,比如模型训练崩了,被某些库搞得头大,或者女票又生气了什么的,不妨学习一下乐观锁的精神,再训一次?再编译一次?大不了再哄一次。一次不行就两次。
回到乐观锁上,乐观锁中,每次读写都不考虑锁的存在,那么他是如何知道自己这次操作和其他线程是冲突的呢?这就是Lock-free队列的关键——原子操作。原子操作可以保证一次操作在执行的过程中不会被其他线程打断,因此在多线程程序中也不需要同步操作。在C++的STL中其实也提供了atomic这个库,可以保证多线程在操控同一个变量的时候,即使不加锁也能保证起最终结果的正确性。而我们乐观锁需要的一个原子操作就是CAS(Compare And Swap),绝大多数的CPU都支持这个操作。
CAS操作的定义如下(STL中的一个):
1 |
bool atomic_compare_exchange_weak (atomic<T>* obj, T* expected, T val); |
首先函数会将obj与expected的内容作比较:
- 如果相等,那么将交换obj和val的值,并返回true。
- 如果不相等,则什么也不做,之后返回false。
那么使用这个奇怪的操作,为什么就可以实现乐观锁了呢?这里我们看一个例子。这也是我学习的时候看的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
struct list { std::atomic<node*> head; }; ... void append(list* s, node* n) { node* head; do { head = s->head; n->next = head; } while (!std::atomic_compare_exchange_weak(&(s->head), &head, n)); // or while (!s->head.compare_exchange_weak(head, n)); } |
在我们向list中插入元素的时候,首先获取到当前的头指针的值head,然后我们在写数据的时候,首先和此刻的头指针值作对比,如果相同,那么就把新的节点插入。如果不相同,说明有线程先我们一步成功了,那么我们就多尝试一次,直到写入成功。
以上就是使用CAS操作实现的乐观锁。上面的这个append就是最简单的Lock-free且线程安全的操作。
二、concurrentqueue
最近在做这个项目的时候,就被安利了一个header only的C++并发队列库 concurrentqueue。本着不重复造轮子的原则,我在项目中用了这个库,由于它只是两个头文件,特别方便的就加入到了项目中。关于这个库的特点,项目的github上写了很多。这里直接照搬下来,不做解释。
- Knock-your-socks-off blazing fast performance.
- Single-header implementation. Just drop it in your project.
- Fully thread-safe lock-free queue. Use concurrently from any number of threads.
- C++11 implementation — elements are moved (instead of copied) where possible.
- Templated, obviating the need to deal exclusively with pointers — memory is managed for you.
- No artificial limitations on element types or maximum count.
- Memory can be allocated once up-front, or dynamically as needed.
- Fully portable (no assembly; all is done through standard C++11 primitives).
- Supports super-fast bulk operations.
- Includes a low-overhead blocking version (BlockingConcurrentQueue).
- Exception safe.
我体验了一下,感觉最舒服的有以下几点:
- 这个库确实可以很好的实现线程安全队列,而且速度很快。接口也比较简单。很容易上手。
- 整个库就是两个头文件,而且没有其他的依赖,使用C++11实现,兼容各大平台,很容易融入项目。
- 这个并发队列支持阻塞和非阻塞两种。(只在获取元素的时候可以阻塞)
因为这个队列的用法十分简单,这里就直接贴上官网的介绍,然后针对一些细节,补充说明一下,下面是非阻塞队列的常用接口:
ConcurrentQueue(size_t initialSizeEstimate)
Constructor which optionally accepts an estimate of the number of elements the queue will holdenqueue(T&& item)
Enqueues one item, allocating extra space if necessarytry_enqueue(T&& item)
Enqueues one item, but only if enough memory is already allocatedtry_dequeue(T& item)
Dequeues one item, returning true if an item was found or false if the queue appeared empty
ConcurrentQueue(size_t initialSizeEstimate)
这个没什么好说的,一个构造函数,可以指定队列的容量。
enqueue(T&& item)
入队操作。比较有意思的是,如果我们的队列已经满了的话,那么这个还是会把数据放到队列里,使得队列的容量变大。所以,如果希望队列的长度不变的话,尽量还是不要使用这个函数。
try_enqueue(T&& item)
这个也是入队操作,与上一个不同,这个函数当队列已经满了的时候,并不会进行入队操作,而是返回一个bool类型的值,表示是否入队成功。我在使用的时候,会判断这个bool值,如果是false,就让线程等待10ms之后重试。
try_dequeue(T& item)
这个是出队操作,如果队列有值的话,则得到数据(放到参数item里面)。他也会返回一个bool类型的值,表示时候出队成功。
在阻塞版本的队列中,主要多了如下两个函数:
wait_dequeue(T&& item)
和
wait_dequeue_timed(T&& item, std::int64_t timeout_usecs)
。这两个函数的功能类似,都是进行出队操作,如果队列为空,则等待。唯一的区别是,前者永久等待,而后者可以指定等待的时间,如果超时,则会停止等待并返回false。
最后是块操作,两种模式的队列都支持批量插入的操作。这里,我没有用过这些接口,所以大家自行看文档就好。
转自https://www.miaoerduo.com
concurrentqueue介绍相关推荐
- 无锁队列 concurrentqueue介绍(转载)
背景 由于最近在做一个项目,但是框架本身有个不合理的设计.其中的代码是单线程的,数据的读取和计算都在一个线程里面完成.也就是说,我们的程序有很大的一部分时间在读取文件数据,导致最终的运行速度很慢.这里 ...
- ai人工智能与it_保护您的IT业务必须了解的有关网络安全和人工智能的6件事
ai人工智能与it With the year almost coming to a close, there is a need for reflection and tweaks in vario ...
- 如何成为一个更好的开发者
Programming is mostly about learning a language, but how good you are at it is not just dependent on ...
- 第十四节: 介绍四大并发集合类并结合单例模式下的队列来说明线程安全和非安全的场景及补充性能调优问题。...
一. 四大并发集合类 背景:我们目前使用的所有集合都是线程不安全的 . A. ConcurrentBag:就是利用线程槽来分摊Bag中的所有数据,链表的头插法,0代表移除最后一个插入的值. (等价于同 ...
- [一起读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇
在上一篇<走进C#并发队列ConcurrentQueue的内部世界>中解析了Framework下的ConcurrentQueue实现原理,经过抛砖引玉,得到了一众大佬的指点,找到了.NET ...
- 第十四节: 介绍四大并发集合类并结合单例模式下的队列来说明线程安全和非安全的场景及补充性能调优问题。
一. 四大并发集合类 背景:我们目前使用的所有集合都是线程不安全的 . A. ConcurrentBag:就是利用线程槽来分摊Bag中的所有数据,链表的头插法,0代表移除最后一个插入的值. (等价于同 ...
- C++ concurrentqueue资料
##特征 - 让人震惊的[快速性能] [基准]. - 单头实现.把它放在你的项目中. - 完全线程安全的无锁队列.同时使用任意数量的线程. - C ++ 11实现 - 在可能的情况下(c11中的mov ...
- iOS的GCD、NSThread、NSOperation、锁、Runloop的介绍和使用
GCD GCD (Grand Central Dispatch) GCD两个核心概念:任务和队列 任务 任务就是执行操作的意思,也就是block那段代码.执行操作有两种:同步执行和异步执行. 同步执行 ...
- 介绍四大并发集合类并结合单例模式下的队列来说明线程安全和非安全的场景及补充性能调优问题。
一. 四大并发集合类 背景:我们目前使用的所有集合都是线程不安全的 . A. ConcurrentBag:就是利用线程槽来分摊Bag中的所有数据,链表的头插法,0代表移除最后一个插入的值. (等价于同 ...
最新文章
- chapter3.3字典
- LeetCode集锦(十) - 第28题 Implement StrStr
- C#导出EXCEL的几种方法
- linux 远程控制权限,总结一下linux远程控制方法
- 谷歌开源下一代推荐系统模拟器:RecSim NG
- PDH光端机的作用及其特点
- 测试服务器性能常用算法,服务器性能剖析(profiling)之——简介
- Redis-Bitmap介绍及使用
- 6 键盘高级操作技巧
- php 获取css值,如何通过JS获取CSS属性值
- javascript匿名函数及闭包深入理解及应用
- 带头结点的单链表的插入删除和遍历操作
- JavaScript延时函数
- 小米摄像头有onvif协议_小米8SE、魅族16T、荣耀畅玩9A对比
- C++的64位整数[转]+gyy整理
- #留言板(五)#留言板界面
- Oracle Temp临时表空间及其故障处理
- 基于VUE的SSR技术-姜威-专题视频课程
- 如何编译DD-WRT
- [讨论]一个真正的IT人来谈中国与印度的软件
热门文章
- 名企2012校园招聘网申截止日期汇总
- 感恩计算机专业作文,感恩作文(通用10篇)
- JQuery——BreakingNews.js新闻滚动效果
- 树莓派初体验,超级便宜的服务器~
- 网络 #TCP #UDP #文件下载器 #网络通信
- 喵喵驿站系统:马里奥|嗷呜系列MA ——可行性分析与需求分析文档
- 探索Hive用户权限(二):HiveServer2安全访问Hive
- 基于量能的抄底摸底+追涨杀跌的交易策略
- mysql失物招领系统_java+ssh+mysql高校失物招领系统
- 优酷VIP会员周卡只需7.5元,看《沉香如屑》用优酷视频