Disruptor是什么?
Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后再交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在
https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧。

轮胎:RingBuffer

RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bV2qKzfA-1591960121369)(https://upload-images.jianshu.io/upload_images/22478635-fa10d4054e44f757?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

数组

这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。

序号

RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。

由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。

无锁的机制

在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:

一个生产者 + 一个消费者

生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

一个生产者 + 多个消费者

多个消费者当然持有多个消费指针C1,C2,…,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

多个生产者 + N个消费者

很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。

Disruptor初体验:简单的生产者和消费者

业务数据对象POJO(Event)

public class Order {
    /* 订单ID */
    private long id;

/* 订单信息 */
    private String info;

/* 订单价格 */
    private double price;

public long getId()
    {
        return(id);
    }

public void setId( long id )
    {
        this.id = id;
    }

public String getInfo()
    {
        return(info);
    }

public void setInfo( String info )
    {
        this.info = info;
    }

public double getPrice()
    {
        return(price);
    }

public void setPrice( double price )
    {
        this.price = price;
    }
}

业务数据工厂(Factory)

public class OrderFactory implements EventFactory {
    @Override
    public Object newInstance()
    {
        System.out.println( "OrderFactory.newInstance" );
        return(new Order() );
    }
}

事件处理器(Handler,即消费者处理逻辑)

public class OrderHandler implements EventHandler<Order>{
    @Override
    public void onEvent( Order order, long l, boolean b ) throws Exception
    {
        System.out.println( Thread.currentThread().getName() + " 消费者处理中:" + l );
        order.setInfo( "info" + order.getId() );
        order.setPrice( Math.random() );
    }
}

Main

public class Main {
    public static void main( String[] args ) throws InterruptedException
    {
        /* 创建订单工厂 */
        OrderFactory orderFactory = new OrderFactory();

/* ringbuffer的大小 */
        int RINGBUFFER_SIZE = 1024;

/* 创建disruptor */
        Disruptor<Order> disruptor = new Disruptor<Order>( orderFactory, RINGBUFFER_SIZE, Executors.defaultThreadFactory() );

/* 设置事件处理器 即消费者 */
        disruptor.handleEventsWith( new OrderHandler() );

disruptor.start();

RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

/* -------------生产数据 */
        for ( int i = 0; i < 3; i++ )
        {
            long sequence = ringBuffer.next();

Order order = ringBuffer.get( sequence );

order.setId( i );

ringBuffer.publish( sequence );
            System.out.println( Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i );
        }

Thread.sleep( 1000 );

disruptor.shutdown();
    }
}

运行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V25Xei7s-1591960121372)(https://upload-images.jianshu.io/upload_images/22478635-fc42aec1fcc42b38?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

说明:

其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

另外在构造Disruptor的时候,在3.3.6之前使用的是API:

到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xosRRHuN-1591960121378)(https://upload-images.jianshu.io/upload_images/22478635-2569d715193c3998?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

单独使用RingBuffer:WorkerPool

如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。

public static void main( String[] args ) throws InterruptedException
{
    ExecutorService        executor    = Executors.newFixedThreadPool( 3 );
    RingBuffer<Order>    ringBuffer    = RingBuffer.create( ProducerType.SINGLE, new OrderFactory(), 1024, new YieldingWaitStrategy() );
    WorkerPool<Order>    workerPool    = new WorkerPool<Order>( ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), new OrderHandler() );

workerPool.start( executor );

/* -------------生产数据 */
    for ( int i = 0; i < 30; i++ )
    {
        long sequence = ringBuffer.next();

Order order = ringBuffer.get( sequence );
        order.setId( i );

ringBuffer.publish( sequence );

System.out.println( Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i );
    }

Thread.sleep( 1000 );

workerPool.halt();
    executor.shutdown();
}

实际上是利用WorkerPool辅助连接消费者。

