传统I/O模式

  • 我们之前的的I/O文章中有过如下这种图:

  • 如上模型中,存在的问题

    • 当并发数量很大的时候,会创建大量的线程,占用很大的系统资源
    • 当连接创建后,如果当前线程暂时没有可以读的数据,那么改县城会阻塞在read的操作上,造成资源的浪费
  • 基于以上的问题,大师Doug Lea出了一篇关于分析与构建可伸缩的高性能IO服务的经典文章《Scalable IO in Java》;在文章中Doug Lea通过各个角度,循序渐进的梳理了服务开发中的相关问题,以及在解决问题的过程中服务模型的演变与进化,文章中基于Reactor反应器模式的几种服务模型架构,也被Netty、Mina等大多数高性能IO服务框架所采用,因此阅读这篇文章有助于你更深入了解Netty、Mina等服务框架的编程思想与设计模式。同时Doug Lea 也是java.util.concurrent包的作者,大师当之无愧

Reactor 模式

针对传统I/O的服务模式优化
  • 基于I/O复用模型:多个连接公用一个阻塞的对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有的连接,当某个连接有新的数据需要处理的时候,操作系统在通知应用陈旭,唤醒线程,线程从阻塞态返回,开始处理业务:
  • 基于线程池复用技术实现一个线程池:不用每次来一个连接创建一个线程,我们将连接完成后的业务处理任务分配给线程池处理,一个线程池可以处理多个连接的业务。
  • 如上两点优化如下:

Reactor模式基本设计思路
  • 上图中其实基本描述了Reactor的基本思想,我们用如下图细化

  • 入上图:

    • Reactor模式,通过一个活着多个输入同时传递给服务处理器的模式(基于事件的模式)
    • 服务器端程序处理传入的多个请求,并且将它们同步分派到对应的处理线程,因此,Reactor模式也可以叫Dispatcher模式
    • Readtor模式使用I/O复用监听事件,收到事件通知后,分发给某个线程(进程),这种设计就是网络服务高并发处理的关键。
Reactor模式中核心组成
  • Reactor:Reactor模式在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来处理对应的I/O事件。就类似Nginx的转发,他负责接收来自客户端的请求并且将请求转发到对应的不同服务来处理
  • Handlers:处理程序执行I/O事件要完成的实际事件,类似于Nginx转发之后给对应的接口的具体实现。Reactor通过一定的规则来找到对应的处理逻辑来响应I/O事件,处理程序执行非阻塞操作。

Reactor模式分类

  • 根据Reactor的数量和处理资源线程池的数量不同,有3种实现模式

    • 单Reactor单线程模式
    • 单Reactor多线程模式
    • 主从Reactor多线程模式
单Reactor单线程模式
  • 原理图如下下:

  • 用如下NIO简单群聊系统来验证单Reactor单线程模式

  • Client 端代码

/*** 群聊客户端* @author liaojiamin* @Date:Created in 16:03 2022/8/22*/
public class GroupChatClient {private SocketChannel socketChannel;private Selector selector;private Integer port = 6667;private String HOST = "127.0.0.1";private String userName;/*** 初始化* */public GroupChatClient() throws IOException {socketChannel = SocketChannel.open(new InetSocketAddress(HOST, port));selector = Selector.open();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);userName = socketChannel.getLocalAddress().toString().substring(1);System.out.println(userName + " is ok");}public void sendInfo(String info){info = userName + " say "+ info;try {socketChannel.write(ByteBuffer.wrap(info.getBytes()));} catch (IOException e) {e.printStackTrace();}}public void readInfo(){try{int readChannels = selector.select();if(readChannels > 0){Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()){SelectionKey selectionKey = iterator.next();if(selectionKey.isReadable()){SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);readSocketChannel.read(byteBuffer);String msg = new String(byteBuffer.array());System.out.println("读取消息: "+ msg.trim());}iterator.remove();}}else {//               System.out.println("没有可用通道.....");}}catch (Exception e){e.printStackTrace();}}public static void main(String[] args) throws IOException {GroupChatClient groupChatClient = new GroupChatClient();new Thread(){@Overridepublic void run(){while (true){groupChatClient.readInfo();try {Thread.currentThread().sleep(3000);}catch (Exception e){e.printStackTrace();}}}}.start();Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()){String s = scanner.next();groupChatClient.sendInfo(s);}}
}
  • Service 端代码
