简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。本文从实战角度剖析了Disruptor的实现原理。

需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列

Java内置队列

介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。

ArrayBlockingQueue的问题

ArrayBlockingQueue在实际使用过程中,会因为加锁和伪共享等出现严重的性能问题,我们下面来分析一下。

加锁

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
  • 机器环境:2.4G 6核
  • 运算: 64位的计数器累加5亿次

|Method | Time (ms) | |— | —| |Single thread | 300| |Single thread with CAS | 5,700| |Single thread with lock | 10,000| |Single thread with volatile write | 4,700| |Two threads with CAS | 30,000| |Two threads with lock | 224,000|

CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。

单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。

在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

综上可知,加锁的性能是最差的。

关于锁和CAS

保证线程安全一般分成两种方式:锁和原子变量。

采取加锁的方式,默认线程会冲突,访问数据时,先加上锁再访问,访问之后再解锁。通过锁界定一个临界区,同时只有一个线程进入。如上图所示,Thread2访问Entry的时候,加了锁,Thread1就不能再执行访问Entry的代码,从而保证线程安全。

但是大家都知道加锁的代码效率会比较低

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。

一个生产者

写数据

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

多个生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

下面分读数据和写数据两种情况介绍。

读数据

生产者多线程写入的情况会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素。

防止不同生产者对同一段空间写入的代码,如下所示:

public long tryNext(int n) throws InsufficientCapacityException
{if (n < 1){throw new IllegalArgumentException("n must be > 0");}long current;long next;do{current = cursor.get();next = current + n;if (!hasAvailableCapacity(gatingSequences, n, current)){throw InsufficientCapacityException.INSTANCE;}}while (!cursor.compareAndSet(current, next));return next;
}

通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。

消费者的流程与生产者非常类似,这儿就不多描述了。

通过无锁CAS实现多线程高并发顺序处理

使用Disruptor比使用ArrayBlockingQueue略微复杂,为方便读者上手,增加代码样例。

代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。

以下代码基于3.3.4版本的Disruptor包

