第一步:引入pom

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version>
</dependency>

第二步:定义MessageModel

@Data
public class MessageModel {private String message;
}

第三步:message配置管理


@Configuration
public class MQManager {@Autowiredprivate RedisUtils redisUtils;@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {// 定义用于事件处理的线程池,// Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);// 指定事件工厂HelloEventFactory factory = new HelloEventFactory();// 指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;// 单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE,new BlockingWaitStrategy());// 设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler(redisUtils));// 启动disruptor线程disruptor.start();// 获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}

第四步:定义工厂类

public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}

第五步:定义消费端事件处理器


@Slf4j
@Component
public class HelloEventHandler implements EventHandler<MessageModel> {private RedisUtils redisUtils;public HelloEventHandler (RedisUtils redisUtils) {this.redisUtils = redisUtils;}@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {// 这里停止1000ms是为了确定消费消息是异步的Thread.sleep(10);redisUtils.buriedPointCount("sayHelloMqReceive");log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}", event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
}

第六步:具体的实现

public interface DisruptorMqService {void sayHelloMq(String message);
}

@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Autowiredprivate RedisUtils redisUtils;@Overridepublic void sayHelloMq(String message) {redisUtils.buriedPointCount("sayHelloMqSend");log.info("record the message: {}", message);// 获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {// 给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());} finally {// 发布Event,激活观察者去消费,将sequence传递给改消费者// 注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}}

Spring boot 整合disruptor相关推荐

  1. spring boot整合spring security笔记

    最近自己做了一个小项目,正在进行springboot和spring Security的整合,有一丢丢的感悟,在这里分享一下: 首先,spring boot整合spring security最好是使用T ...

  2. RabbitMQ使用及与spring boot整合

    1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...

  3. Spring Boot 教程(三): Spring Boot 整合Mybatis

    教程简介 本项目内容为Spring Boot教程样例.目的是通过学习本系列教程,读者可以从0到1掌握spring boot的知识,并且可以运用到项目中.如您觉得该项目对您有用,欢迎点击收藏和点赞按钮, ...

  4. 五、spring boot整合mybatis-plus

    spring boot整合mybatis-plus 简介 mybatis 增强工具包,简化 CRUD 操作. 文档 http://mp.baomidou.com http://mybatis.plus ...

  5. spring boot 整合mybatis 无法输出sql的问题

    使用spring boot整合mybatis,测试功能的时候,遇到到了sql问题,想要从日志上看哪里错了,但是怎么都无法输出执行的sql,我使用的是log4j2,百度了一下,很多博客都说,加上下面的日 ...

  6. Spring boot 整合 Mybatis 实现增删改查(MyEclipse版)

    1.首先搭建好一个Spring boot 程序,编写好启动类. 启动类代码如下: @SpringBootApplication public class Start {public static vo ...

  7. spring boot 系列之四:spring boot 整合JPA

    上一篇我们讲了spring boot 整合JdbcTemplate来进行数据的持久化, 这篇我们来说下怎么通过spring boot 整合JPA来实现数据的持久化. 一.代码实现 修改pom,引入依赖 ...

  8. freemarker ftl模板_Spring Boot2 系列教程(十)Spring Boot 整合 Freemarker

    今天来聊聊 Spring Boot 整合 Freemarker. Freemarker 简介 这是一个相当老牌的开源的免费的模版引擎.通过 Freemarker 模版,我们可以将数据渲染成 HTML ...

  9. springboot整合hibernate_峰哥说技术系列-17 .Spring Boot 整合 Spring Data JPA

    今日份主题 Spring Boot 整合 Spring Data JPA JPA(Java Persistence API)是用于对象持久化的 API,是Java EE 5.0 平台标准的 ORM 规 ...

最新文章

  1. 【Atcoder】ARC083 D - Restoring Road Network
  2. 百万用户规模的系统如何扩展
  3. Java构造方法以及重载
  4. springmvc环境搭建以及常见问题解决
  5. Informix onstat 常用的监控选项解释
  6. SpellGCN: Incorporating Phonological and Visual Similarities intoLanguage Models
  7. php复姓怎么排序,按姓氏笔画排名怎么排列了
  8. Spark的下一代引擎-Project Tungsten启示录
  9. 在线答题系统(小型HTTP服务器)
  10. theisle服务器信息设置,theisle服务器diy
  11. gdi与gdi+绘图效率_.NET和GDI +进行绘图[第1部分:基础知识]
  12. AI 如何识别西瓜和冬瓜?
  13. 一个好的浏览器是多么重要,强烈推荐一个好用的浏览器.........
  14. 数字化时代,如何推动实体经济和数字经济的融合
  15. 单目标跟踪——【数据集基准】RGB数据集OTB / NFS / TrackingNet / LaSOT / GOT-10k / UAV123 / VOT 简介
  16. Linux vmstat 命令详解
  17. 频率在电子领域内,频率是一种最基本的参数,并与其他许多电参量的测量方案和测量结果都有着十分密切的关系。由于频率信号抗干扰能力强、易于传输,可以获得较高的测量精度。因此,频率的测量就显得尤为重要,测频方
  18. coding期间遇到的bug记录
  19. Linphone分析 1_初始化
  20. 穿越火线河北一区服务器位置,cf北方大区属于哪个区(穿越火线合区列表)

热门文章

  1. 蓝牙BLE实用教程(转载)
  2. VxWorks网络编程
  3. 考研日记--8.7--英语要开始啦
  4. 《Android移动应用基础教程》(Android Studio)(第二版)黑马教程 课后题答案第一章
  5. Python-写个gif图片生成器(斗图小神器)
  6. AEM学习:Component(五)
  7. 征文 | 一个奋斗者的学习之路
  8. Upload-Labs-Pass-07
  9. linux 串口波特率失败,串口上不常见的波特率 - Linux
  10. Testbench Hierarchy