/*** 群聊服务器* @author liaojiamin* @Date:Created in 15:49 2022/8/22*/
public class GroupChatService {private Selector selector;private ServerSocketChannel serverSocketChannel;private Integer port = 6667;/*** 数据初始化*/public GroupChatService() {try {selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", port));serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void listen() {System.out.println("listen thread name "+ Thread.currentThread().getName());try {while (true) {int count = selector.select(2000);if (count > 0) {Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();if (selectionKey.isAcceptable()) {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);System.out.println(socketChannel.getRemoteAddress() + " 上线 ");}if (selectionKey.isReadable()) {readData(selectionKey);}iterator.remove();}}else {//                    System.out.println("等待....");}}} catch (Exception e) {e.printStackTrace();} finally {System.out.println("关闭各种管道");}}/*** 读取数据*/public void readData(SelectionKey key) {SocketChannel socketChannel = null;try {socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int count = socketChannel.read(byteBuffer);if (count > 0) {String msg = new String(byteBuffer.array());System.out.println("from 客户端: " + msg);sendInfoToOtherClients(msg, socketChannel);}} catch (IOException e) {try {System.out.println(socketChannel.getRemoteAddress()+ "离线了....");key.cancel();socketChannel.close();}catch (Exception e1){e1.printStackTrace();}e.printStackTrace();}}/*** 消息转发到其他客户端*/public void sendInfoToOtherClients(String msg, SocketChannel socketChannel) {try {System.out.println("消息转发中.....");System.out.println("sendInfoToOtherClients thread name "+ Thread.currentThread().getName());for (SelectionKey selectionKey : selector.keys()) {Channel targetChannel = selectionKey.channel();if (targetChannel instanceof SocketChannel && targetChannel != socketChannel) {SocketChannel targetSocketChannel = (SocketChannel) targetChannel;targetSocketChannel.write(ByteBuffer.wrap(msg.getBytes()));}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {GroupChatService groupChatService = new GroupChatService();groupChatService.listen();}
}
  • 案例说明:

    • Select是NIO中标准网络编程API,可以实现应用陈旭通过一个阻塞对象监听多路连接请求
    • Reactor对象通过Select监控客户端请求事件,收到事件后通过dispatch进行分发
    • 如果是建立连接请求的事件,那么直接有Acceptor通过Accept处理连接请求,然后传家一个Handler对象用这个新创建的Handler来完成连接之后的业务
    • 如果不是建立连接事件,那么Reactor会分发调用连接对应的Handler来响应
    • Handler会完成Read ---- 业务处理 ---- Send的完整业务流程
  • 优缺点分析

    • 优点:模型简单,没有多线程,进程通讯,没有线程竞争问题,全部在一个线程中完成,编程难度低
    • 缺点:明显的性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时候,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
    • 缺点:可靠性问题,线程意外终止,或者线程进入死循环,会导致整个系统通讯模块不可用,不能接受和处理外部消息,造成节点故障
    • 使用场景:客户端数量少,业务处理非常快,比如Redis的业务处理事件复杂度O(1)

单Reactor 多线程模型

  • 单Reactor多线程图示说明

    • Reactor对象通过Select监控客户端请求事件,收到事件后,通过Dispatch进行分发
    • 如果建立连接请求后,如果是Acceptor连接事件那么我们通过accept处理这个请求,然后创建一个Handler对象处理完成连接后的各种事件
    • 如果不是连接请求,那么由Reactor分发调用连接对应的handler来处理
    • handler之负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
    • worker线程池会分配独立的线程完成真正的业务,并且将结果返回给handler
    • handler收到响应后,通过send将结果返回给client
  • 方案优缺点

    • 优点:可以充分利用CPU多核处理能力
    • 缺点:多线程数据共享范问比较复杂,Reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈

主从Reactor多线程模型

  • 针对单Reactor多线程模型中,Reactor在单线程中运行,高并发下这部分容易成为性能瓶颈,可以让Reactor在多线程中运行。

  • 上图方案说明:

    • Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
    • 当Acceptor处理完连接事件后,MainReactor将连接分配给SubReactor
    • subReactor将连接加入到连接队列进行监听,并且川江handler进行各种事件处理
    • 当有新的事件发生时候,subReactor就会调用对应的handler处理
    • handler通过read读取数据,分发给后面的worker线程处理
    • worker线程池分配度的worker线程进行业务处理,并且返回结果
    • handler收到响应的结果后,在通过,send将结果返回给client
    • Reactor主线程可以对应多个Reactor子线程,即MainReactor可以关联多个SunReactor
Scalable IO in Java对Multiple Reactors原理图解

  • Scalable IO in Java 原文链接

  • 方案优缺点说明:

    • 优点:父线程于子线程的数据交互简单职责明确,父线程只需要接收新链接,子线程完成后续的业务处理
    • 优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新链接传给子线程,子线程无需返回数据
    • 缺点:编程复杂度高
    • 结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持。
Reactor模式总结
  • 单Reactor单线程,酒店的门童,服务员是同一个人,全程为客户服务
  • 单Reactor多线程,1个门童,多个服务员,门童之负责接待,转给服务员
  • 主从Reactor多线程,多个门童,多个服务员
Reactor模式优点
  • 响应快,不必为单个同步事件所阻塞,虽然Reactor本身是同步的
  • 可以最大程度避免复杂多线程同步问题,并且避免了多线程/进程的切换开销
  • 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源
  • 复用新好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。

Java I/O中的Reactor模式相关推荐

