5. 通过ProcessingSequenceBarrier#waitFor来获取可以消费的下标。检查警告标志是否正常,

public long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException{checkAlert();long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence, availableSequence);}

通过阻塞策略获取下标BlockingWaitStrategy#waitFor,cursorSequence为提供端的下标,当没有消息产生时线程会等待mutex.wait(),被唤醒后也会检查依赖的其他线程的定序器是否已经消费过dependentSequence,否的话进入死循环。

public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;if (cursorSequence.get() < sequence){synchronized (mutex){while (cursorSequence.get() < sequence){barrier.checkAlert();mutex.wait();}}}while ((availableSequence = dependentSequence.get()) < sequence){barrier.checkAlert();ThreadHints.onSpinWait();}return availableSequence;}

选取下标和可用标志位的最大值。MultiProducerSequencer

 public long getHighestPublishedSequence(long lowerBound, long availableSequence){for (long sequence = lowerBound; sequence <= availableSequence; sequence++){if (!isAvailable(sequence)){return sequence - 1;}}return availableSequence;}

6. 保存本线程本次获取的可用下标最大值cachedAvailableSequence,WorkProcessor#run,第二次循环processedSequence已经是false,不获取workSequence值,cachedAvailableSequence >= nextSequence满足条件,获取对应的值

public E get(long sequence){return elementAt(sequence);}protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}

消费线程处理workHandler.onEvent(event);最后把processedSequence设置为true,这样下一次循环就重新从workSequence中获取互斥的下标。

7. 发布消息Disruptor#publishEvent=》RingBuffer#publishEvent

public void publishEvent(final EventTranslator<T> eventTranslator){ringBuffer.publishEvent(eventTranslator);}public void publishEvent(EventTranslator<E> translator){final long sequence = sequencer.next();translateAndPublish(translator, sequence);}

获取提供者定序器的下一个可用下标MultiProducerSequencer#next,获取当前下标以及下一个下标,wrapPoint为正数代表已经设置一圈数据了,获取gatingSequenceCache下标,初始值为-1,刚开始时设置成功cursor.compareAndSet(current, next)返回next下标,第二圈时,wrapPoint为0,cachedGatingSequence为-1,获取消费端的下标和当前提供者的下标的最小下标gatingSequence,当wrapPoint大于gatingSequence时,代表消费者还没有消费当前下标,所以不能覆盖当前位置的对象,所以就需要等待LockSupport.parkNanos(1);如果小于的话,就把那个最小位置设置到gatingSequenceCache保存下来。

public long next(){return next(1);}public long next(int n){if (n < 1 || n > bufferSize){throw new IllegalArgumentException("n must be > 0 and < bufferSize");}long current;long next;do{current = cursor.get();next = current + n;long wrapPoint = next - bufferSize;long cachedGatingSequence = gatingSequenceCache.get();if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence){LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?continue;}gatingSequenceCache.set(gatingSequence);}else if (cursor.compareAndSet(current, next)){break;}}while (true);return next;}

7. 转化设置发布,获取对应位置的值,EventTranslator是用来设置对应数组entries下标的值。

private void translateAndPublish(EventTranslator<E> translator, long sequence){try{translator.translateTo(get(sequence), sequence);}finally{sequencer.publish(sequence);}}

设置完后发布对应的下标,设置该下标的可用标志,

 public void publish(final long sequence){setAvailable(sequence);waitStrategy.signalAllWhenBlocking();}private void setAvailable(final long sequence){setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));}private int calculateAvailabilityFlag(final long sequence){return (int) (sequence >>> indexShift);}private int calculateIndex(final long sequence){return ((int) sequence) & indexMask;}private void setAvailableBufferValue(int index, int flag){long bufferAddress = (index * SCALE) + BASE;UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);}

最后唤醒所有在等待的消费线程。

  public void signalAllWhenBlocking(){synchronized (mutex){mutex.notifyAll();}}