package com.meituan.Disruptor;/*** @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.ThreadFactory;public class DisruptorMain
{public static void main(String[] args) throws Exception{// 队列中的元素class Element {private int value;public int get(){return value;}public void set(int value){this.value= value;}}// 生产者的线程工厂ThreadFactory threadFactory = new ThreadFactory(){@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "simpleThread");}};// RingBuffer生产工厂,初始化RingBuffer的时候使用EventFactory<Element> factory = new EventFactory<Element>() {@Overridepublic Element newInstance() {return new Element();}};// 处理Event的handlerEventHandler<Element> handler = new EventHandler<Element>(){@Overridepublic void onEvent(Element element, long sequence, boolean endOfBatch){System.out.println("Element: " + element.get());}};// 阻塞策略BlockingWaitStrategy strategy = new BlockingWaitStrategy();// 指定RingBuffer的大小int bufferSize = 16;// 创建disruptor,采用单生产者模式Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);// 设置EventHandlerdisruptor.handleEventsWith(handler);// 启动disruptor的线程disruptor.start();RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();for (int l = 0; true; l++){// 获取下一个可用位置的下标long sequence = ringBuffer.next();  try{// 返回可用位置的元素Element event = ringBuffer.get(sequence); // 设置该位置元素的值event.set(l); }finally{ringBuffer.publish(sequence);}Thread.sleep(10);}}
}

等待策略

生产者的等待策略

暂时只有休眠1ns。

LockSupport.parkNanos(1);

消费者的等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

目前我们使用disruptor已经更新到了3.x版本,比之前的2.x版本性能更加的优秀,提供更多的API使用方式

    <!-- https://mvnrepository.com/artifact/com.lmax/disruptor --><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency>

高并发线程内存事件处理器 disruptor 一 初步了解相关推荐

  1. 高并发线程/锁/内存处理模型

    文章目录 锁与线程 一.进程/线程的基本介绍 进程 线程 1 线程的调度与时间片 2 优先级 3 生命周期 进程与线程的区别 二.线程的使用 2.1 Thread类的介绍 2.2 创建线程的方法 Th ...

  2. 27.Linux网络编程socket变成 tcp 高并发 线程池 udp

    好,咱们开始上课了,从今天开始咱们连续讲 8 天的,网络编程这个还是在linux环境下去讲,咱们先看一下咱们这 8 天都讲什么东西,跟大家一块来梳理一下,你先有个大概的印象,这些你也不要记,那么网络编 ...

  3. **Java有哪些悲观锁的实现_淘宝Java研发面试:Redis+Mybatis+高并发+线程池

    在面试的这件事上,我做了大量的"功课",首先我研究了几乎所有大厂的面试题,还和负责招聘工作的几个朋友,详细的探讨了 Java 面试所要涉及的知识点 并发 简单描述下悲观锁乐观锁 J ...

  4. C++项目:高并发内存池

    文章目录 项目介绍 什么是内存池 池化技术 内存池 malloc 页 定长的内存池 对比测试 高并发内存池整体框架设计 thread cache 整体设计 哈希桶映射对齐规则 TLS无锁访问 Cent ...

  5. 【C】高并发内存池设计

    高并发内存池设计 高并发下传统方式的弊端 在传统C语言中,我们使用malloc.calloc.realloc.free来进行内存的申请分配与释放,函数原型如下.C++中则是new.delete. vo ...

  6. 【项目设计】高并发内存池

    文章目录 项目介绍 内存池介绍 定长内存池的实现 高并发内存池整体框架设计 threadcache threadcache整体设计 threadcache哈希桶映射对齐规则 threadcacheTL ...

  7. [项目设计]高并发内存池

    目录 1.项目介绍 2.高并发内存池整体框架设计 3.thread cache <1>thread cache 哈希桶对齐规则 <2>Thread Cache类设计 4.Cen ...

  8. 高并发内存池设计_内存池

    高并发内存池设计 1. 常用的内存操作函数 2. 高性能内存池设计_弊端解决之道 弊端一 弊端二 弊端三 弊端四 3. 弊端解决之道 内存管理维度分析 内存管理组件选型 4. 高并发内存管理最佳实践 ...

  9. 为什么nodejs是单进程的_Nodejs探秘:深入理解单线程实现高并发原理

    导语:在我接触Nodejs的时候,听的最多的关键字就是:事件驱动.非阻塞I/O.高效.轻量,是单线程且支持高并发的脚本语言.可为什么单线程的nodejs可以支持高并发呢?很多人都不明白其原理,自己也在 ...

最新文章

  1. 简述python程序的基本构成_(一)Python入门-2编程基本概念:01程序的构成
  2. 数据结构与算法 / 冒泡排序最坏情况下的时间复杂度解析
  3. JavaScript 中最​​重要的保留字
  4. mysql的表导出er关系图_使用Navicat生成ER关系图并导出的方法
  5. angularjs上传文件到服务器,AngularJS:如何使用multipart表单实现简单的文件上传?...
  6. 【Joomla】Gallery 中的图片失去了 Popup 效果
  7. Android File存储
  8. Linux环境下编译运行大型C语言项目
  9. html如何删除表单中的行,用jQuery remove()方法删除表格行(table tr)的写法
  10. 【Java 8 新特性】使用Collectors.toList()方法将Map转化成List的示例
  11. 字节跳动“车轮”收割
  12. ttl计算机,TTL接口
  13. 电脑扬声器没有声音,插上耳机也检测不到
  14. 清音驱腐启鸿蒙,竹韵清音-格律诗词41期
  15. iphoneipad图标尺寸
  16. 算法模型---关联规则挖掘学习
  17. 抖音跳转微信加好友功能实现解析
  18. 初创电商步步谈(一)- 前期准备的内容真不少
  19. 手把手教你利用 python 爬虫分析基金、股票
  20. Linux PPP 实现源码分析

热门文章

  1. J型曲线与如何做出正确决策
  2. 干涩皮肤的解渴神器,水肌美玻尿酸补水保湿面膜
  3. PD 重要监控指标详解
  4. c++ 调用 python脚本, runtime error r6034
  5. 关于lcd1602A的使用
  6. 什么是连通图,(强)连通图
  7. css样式,背景渐变+图片,三角-实心-空心
  8. 多云及多云管理概念简单说明-行云管家
  9. 什么是embedding(把物体编码为一个低维稠密向量),pytorch中nn.Embedding原理及使用
  10. 达梦中创建dblink链接oracle