Disruptor简介

Disruptor 是一个用于在线程间通信的高效低延时的开源框架,它被用在了LMAX系统中。这个系统是建立在JVM平台上,核心是一个业务逻辑处理器,官方号称它能够在一个线程里每秒处理6百万订单。
Disruptor的设计似乎和BlockingQueue相近(生产者—消费者),但是 Disruptor能够在无锁的情况下实现Queue并发操作,也就是 Disruptor实际上是非阻塞的。下图是官方给出的和ArrayBlockingQueue对比测试结果:

Unicast: 1P – 1C指的是一个生产者,一个消费者。
Pipeline: 1P – 3C指的是一个生产者产生的事件要顺序被三个消费者使用。
Sequencer: 3P – 1C指的是三个生产者,一个消费者。
Multicast: 1P – 3C指的是一个生产者,三个消费者。
Diamond: 1P – 3C指的是一个生产者,三个消费者(c1和c2同时处理完成后,c3才能处理)。

从上面可以明显看出: ArrayBlockingQueue的效率比Disruptor低很多。


Disruptor构成

RingBuffer

RingBuffer是一个由数组**(有界)**构成的环形队列,它是不同线程之间传递数据的缓冲区,也就是真正存储消息数据的地方。
RingBuffer维护了一个指针(cursor/Sequence)来向队列中插入或者读取消息。这个指针是java long类型的(64位有符号数),指针采用往上计数自增的方式,通过对指针按照数组的长度取模找出数组的下标来定位入口,为了提高性能通常将ring buffer的size大小设置成2的N次方。
RingBuffer对数据的写入是一个两段操作:先获取可以写入的位置,再执行真正的写入。
具体过程可以参考这篇文章。

Producer

Producer即生产者,泛指调用 Disruptor 发布事件(把写入缓冲队列的一个元素定义为一个事件)的用户代码。

有两种实现策略,一个是SingleThreadedStrategy(单线程策略)另一个是 MultiThreadedStrategy(多线程策略),两种策略对应的实现类为SingleProducerSequencer、MultiProducerSequencer,两者都实现了Sequencer类,之所以叫Sequencer是因为他们都是通过Sequence来实现数据写 。
它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。具体使用哪个根据自己的场景来定,多线程的策略使用了AtomicLong(Java提供的CAS操作),而单线程的使用long,没有锁也没有CAS。这意味着单线程版本会非常快,因为它只有一个生产者,不会产生序号上的冲突。

Consumer/EventProcessor

Consumer和EventProcessor是一个概念,新的版本中由EventProcessor概念替代了Consumer。

Sequence

Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。

生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。

说明:虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系

在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。

SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

Wait Strategy

当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

  • BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
  • BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。
  • SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
  • YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
  • PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。通过把EventProcessor提交到线程池来真正执行,有两类Processor。

其中一类消费者是BatchEvenProcessor。每个BatchEvenProcessor有一个Sequence,来记录自己消费RingBuffer中消息的情况。所以,一个消息必然会被每一个BatchEvenProcessor消费。

另一类消费者是WorkProcessor。每个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其他WorkProcessor消费。这个被WorkProcessor共享的Sequence相当于尾指针。

EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。开发者实现EventHandler,然后作为入参传递给EventProcessor的实例。

总上来看:Producer生产event数据,EventHandler作为消费者消费event并进行逻辑处理。消费消息的进度通过Sequence来控制。费者之间以及消费者和RingBuffer之间的依赖关系由SequenceBarrier来控制。


Disruptor入门Demo

Disruptor创建过程有四步:

  • 建立一个工厂Event类,用于创建Event类实例对象
  • 建立一个监听事件类,用于处理数据
  • 实例化Disruptor实例,配置相应的参数,编写Disruptor核心组件
  • 编写生产者组件,向Disruptor容器中投递数据

下面以一个具体例子说明:

Evevt类:商品信息

public class OrderEvent {private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}
}

工厂Event类

public class OrderEventFactory implements EventFactory<OrderEvent> {public OrderEvent newInstance() {return new OrderEvent();}
}

EventHandler类:

public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {System.out.println("消费的内容是:" + orderEvent.getValue());}
}

Producer类:产生商品