Disruptor(3)相关推荐

  1. Java 并发框架Disruptor(七)

    Disruptor VS BlockingQueue的压测对比: import java.util.concurrent.ArrayBlockingQueue;public class ArrayBl ...

  2. Disruptor(1):Disruptor简介

    1 什么是Disruptor Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JV ...

  3. Disruptor(一)简介

    Disruptor类似于Java中的BlockingQueue,用于线程之间移动数据(事件 Event),但是Disruptor有自己的特点: 把事件多播给多个消费者,并且可以建立消费者之间的依赖关系 ...

  4. Disruptor(二)Sequencer

    1.概述: Sequencer有单生产者和多生产者两种实现.先来看看两个生产者实现的类图. 单生产者:                                                  ...

  5. 构建高性能服务(三)Java高性能缓冲设计 vs Disruptor vs LinkedBlockingQueue--转载

    原文地址:http://maoyidao.iteye.com/blog/1663193 一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据.而 ...

  6. 多线程与高并发(九):单机压测工具JMH,单机最快MQ - Disruptor原理解析

    单机压测工具JMH JMH Java准测试工具套件 什么是JMH 官网 http://openjdk.java.net/projects/code-tools/jmh/ 创建JMH测试 1.创建Mav ...

  7. 高并发数据结构Disruptor解析(5)

    WaitStrategy 在Disruptor中,有很多需要等待的情况.例如:使用了SequenceBarrier的消费者需要在某种条件下等待,比如A消费者和B消费者,假设A消费者必须消费B消费者消费 ...

  8. SpringMVC学习(三)——SpringMVC+Slf4j+Log4j+Logback日志集成实战分享

    文章目录 1.概述 1.1 说明 1.2 日志体系 1.2.1 JCL日志面门介绍 1.2.2 Slf4j日志面门介绍 2.几种日志系统介绍: 2.1 Slf4j 2.2 Commons-loggin ...

  9. 并发译文翻译计划(二)

    Doug Lea 的文献 Synchronizer Framework  译者:ClarenceAu (已翻译完成,在校对) Fork/Join  译者:Alex(陆续发表中) Java Concur ...

最新文章

  1. MySQL高级能量预警
  2. SAP NetWeaver 业务运作面向服务平台 介绍
  3. CxImage的使用及基本用法
  4. SpringBoot--HelloWord
  5. python画人口迁徙图_echarts 手把手教你画迁徙图(城市内部级别+百度地图支持)2...
  6. 配电基础知识汇总,99%的人都收藏了!
  7. 【Python】吊打pyecharts,又一超级棒的开源可视化库
  8. Android 视频播放器 VideoView 的使用,播放本地视频 和 网络 视频
  9. VS2010 运行库设置
  10. jsonp跨域请求响应结果处理函数(python)
  11. udp模拟tcp java_Java简单实现UDP和TCP
  12. 看过这五条,再离职!
  13. JavaScript(三)—— JavaScript 函数/JavaScript 作用域/JavaScript 预解析/JavaScript 对象
  14. 作为现代计算机理论的基础的,作为现代计算机理论基础的冯·诺依曼原理和思想是()。...
  15. 算法提高 超级玛丽(java)
  16. 002649:springboot下mybatis运行原理
  17. selinux为enforcing模式时,运行anonymous上传之后无法删除
  18. 运算符优先级(cpp/c)
  19. Dataset之图片数据增强:设计自动生成汽车车牌图片算法(cv2+PIL)根据指定七个字符自动生成逼真车牌图片数据集(带各种噪声效果)
  20. JS实现实时显示时间

热门文章

  1. python我想对你说_python学习第4天----is和==区别、小数据池、编码解码
  2. Mac os上MAMP连接mysql失败 和mysql访问问题。关于Access denied for user root @ localhost (using password: YES)的解决
  3. [ouc]走向深度学习-1.0
  4. 亿方云面试经验(后台开发工程师实习)
  5. js获取指定class和id的值
  6. 英语口语练习系列-C25-冒险-课堂用语-葬我
  7. Hadoop HA 重新格式化
  8. LeetCode473. 火柴拼正方形
  9. 哪些方法可以有效降低论文的重复率
  10. Hbase面试题(持续更新)