深入学习java非阻塞IO
深入学习java非阻塞IO
三大组件:
1. Channel:数据通道双向通道
(1)常见的channel:
- FileChannel:文件传输通道
- DatagramChannel:UDP传输通道
- SocketChannel:TCP传输通道(客户端和服务器端都能用)
- ServerSocketChannel: TCP传输通道(专用于服务器)
2. Buffer:缓冲区用来暂存从channel获取的数据
入门代码:
/*** FileChannel* 文件通道*/public static void fileChannel() throws Exception {RandomAccessFile randomAccessFile = new RandomAccessFile("E://nio.txt","rw");FileChannel fileChannel = randomAccessFile.getChannel();//建立通道ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //创建buffer申请一块内存int len;while((len=fileChannel.read(byteBuffer))!=-1)//循环将数据从channel读出写入buffer{byteBuffer.flip();//切换到读模式byte[] bytes = new byte[1024];byteBuffer.get(bytes, 0, len);System.out.println(new String(bytes,0,bytes.length));byteBuffer.compact();//切换到写模式}}
(1)常见的buffer:
ByteBuffer:抽象类\以字节为单位缓存数据
实现类:
MappedByteBuffer、DirectByteBuffer(堆外内存直接系统中申请一块空间)、HeapByteBuffer
(2)ByteBuffer结构:
属性:
- Capacity:容量
- Position:指针位置
- Limit:读写入限制
(3)分配ByteBuffer内存两种:
- ByteBuffer.allocate(1024) :类型HeapByteBuffer -java堆内存,读写效率低,受到垃圾回收影响,当虚拟机内存不足时会发生内存整理压缩,堆内存中数据有一次拷贝搬迁。
- ByteBuffer.allocateDirect(1024):类型DirectByteBuffer - 直接内存,读写效率高(少一次拷贝)使用的是系统内存不会受到垃圾回收影响,缺点系统内存分配的比较慢需要调用操作系统进行分配,使用完不释放会造成内存泄漏。
(4)向buffer中写入数据:channel.read(buf) 或者buffer.put(byte)
(5)从buffer中读数据:channel.write(buf)或者buffer.get(),注意get每次读一个字节,该方法会让position读指针向后走无法重复读。
Buffer.mark():标记position位置
Buffer.reset():从mark标记位置重新读
(6)如果想重复读数据:
- 调用rewind方法将position重置为0
- 调用get(int i)方法获取索引i的内容,他不会移动指针。
(7)字符串转bytebuffer:
ByteBuffer byteBuffer= StandardCharsets.UTF_8.encode("netty");
(8)ByteBuffer转字符串
String s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
(9)粘包、半包
- 粘包:将数据组装到一起发送,优点效率高。
- 半包:服务器缓冲区大小影响采用半包。
处理粘包/*** 第一次传入 hellow \nNio* 第二次传入 2022-0221\n* 处理成 {* hellow* Nio2022-02-21* }** 处理粘包*/public static void tickPackage(String data){ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(data);for(int i=0;i<byteBuffer.limit();i++){if(byteBuffer.get(i)=='\n'){int len=i+1-byteBuffer.position();ByteBuffer allocate = ByteBuffer.allocate(len);//动态分配根据读到的数据for(int j=0;j<len;j++){allocate.put(byteBuffer.get());}}}byteBuffer.compact();//把剩余的与下次传入的数据进行合并}
(10)分散读与集中写
/***分散读取* @throws Exception*/public static void fileChannelScatteringReads()throws Exception{RandomAccessFile randomAccessFile = new RandomAccessFile("E://nio.txt", "rw"); // HellowNioFileChannel channel = randomAccessFile.getChannel();ByteBuffer buffer1 = ByteBuffer.allocate(6);//HellowByteBuffer buffer2 = ByteBuffer.allocate(3);//Niochannel.read(new ByteBuffer[]{buffer1,buffer2});buffer1.flip();buffer2.flip();byte[] bytes1 = new byte[6];byte[] bytes2 = new byte[3];buffer1.get(bytes1,0,6);buffer2.get(bytes2,0,3);String s1 = new String(bytes1, "UTF-8");String s2 = new String(bytes2, "UTF-8");}/*** 集中写* @throws Exception*/public static void fileChannelGatheringWrites()throws Exception{String content1="HellowNio";String content2="HellowNio";FileChannel channel = new FileOutputStream("E://nio.txt").getChannel();ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(content1);ByteBuffer buffer2 = StandardCharsets.UTF_8.encode(content2);channel.write(new ByteBuffer[]{buffer1,buffer2});}
(11)强制写入:
- 操作系统处于性能考虑,会将数据缓存,不是立即写入磁盘,可以调用force(true)方法将文件内容和元数据(文件的权限信息)立即写入磁盘
(12)传输数据:channel.transferTo (底层使用零拷贝)
/*** channel.transferTo* 从一个channel中传输到另一个channel*/public static void channelCopy() throws Exception{FileChannel inputFileChannel = new FileInputStream("E://nio.txt").getChannel();FileChannel outFileChannel = new FileOutputStream("E://copynio.txt").getChannel();inputFileChannel.transferTo(0,inputFileChannel.size(),outFileChannel);outFileChannel.close();inputFileChannel.close();}/*** 文件大于2g* channel.transferTo* 从一个channel中传输到另一个channel*/public static void channelCopyGl2g() throws Exception{FileChannel inputFileChannel = new FileInputStream("E://nio.txt").getChannel();FileChannel outFileChannel = new FileOutputStream("E://copynio.txt").getChannel();long size = inputFileChannel.size();long surplus=size;while(surplus>0){surplus-=inputFileChannel.transferTo(size-surplus, surplus, outFileChannel);}outFileChannel.close();inputFileChannel.close();}
(13)Path与paths:
- Path用来表示文件路径,Paths工具类,用来获取path实例
Path path = Paths.get("nio.txt");//相对路径
Path path1 = Paths.get("d:\\nio.txt");//绝对路径 代表d:\nio.txt
Path path2 = Paths.get("d:/nio.txt");//绝对路径 代表d:\nio.txt
Path path3 = Paths.get("d:\\nio", "file");//代表d:\nio\file
(14)Files
- 文件拷贝
Files.copy(source,target);//如果文件存在就会复制失败
Files.copy(source,target,StandardCopyOption.REPLACE_EXISTING)//存在就覆盖
Files.move(source,target,StandardCopyOption.ATOMIC_MOVE)//移动文件保证文件移动原子性
Files.delete(target)//删除文件或目录,不存在抛异常,如果删除的是目录目录中有内容会抛异常
- 文件夹操作api
匿名内部类使用外部的整型进行累加需要使用AtomicInteger
/*** 文件夹遍历*/Files.walkFileTree(Paths.get("文件起始目录"), new SimpleFileVisitor<Path>() {//进入文件夹之前操作@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {return super.preVisitDirectory(dir,attrs);}//遍历到文件时的方法@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {return super.visitFile(file,attrs);}//遍历文件失败时方法@Overridepublic FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {return super.visitFileFailed(file,exc);}//从文件夹出来以后的操作@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {return super.postVisitDirectory(dir,exc);}});
- 多级目录文件拷贝
/*** 拷贝多级目录文件* @param args* @throws Exception*/public static void main(String[] args)throws Exception{String source="d:/nio";String taget="e:/nio";Files.walk(Paths.get(source)).forEach(path -> {try{Path finalPath = Paths.get(path.toString().replace(source, taget));if(Files.isDirectory(path))//判断是不是目录{Files.createDirectories(finalPath);}else if(Files.isRegularFile(path)){Files.copy(path,finalPath);}}catch (Exception e){}});}
3. Selector:配合一个线程管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,适合连接数多,但流量低的场景。
调用selector的select()会阻塞直到channel发生读写就绪事件,这些事件发生,select方法就会返回这些事件交由thread来处理。
- 创建selector:
Selector selector = Selector.open(); - 将channel注册到selector中:
SelectionKey selectorkey= serverSocketChannel.register(selector, 0, null);//第二个参数时关注事件,0代表不关注任何事件。 - 指定selectionKey所关注事件:
selector.interestOps(SelectionKey.OP_ACCEPT)
通过SelectionKey 可以获取是哪个channel的事件 - 查询是否有事件发生:
selector.select();//没有事件就阻塞 - 获取所有事件集合
Set selectionKeys = selector.selectedKeys();
事件类型:
accept:有链接请求时触发
connect:客户端链接建立后触发的事件
read:可读事件
write:可写事件
Select何时不阻塞:
1.事件发生时
- 客户端发送请求时,会触发accept事件
- 客户端发送数据过来,客户端正常、异常关闭、都会触发read事件,另外如果发送的数据大于buffer缓冲区,会触发多次读取事件。
- 在linux bug发生时
2.调用selector.wakeup()
3.调用selector.close()
5.Selector所在线程interrupt
注意:selector.select()如果没有事件就会阻塞有事件才会执行,如果有事件没有去处理,下次还会让处理不会阻塞,可以调用取消。事件发生后要么处理要么取消,不能不操作。
如果不想处理事件可以调用selectionKey.cancel();
注:客户端正常断开和异常断开都会触发一个读事件,正常断开从通道中读取的值是-1,异常断开是读数据抛出IO异常,正常断开和异常断开都需要进行取消事件处理《cancel》
代码:
public static void serverSocketChannel() throws Exception{Selector selector = Selector.open();//建立选择器ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//创建serverSocketChannelserverSocketChannel.bind(new InetSocketAddress(8888));//绑定端口serverSocketChannel.configureBlocking(false);//设置非阻塞serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);//将通道注册到selector中,设置链接感兴趣while (selector.select()>0)//如果有事件发生{Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){if (iterator.next().isAcceptable()) //判断是不是链接事件{ServerSocketChannel channel = (ServerSocketChannel)iterator.next().channel();SocketChannel accept = channel.accept();accept.configureBlocking(false);accept.register(selector,SelectionKey.OP_ACCEPT,null);}else if(iterator.next().isReadable()){//判断是不是读事件try{ByteBuffer byteBuffer = ByteBuffer.allocate(1024);SocketChannel socketChannel = (SocketChannel)iterator.next().channel();int len;while((len=socketChannel.read(byteBuffer))>0){byteBuffer.flip();String data = StandardCharsets.UTF_8.decode(byteBuffer).toString();System.out.println(data);byteBuffer.clear();}if(len==-1)//正常断开连接{iterator.next().cancel();//取消事件}}catch (IOException e){iterator.next().cancel();//取消事件,客户端异常断开链接会有异常引发一个读事件,不进行处理会一直存在这个事件}}iterator.remove();}}}
拆包自动扩容:
接收到的内容如果与bytebuffer设置的长度不相等会造成数据缺失,处理方式:
先接收一个数据长度,然后根据长度创建bytebuffer大小。
- 典型http2.0 采用ltv l代表长度t代表类型v代表数据。
- Http1.0 采用tlv
如果接收的数据长度大于bytebuffer设置的长度会触发两次读事件,要保证bytebuffer不能是局部变量,两次读事件要用同一个buffer否则数据丢失。
/*** 读消息时使用\n拆包* @param byteBuffer*/public static void split(ByteBuffer byteBuffer){byteBuffer.flip();for(int i=0;i<byteBuffer.limit();i++){if(byteBuffer.get(i)=='\n'){int len=i+1-byteBuffer.position();ByteBuffer allocate = ByteBuffer.allocate(len);for(int j=0;j<len;j++){allocate.put(byteBuffer.get());}}}byteBuffer.compact();}/*** 服务端* @throws Exception*/public static void serverSocketChannel() throws Exception{//创建selector选择器Selector selector = Selector.open();//创建通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//绑定端口serverSocketChannel.bind(new InetSocketAddress(8888));//设置为非阻塞serverSocketChannel.configureBlocking(false);//将通道注册到selector中,指定感兴趣事件serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);//遍历selector查看是否有事件发生while(selector.select()>0)//一直阻塞直到由事件发生{//获取通道集合Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){iterator.remove();//移除防止重复处理SelectionKey skey = iterator.next();//判断是否连接事件if(skey.isAcceptable()){ServerSocketChannel channel = (ServerSocketChannel)skey.channel();SocketChannel accept = channel.accept();//处理事件accept.configureBlocking(false); //设置为非阻塞ByteBuffer buffer = ByteBuffer.allocate(1024);accept.register(selector,SelectionKey.OP_READ,buffer);//将bytebuffer与SelectionKey进行关联,每一个都会有自己的buffer}else if(skey.isReadable())//读事件{try{SocketChannel channel = (SocketChannel)skey.channel();ByteBuffer buffer = (ByteBuffer)skey.attachment();//从SelectionKey中拿到附件中buffer,生命周期与SelectionKey一致int read = channel.read(buffer);//处理事件if(read==-1)//客户端正常关闭读到的是-1{skey.cancel();//取消事件}else{split(buffer);//判断如果position与limit一致证明没有读到完整数据需要扩容if(buffer.position()==buffer.limit()){ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);//扩容原来的两倍buffer.flip();//切换读模式newBuffer.put(buffer);//将旧内容拷贝到扩容后的缓冲区中skey.attach(newBuffer);//替换SelectionKey上原有缓冲区}}}catch (IOException e){//强制关闭客户端会接收到一个read事件需要处理skey.cancel();//取消事件}}}}}
关注写事件(通道满的情况下有可能一次写不完,需要设置一个写感兴趣,多次将数据写出):
/*** 服务端写事件* @throws Exception*/public static void serverSocketChannelWrite() throws Exception{//创建selectorSelector selector = Selector.open();//创建serverSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//绑定端口serverSocketChannel.bind(new InetSocketAddress(8000));//设置非阻塞serverSocketChannel.configureBlocking(false);//将通道注册到selector中serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);//从selector中获取selectorswhile(selector.select()>0){//获取selectionKeySet<SelectionKey> selectionKeys = selector.selectedKeys();//迭代selectionKeys判断是连接事件还是读事件Iterator<SelectionKey> skey = selectionKeys.iterator();while(skey.hasNext()){skey.remove();//移除防止重复处理SelectionKey key = skey.next();//判断是不是连接事件if(key.isAcceptable())//客户端连接事件{ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel accept = channel.accept();accept.configureBlocking(false);accept.register(selector,SelectionKey.OP_READ);//设置读感兴趣//连接成功向客户端发送数据String msg="hellowClient";ByteBuffer buffer= StandardCharsets.UTF_8.encode(msg);accept.write(buffer);if(buffer.hasRemaining())//如果还有数据没发送完{accept.register(selector,key.interestOps()+SelectionKey.OP_WRITE,buffer);//将原本的读与当前写都注册进去,把未读完的数据挂载到附件上}}else if(key.isWritable())//如果是写事件{SocketChannel channel = (SocketChannel)key.channel();ByteBUffer buffer=(ByteBuffer)key.attachment()channel.write(buffer);//将剩余数据接续写出if(!buffer.hasRemaining)//如果没有内容了将绑定的附件置空节省内存{key.attach(null);key.interestOps(key.interestOps()-SelectionKey.OP_WRITE);//取消对写的事件关注}}}}}
零拷贝
零拷贝指的是内核态和用户态没有拷贝的操作。
优点:
1.减少用户态和内核态的切换
2.不利用cpu计算减少cpu缓存伪共享
3.适合小文件传输
发展过程:
::: hljs-center
第一阶段
:::
::: hljs-center
第二阶段
:::
::: hljs-center
第三阶段
:::
::: hljs-center
Netty
:::
Netty:是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
入门demo 服务端
public static void main(String[] args) {//自定义处理类ChannelInboundHandlerAdapter channelInboundHandlerAdapter = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}};//初始化器,代表和客户端进行读写数据的通道(负责添加别的handler)ChannelInitializer<NioSocketChannel> channelInitializer = new ChannelInitializer<NioSocketChannel>() {//连接建立后触发(netty自动处理accept事件调用initChannel)@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringDecoder());//添加处理器类,ByteBuf转成字符串处理器nioSocketChannel.pipeline().addLast(channelInboundHandlerAdapter);//处理自定义的处理器类}};ServerBootstrap serverBootstrap = new ServerBootstrap();//服务器端启动器,负责组装netty组件serverBootstrap.group(new NioEventLoopGroup());//添加了一个事件组组件,包含一个BossenvenLoop 和 WorkerEventLoopserverBootstrap.channel(NioServerSocketChannel.class);//选择服务器的ServerSocketChannel实现serverBootstrap.childHandler(channelInitializer);//添加处理器serverBootstrap.bind(8888);//绑定监听端口}
入门demo 客户端
public static void main(String[] args) throws InterruptedException {//通道初始化器,增加处理器ChannelInitializer<NioSocketChannel> channelInitializer = new ChannelInitializer<NioSocketChannel>() {//连接建立后会被调用@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());//添加处理器类,字符串转ByteBuf处理器}};Bootstrap bootstrap = new Bootstrap();//启动类 负责组装netty组件bootstrap.group(new NioEventLoopGroup());//BossEventLoop,WorkerEvenLoop(selector,thread),group 组bootstrap.channel(NioSocketChannel.class);//选择客户端的channel实现bootstrap.handler(channelInitializer);//添加处理类ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8888));//建立连接channelFuture.sync();//阻塞方法,连接建立后才会向下执行Channel channel = channelFuture.channel();//服务端和客户端之间建立的连接对象channel.writeAndFlush("hellow Netty");//写出数据,注意收发数据都会走handler 也就是initChannel内部的处理器}
建立正确的观念
(1)把channel理解为数据的通道。
(2)把msg理解为流动的数据,最开始是ByteBuf,但经过pipeline的加工,会变成其他类型对象,最后输出变成ByteBuf。
(3)把handler理解为数据的处理工序:
- 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写相应的处理方法)
- handler分Inbound和Outbound两类
(4)把eventloop理解为处理数据工人:
- 工人可以管理多个channel的io操作,并且一旦工人负责某个channel,就负责到底(绑定)
- 工人即可以执行io操作,也可以进行任务处理,每个工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务
- 工人按照pipeline顺序,一次按照handler规划(代码)处理数据,可以为每道工序指定不同的工人。
深入学习java非阻塞IO相关推荐
- Java 非阻塞 IO 和异步 IO
转载自 Java 非阻塞 IO 和异步 IO 上一篇文章介绍了 Java NIO 中 Buffer.Channel 和 Selector 的基本操作,主要是一些接口操作,比较简单. 本文将介绍非阻塞 ...
- java非阻塞io流_阻塞式和非阻塞io流初认识
1 什么是阻塞式和非阻塞式? 阻塞式IO:IO即input/output,阻塞式IO指的是"一旦输入/输出工作没有完成,则程序阻塞,直到输入/输出工作完成".在目前,我们从书本上 ...
- java epoll select_Java 非阻塞 IO 和异步 IO
点击上方 Java后端,选择 设为星标 优质文章,及时送达 作者 | HongJie 链接 | javadoop.com/post/nio-and-aio 本文将介绍非阻塞 IO 和异步 IO,也就是 ...
- java io nio pio_Netty之BIO(同步阻塞IO)、PIO(偽異步阻塞IO)、NIO(異步非阻塞IO)、AIO(異步非阻塞IO)、Netty...
學習書籍:Netty權威指南 多種IO方式的比較: 1.BIO(同步阻塞IO) 使用ServerSocket綁定IP地址和監聽端口,客戶端發起連接,通過三次握手建立連接,用socket來進行通信,通過 ...
- Java中阻塞IO和非阻塞IO
一.阻塞IO模型 最传统的一种IO模型,即在读写数据过程中会发生阻塞现象.当用户线程发出IO请求之后,内核回去看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CP ...
- java 非阻塞_Java之NIO(非阻塞IO)
[1]NIO的与IO的区别: 总的来说java 中的IO 和NIO的区别主要有3点: 1)IO是面向流的,NIO是面向缓冲的: 2)IO是阻塞的,NIO是非阻塞的: 3)IO是单线程的,NIO 是通过 ...
- java实现非阻塞io原理_JAVA 非阻塞IO原理
1. 基本概念 IO是主存和外部设备(硬盘.终端和网络等)传输数据的过程.IO是操作系统的底层功能实现,底层通过I/O指令进行完成. 2.nio简介 nio是java New IO的简称(并不只是指 ...
- linux驱动学习笔记(三)阻塞与非阻塞IO
Linux驱动中阻塞与非阻塞IO 前言 阻塞 非阻塞 一.等待队列 1.等待队列头 2.等待队列 模板 二.轮询 模板 总结 前言 阻塞和非阻塞io是两种不同的设备访问方式. 阻塞 阻塞IO表示在执行 ...
- 同步、异步、阻塞、非阻塞IO
在网上看到一篇对这四个概念比较清晰的分析的文章:http://blog.csdn.net/historyasamirror/article/details/5778378.结合自己一直在学习Java ...
最新文章
- 比尔盖茨NEJM发文:新冠肺炎是百年一遇的流行病!全世界应该如何应对?
- 计算机操作原理进程调度算法---先来先服务,短进程优先(C语言)
- Top-1 Error Top-5 Error
- JUC系列(九)| ThreadPool 线程池
- [分际]如何使用EVENTLOG类操作日志
- CPU 可以跑多快?地球到火星的距离告诉你!
- 专业FTP服务器Rumpus for Mac
- special word count
- 【MATLAB】(三)MATLAB在高等数学中的应用
- matlab通过带通滤波器代码,设计一个matlab带通滤波器代码
- maxon电机加速度上不去的原因及解决
- 大数据有十大应用领域,看看你用到了哪个?
- 数据库查询之对含有数字和字母的字段进行排序
- 基于Python3的格雷厄姆 股票估值模型
- oracle中 greatest、east、coalesce
- java标准输出输入(Scaner类)
- PAT乙级1033 旧键盘打字
- 小明Q2投影仪好用吗?小明Q2和哈趣K1哪个更值得入手?
- 岁月蹉跎,人生几何。
- 字符串快速变dict字典key