一个生产者+多个消费者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wboXIggd-1591960121380)(https://upload-images.jianshu.io/upload_images/22478635-764f278a5cf381cb?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

public static void main( String[] args ) throws InterruptedException
{
    /* 创建订单工厂 */
    OrderFactory orderFactory = new OrderFactory();

/* ringbuffer的大小 */
    int RINGBUFFER_SIZE = 1024;

/* 创建disruptor */
    Disruptor<Order> disruptor = new Disruptor<Order>( orderFactory, RINGBUFFER_SIZE, Executors.defaultThreadFactory() );

/* 设置事件处理器 即消费者 */
    EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith( new OrderHandler(), new OrderHandler2() );
    eventHandlerGroup.then( new OrderHandler3() );
    disruptor.start();

RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

/* -------------生产数据 */
    for ( int i = 0; i < 3; i++ )
    {
        long sequence = ringBuffer.next();

Order order = ringBuffer.get( sequence );

order.setId( i );

ringBuffer.publish( sequence );
        System.out.println( Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i );
    }

Thread.sleep( 1000 );
    disruptor.shutdown();
}

运行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lbaEDZcZ-1591960121382)(https://upload-images.jianshu.io/upload_images/22478635-00beb3bb71654ffa?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。

如果我们想顺序的按照A->B->C呢?

public class Order {
    /* 订单ID */
    private long id;

/* 订单信息 */
    private String info;

/* 订单价格 */
    private double price;

public long getId()
    {
        return(id);
    }

public void setId( long id )
    {
        this.id = id;
    }

public String getInfo()
    {
        return(info);
    }

public void setInfo( String info )
    {
        this.info = info;
    }

public double getPrice()
    {
        return(price);
    }

public void setPrice( double price )
    {
        this.price = price;
    }
}

如果我们想六边形操作呢?

Handler1    h1    = new Handler1();
Handler2    h2    = new Handler2();
Handler3    h3    = new Handler3();
Handler4    h4    = new Handler4();
Handler5    h5    = new Handler5();
disruptor.handleEventsWith( h1, h2 );
disruptor.after( h1 ).handleEventsWith( h4 );
disruptor.after( h2 ).handleEventsWith( h5 );
disruptor.after( h4, h5 ).handleEventsWith( h3 );

到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。

Java并发编程框架Disruptor相关推荐

  1. java 并发编程框架

    java 并发编程 线程组管理 Executor 框架 Fork-Join框架 线程组管理 线程组: 线程的集合 大线程可包含小线程. 可管理多个线程,但效率较低 重复的创建线程和关闭线程 可通过en ...

  2. disruptor框架为什么不流行_Java并发编程框架Disruptor

    Disruptor是什么? Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号 ...

  3. java 并发框架源码_某网Java并发编程高阶技术-高性能并发框架源码解析与实战(云盘下载)...

    第1章 课程介绍(Java并发编程进阶课程) 什么是Disruptor?它一个高性能的异步处理框架,号称"单线程每秒可处理600W个订单"的神器,本课程目标:彻底精通一个如此优秀的 ...

  4. Java 并发编程——Executor框架和线程池原理

    Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...

  5. Java并发编程实战————Executor框架与任务执行

    引言 本篇博客介绍通过"执行任务"的机制来设计应用程序时需要掌握的一些知识.所有的内容均提炼自<Java并发编程实战>中第六章的内容. 大多数并发应用程序都是围绕&qu ...

  6. Java并发编程(08):Executor线程池框架

    本文源码:GitHub·点这里 || GitEE·点这里 一.Executor框架简介 1.基础简介 Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的 ...

  7. 《Java并发编程的艺术》——Java中的并发工具类、线程池、Execute框架(笔记)

    文章目录 八.Java中的并发工具类 8.1 等待多线程完成的CountDownLatch 8.2 同步屏障CyclicBarrier 8.2.1 CyclicBarrier简介 8.2.2 Cycl ...

  8. 【Java】Java并发编程

    文章主要目的帮助开发人员创建安全和高性能的并发类,提供各种实用设计规则,同时更加轻松应对并发编程相关面试. 简介 线程是实现并发的基础,能使复杂的异步代码变得更简单,极大简化了复杂系统的开发.充分发挥 ...

  9. 【极客时间】《Java并发编程实战》学习笔记

    目录: 开篇词 | 你为什么需要学习并发编程? 内容来源:开篇词 | 你为什么需要学习并发编程?-极客时间 例如,Java 里 synchronized.wait()/notify() 相关的知识很琐 ...

最新文章

  1. HALCON标定倾斜安装镜头
  2. 数字发行:电子书、电影、游戏、音乐
  3. python教程:一篇文章让你理解字符串的格式化
  4. 2020年推荐系统工程师炼丹手册
  5. STL源码剖析---红黑树原理详解下
  6. matlab贝叶斯优化工具箱_经济学人的神器——BEAR(贝叶斯估计、分析和回归工具包)...
  7. Spring AOP原理浅析及入门实例
  8. 2020计算机考研只考数据结构的学校,【择校必看】十三所计算机专业课只考数据结构的985院校!...
  9. 工况密度和标况密度怎么换算_什么是载流量?导线的载流量与电流密度怎么计算?图文详解!...
  10. Eclipse alt+/ 失效 解决
  11. 再学 GDI+[22]: TGPLinearGradientBrush - 之一: TLinearGradientMode
  12. 就在刚刚,人工智能微专业来啦
  13. Linux进阶之VMware Linux虚拟机运行提示“锁定文件失败 虚拟机开启模块snapshot失败”的解决办法...
  14. OpenSSH服务及其相关应用
  15. java 设置sesion 生命周期
  16. BUUCTF:[ACTF新生赛2020]swp
  17. mysql 时间相关问题
  18. jquery获取复选框checkbox被选中的值
  19. Linux下的打包和解压缩命令
  20. 支付宝json_ua加密分析

热门文章

  1. 决策树算法python实现_决策树之python实现ID3算法(例子)
  2. 安卓适配器类中怎么调用intent_设计模式:代理模式/中介者模式 / 桥接模式/适配器 - 七星望...
  3. 自动化wms仓储系统发展五个阶段?
  4. 成像反了_宋国荣, 窦致夏:快速超声 C 扫描成像中的信号频域分析法及其应用...
  5. TensorFlow2.0:数据统计
  6. Redis基础(四)——持久化
  7. HDU 3584 三维树状数组
  8. 在git上面找开源项目遇到的坑
  9. Oracle提升查询性能之-简单范围分区表的创建
  10. 'int' object has no attribute 'backward'报错 使用Pytorch编写 Hinge loss函数