public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data){//1 在生产者发送消息的时候, 首先从我们的ringBuffer里面 获取一个可用的序号long sequence = ringBuffer.next();   //0try {//2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"OrderEvent event = ringBuffer.get(sequence);//3 进行实际的赋值处理event.setValue(data.getLong(0));} finally {//4 提交发布操作ringBuffer.publish(sequence);}}}

执行的Main方法:

    public static void main(String[] args) {int ringBufferSize = 1024 * 1024;ExecutorService executor = Executors.newFixedThreadPool(4);/*** 1 eventFactory: 消息(event)工厂对象* 2 ringBufferSize: 容器的长度* 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler* 4 ProducerType: 单生产者 还是 多生产者* 5 waitStrategy: 等待策略*///1. 实例化disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(new OrderEventFactory(),ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)disruptor.handleEventsWith(new OrderEventHandler());//3. 启动disruptordisruptor.start();//4. 获取实际存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer data = ByteBuffer.allocate(8) ;for(long i=0;i<100;i++){data.putLong(0,i);producer.sendData(data);}disruptor.shutdown();executor.shutdown();}

参考文章:

  • LMAX-Exchange/disruptor
  • 每秒钟承载600万订单级别的无锁并行计算框架-Disruptor
  • Disruptor(无锁并发框架)-发布
  • 并发框架Disruptor场景应用

初探Disruptor相关推荐

  1. 2021年大数据Flink(九):Flink原理初探

    Flink原理初探 Flink角色分工 在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程. JobManager: 它扮演的是集群管理者的角色,负责调度任务.协调 checkp ...

  2. 从壹开始微服务 [ DDD ] 之一 ║ D3模式设计初探 与 我的计划书

    缘起 哈喽大家周四好!又是开心的一天,时间过的真快,我们的 <从壹开始 .net core 2.1 + vue 2.5 >前后端分离系列共 34 篇已经完结了,当然以后肯定还会有更新和修改 ...

  3. 经典算法研究系列:二、Dijkstra 算法初探

    经典算法研究系列:二.Dijkstra 算法初探  July   二零一一年一月 ====================== 本文主要参考:算法导论 第二版.维基百科. 写的不好之处,还望见谅. 本 ...

  4. 你需要知道的高性能并发框架Disruptor原理

    Disruptor的小史 现在要是不知道Disruptor真的已经很outer了,Disruptor是英国外汇交易公司LMAX开发的一款开源的高性能队列,LMAX Disruptor是一个高性能的线程 ...

  5. 小编带你进入强如 Disruptor 也发生内存溢出?

    前言 OutOfMemoryError 问题相信很多朋友都遇到过,相对于常见的业务异常(数组越界.空指针等)来说这类问题是很难定位和解决的. 本文以最近碰到的一次线上内存溢出的定位.解决问题的方式展开 ...

  6. 浅谈Disruptor

    Disruptor是一个低延迟(low-latency),高吞吐量(high-throughput)的事件发布订阅框架.通过Disruptor,可以在一个JVM中发布事件,和订阅事件.相对于Java中 ...

  7. las格式测井曲线_邹榕,等:顺北和托甫台区块奥陶系断裂结构单元测井响应特征初探...

    引用格式:邹榕,徐中祥,张晓明,等.顺北和托甫台区块奥陶系断裂结构单测井响应特征初探[J].油气藏评价与开发,2020,10(2):18-23.ZOUR, XU Z X, ZHANG X M, et ...

  8. 2018-4-15摘录笔记,《网络表征学习前沿与实践》 崔鹏以及《网络表征学习中的基本问题初探》 王啸 崔鹏 朱文武

    1.来源:<网络表征学习前沿与实践>  崔鹏 (1)随着数据的增加以及计算机计算速度的增加,想当然的以为速度快了,数据再多也是可以自己算的,但是若是数据之间存在着复杂的关系,那么处理一个样 ...

  9. 太牛逼了!项目中用了Disruptor之后,性能提升了2.5倍

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! ‍存储设备往往是速度越快价格越昂贵,速度越快价格越低廉.在 ...

最新文章

  1. SDN/NFV:现状,挑战和未来
  2. JAVA修饰符类型(public,protected,private,friendly)
  3. 如何能在git bash中使用mvn命令_使用Github Actions完成CI/CD工作
  4. cryptojs php 互通_PHP7实现和CryptoJS的AES加密方式互通示例【AES-128-ECB加密】
  5. Ruby 的环境搭建及安装
  6. 【RK3399Pro学习笔记】十、ROS服务端Server的编程实现
  7. 01.神经网络和深度学习 W1.深度学习概论
  8. ssh自动登陆和scp自动拷贝文件
  9. 华为数通设备配置导出
  10. 虚拟机VMware安装学习过程中遇到的几个问题
  11. java编程基本基本框架_盘点Java编程中常用的框架
  12. c是面向什么的语言,c语言是什么语言
  13. Springboot微信公众号开发入门流程(校验签名、access_token获取、生成带参二维码、发送文字、图文消息、被动回复消息、图文消息静默跳转)
  14. matlab中if语句的条件,matlabif条件语句
  15. python图片镜像翻转_OpenCV Python 翻转(镜像)图片 cv.flip
  16. 关于嵌入式的发展方向
  17. java中的nio是啥,java中的NIO
  18. 如何修改PDF中图片的大小尺寸
  19. Excel从身份证号提取生日
  20. Oracle+SQL优化第二弹

热门文章

  1. 大学生ppt汇报中出现的一些问题及解决
  2. RISCV - 2 “Zicsr“, CSR Instructions
  3. Linux中从一个目录路径中获取文件名
  4. codeforces 750D New Year and Fireworks【DFS】
  5. 发光二极管的导通压降导通电流
  6. 【回答问题】ChatGPT上线了!推荐30个以上比较好的中文bert系列的模型/压缩模型
  7. Oracle Database 9i/10g安装后的基本环境与服务
  8. 解决java.lang.NoClassDefFoundError错误的一种方案
  9. 配置Goland的自定义快捷键#1 配置sout快捷键
  10. GPU更多参与 Windows7 WDDM1.1版浅析