  1. 高性能IO设计中的Reactor模式与Proactor模式

    为什么80%的码农都做不了架构师?>>>    在高性能的IO设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proacto ...

  2. Java多线程编程中Future模式的详解

    转载自 https://www.cnblogs.com/winkey4986/p/6203225.html Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker ...

  3. Reactor模式与Proactor模式

    博主一脚刚踏进分布式的大门(看<分布式Java应用>,如果大家有啥推荐的书欢迎留言~),发现书中对NIO采用的Reactor模式.AIO采用的Proactor模式一笔带过,好奇心趋势我找了 ...

  4. Reactor模式详解及 三种模式演变

    无论是C++还是Java编写的网络框架,大多数都是基于Reactor模式进行设计和开发,Reactor模式基于事件驱动,特别适合处理海量的I/O事件. 什么是Reactor模式 要回答这个问题,首先当 ...

  5. 【IO】IO设计模式:TPR模式,Reactor模式、Proactor模式

    1.TPR模式 传统的 Server/Client 模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求. 这种模式虽然处理 ...

  6. Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

    本文介绍了Java中的四种I/O模型,同步阻塞,同步非阻塞,多路复用,异步阻塞.同时将NIO和BIO进行了对比,并详细分析了基于NIO的Reactor模式,包括经典单线程模型以及多线程模式和多Reac ...

  7. Java I/O模型从BIO到NIO和Reactor模式

    本文转发自技术世界,原文链接 http://www.jasongj.com/java/nio_reactor/ 一.Java I/O模型 同步 vs. 异步 同步I/O 每个请求必须逐个地被处理,一个 ...

  8. java NIO和Reactor模式

    Reactor模式和NIO 板桥里人 jdon.com 2002/11/08 本文可看成是对Doug Lea Scalable IO in Java一文的翻译. 当前分布式计算 Web Service ...

  9. java reactor模式例子_JAVA BIO,NIO,Reactor模式总结

    传统同步阻塞I/O(BIO) 在NIO之前编写服务器使用的是同步阻塞I/O(Blocking I/O).下面是一个典型的线程池客服端服务器示例代码,这段代码在连接数急剧上升的情况下,这个服务器代码就会 ...

最新文章

  1. android 控件随手指移动_液体流动控件,隔壁产品都馋哭了
  2. js时间格式化函数,支持Unix时间戳
  3. CALayer 了解与使用
  4. 【快乐水题】575. 分糖果
  5. microsoft excel 正在等待其他某个应用程序_浅谈应该购买英特尔Mac还是等待购买基于Arm的Mac...
  6. python time模块详解_py 模块之 time模块 讲解②
  7. 做折线图_python的visvis库做折线图(line.py)代码详解
  8. 电影院终于要开门了!一大波搁浅的春节档大片即将上映...
  9. 3C(Computer、Communication、Consumer Electronic)
  10. spark相关原理介绍
  11. python实现Kruskal算法求解加权图中最小生成树问题
  12. [DesignPattern]各自用一句话来概括MVC、MVP、MVVM的差异特点
  13. 我的世界JAVA会支持光追吗_《我的世界》RTX beta版视频体验:仿佛打破了次元壁...
  14. 新的特洛伊木马程序SectopRAT用以控制浏览器会话
  15. 2007年中国网络游戏市场分析及投资咨询报告(上下卷)
  16. h2o api java_H2O框架简介
  17. 用pip安装指定版本的包遇到的坑
  18. iphone 扩容测试软件,给iPhone扩容!闪迪手机U盘测评
  19. 眼见不一定为实!18个神奇的视错觉,看完不相信眼睛系列
  20. 大数据学习内容及方法

热门文章

  1. npm 编译完成后不退出
  2. u盘固定盘符_快速固定U盘盘符的解决方法 pe固定U盘盘符
  3. 关于顺序栈的退栈和入栈操作
  4. 含泪吐槽学C++的血与泪
  5. 黄金3:雨露均沾-不要让你的线程在竞争中被“饿死”
  6. jQuery prepend( ) 方法
  7. 计算机图形学求亮度级别数,计算机图形学复习笔记
  8. php支付功能好写吗,php实现微信支付功能的操作步骤 - 编程语言
  9. 鸿蒙系统包括8x吗,华为荣耀8X可以升级鸿蒙系统吗?
  10. RAC数据库一节点更换HBA卡导致emc存储设备序号变动处理记录