NIO基础

  • 1、三大组件
    • 1.1 Channel & Buffer
    • 1.2 Selector
  • 2.ByteBuffer
    • 2.1ByteBuffer正确使用
    • 2.2 ByteBuffer结构
    • 2.3 ByteBuffer 常用方法
      • 调试工具类
      • 分配空间
      • 向buffer中写入数据
      • 从buffer中读取数据
      • 代码演示
      • get(),get(int i),allocate(),mark(),reset()方法的使用
      • 字符串和ByteBuffer互转
        • 将字符串转化为ByteBuffer
        • 将ByteBuffer转化为字符串
    • 2.4 Scattering Reads(分散读取)
    • 2.5 Gathering Writes (集中写入)
    • 2.6 黏包半包分析
  • 3.文件编程
    • 3.1 FileChannel
    • 3.2 两个channel传输数据
    • 3.3 Path
    • 3.4 Files
      • 3.4.1 统计文件个数,文件夹个数,特殊后缀的文件个数
      • 3.4.2 删除多级目录
      • 3.4.3 拷贝多级目录
  • 4. 网络编程
    • 4.1 阻塞模式
    • 4.2 非阻塞模式
      • 4.2.1 多路复用
    • 4.3 selector
      • 监听Channel事件
      • select()方法何时不阻塞
      • 4.3.1 selector-处理accept
      • 4.3.2 selector-cancel取消事件
      • 4.3.3 selector-处理read事件
      • 4.3.4 selector-处理read事件后要将selectorkey删除
      • 4.3.5 selector-nio处理客户端断开连接
    • 4.4 selector-nio 正确处理消息边界
      • 4.4.1 解决方法
      • 4.4.2 附件 attachment 与 扩容
      • 4.4.3 selector-nio-ByteBuffer的大小分配
      • 4.4.4 nio-selector 写入内容过多
      • 4.4.4 nio-selector 处理写入内容过多的问题
    • 4.5 多线程优化
  • 5、NIO 和 BIO
    • 5.1 stream vs channel
    • 5.2 IO模型
    • 5.3 零拷贝
      • 5.3.1 NIO优化
    • 5.4 AIO 异步IO

码云仓库
码云仓库
码云仓库

1、三大组件

1.1 Channel & Buffer

Channel有一点类似于Stream,它就是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输入,channel比stream更加底层。buffer就是内存缓冲区域。

常见的Channel有
FileChannel:文件的数据传输通道
DaragramChannel:UDP的数据传输通道
SocketChannel:TCP的数据传输通道
ServerSocketChannel:专用于服务器的TCP数据传输通道

buffer则常用来缓冲读写数据,常见的buffer有
ByteBuffer:以字节为单位来缓冲数据,它是一个抽象类。
MappedByteBuffer
DierctByteBuffer
HeapByteBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
CharBuffer

1.2 Selector

最早的服务器开发思路

多线程版设计
每个线程专管一个连接,相当于餐馆,1000个客人雇佣1000个服务员来服务客人!

缺点:
内存占用高
线程上下文切换成本高:因为线程非常多,需要保存这些线程的状态的数据也非常多!
只适合连接数少的场景

线程池版设计

缺点:
阻塞模式下,线程仅能处理一个socket连接。【线程是服务员,socket是客人,服务员得全程跟着客人,等到客人处理完当前客人的事情后才能去服务下一个客人!】
仅适合短连接场景【为了线程能够处理完一个请求后立即断开,去服务下一个请求】

selector版设计


selector的作用就是配合一个线程来管理多个Channel,获取这些Channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但是流量低的场景

调用selector的select()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理!
【thread类似服务员,channel类似于客人,selector类似于能够监控,能够监控所有客人的需求,一旦客人有需求,它能顾第一时间知道,并且通知服务员前去服务】

2.ByteBuffer

需要引入的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Nettt-study</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version></dependency><!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>19.0</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency></dependencies></project>

新建一个data.txt文件,用来模拟数据源!

package com.zidu.netty;import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;public class TestByteuffer {public static void main(String[] args){// FileChannel// 1.输入输出流 2.RandomAccessFiletry (FileChannel channel = new FileInputStream("data.txt").getChannel()) {// 准备缓冲区,划分10个字节来作为缓冲区ByteBuffer buffer=ByteBuffer.allocate(10);// 从channel读取数据,向buffer写入channel.read(buffer);// 打印buffer的内容buffer.flip();//切换至读模式while (buffer.hasRemaining()) // 是否还有未读字符{byte b=buffer.get();System.out.println((char)b);}}catch (IOException e) {}}
}最终发现只有10个字符被读取进来,因为我们设置的字符缓冲区大小只有10个字节。


可以看到这个channel有返回值,它的返回值是一个数字,当返回结果为-1时,表示读完了。所以,可以进行以下的改造

public static void main(String[] args){// FileChannel// 1.输入输出流 2.RandomAccessFiletry (FileChannel channel = new FileInputStream("data.txt").getChannel()) {// 准备缓冲区,划分10个字节来作为缓冲区ByteBuffer buffer=ByteBuffer.allocate(10);// 从channel读取数据,向buffer写入while (true){int len=channel.read(buffer);log.debug("读取到的字节数{}",len);if (len==-1) //没有内容{break;}buffer.flip();//切换至读模式while (buffer.hasRemaining()) // 是否还有未读字符{// 打印buffer的内容byte b=buffer.get();log.debug("实际字节{}",(char)b);}// 切换成写模式buffer.clear();}}catch (IOException e) {}}=============自己试了一下,这样的操作也是可以的=====while (channel.read(buffer)!=-1){buffer.flip();//切换至读模式while (buffer.hasRemaining()) // 是否还有未读字符{// 打印buffer的内容byte b=buffer.get();log.debug("实际字节{}",(char)b);}// 切换成写模式buffer.clear();}

2.1ByteBuffer正确使用

1、向buffer写入数据,例如调用channel…read(buffer)
2、调用flip()切换至读模式
3、从buffer读取数据,例如buffer.get()
4、调用clear()或compact()切换至写模式
5、重复1-4步骤

2.2 ByteBuffer结构

ByteBuffer有以下重要属性(有点像一个数组)
capacity:即ByteBuffer的大小
position:读写指针,索引下标,标志读到哪了写到哪了
limit:读写的限制


调用flip方法后,转换成为读模式!


clean切换到写模式

compact方法,未读完的往前压缩,然后切换至写模式

2.3 ByteBuffer 常用方法

调试工具类

package com.zidu.netty;import io.netty.util.internal.StringUtil;import java.nio.ByteBuffer;import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}int i;// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append("   ");}HEXPADDING[i] = buf.toString();}// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}/*** 打印所有内容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}/*** 打印可读取内容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put(new byte[]{97, 98, 99, 100});debugAll(buffer);}private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append("         +-------------------------------------------------+" +NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +NEWLINE + "+--------+-------------------------------------------------+----------------+");final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}dump.append(NEWLINE +"+--------+-------------------------------------------------+----------------+");}private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);}
}

分配空间

// 分配一个大小为16字节的缓冲区
ByteBuffer buf= ByteBuffer.allocate(16);
ByteBuffer buf=ByteBuffer.allocateDirect(16);
// 分配固定的空间
public class TestByteBufferAllocate {public static void main(String[] args) {// 第一种是在Java堆内存上分配空间,读写效率低,收到垃圾回收(GC)的影响,GC可能会造成数据的搬迁System.out.println(ByteBuffer.allocate(16).getClass());// 第二种是在直接内存上分配空间,读写效率高(少一次拷贝),不会收到GC影响,分配内存的效率低下// 且如果使用不当,可能会造成内存泄漏System.out.println(ByteBuffer.allocateDirect(16).getClass());}
}

向buffer中写入数据

调用channel的read方法

int readBytes=channel.read(buf);

调用buffer的put方法

buf.put((byte)127);

从buffer中读取数据

调用channel的write方法

int writeBytes=channel.write(buf);

调用buffer自己的get方法

byte b = buf.get();

get方法会让position读指针向后走,如果想重复读取数据

可以调用rewind 方法将position 重新置为0
或者调用get(int i)方法获取索引i的内容,它不会移动读指针

代码演示

public class TestByteBufferReadWrite {public static void main(String[] args) {// 开辟一个10字节大小的缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);// 写入一个十六进制的数,其实就是写入一个a字符buffer.put((byte)0x61);debugAll(buffer);buffer.put(new byte[]{0x62,0x63,0x64});debugAll(buffer);buffer.flip();//切换到读模式System.out.println(buffer.get());debugAll(buffer);// 往前移动,整理数据buffer.compact();debugAll(buffer);buffer.put(new byte[]{0x65,0x66});debugAll(buffer);}
}

get(),get(int i),allocate(),mark(),reset()方法的使用

public class TestByteBufferReadWrite {public static void main(String[] args) {// 开辟一个10字节大小的缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put(new byte[]{'a','b','c','d'});//  切换到读模式buffer.flip();System.out.println((char) buffer.get());System.out.println((char) buffer.get());// 其实就是重置了position的值而已
//        System.out.println(buffer.rewind());/*public final Buffer rewind(){position = 0;mark = -1;return this;}*/// mark & reset// mark 做一个标记,记录position位置,reset 是将 position 重置到mark 的位置
//        debugAll(buffer);// 假设现在1的位置很重要,需要记录下这个位置,就可以使用mark 方法buffer.mark();System.out.println((char) buffer.get());System.out.println((char) buffer.get());// 然后这个时候,又要拿到刚刚的标记位置的数据,那么就可以使用reset将position重置到刚刚mark的位置buffer.reset();System.out.println((char) buffer.get());}
}

字符串和ByteBuffer互转

将字符串转化为ByteBuffer

ByteBuffer.allocate(16).put("example".getBytes())
StandardCharsets.UTF_8.encode("example")
ByteBuffer.wrap("example".getBytes())

将ByteBuffer转化为字符串

注意:这个方法只能转化处在读模式下的ByteBuffer,如果转化处在写模式下,那么不会正确输出结果

StandardCharsets.UTF_8.decode(buffer1).toString()
package com.zidu.netty;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;import static com.zidu.netty.ByteBufferUtil.debugAll;public class TestByteBufferReadWrite {public static void main(String[] args) {// 1.字符串转为ByteBuffer,这个不会自动切换到读模式ByteBuffer buffer = ByteBuffer.allocate(16);buffer.put("hello".getBytes());debugAll(buffer);// 2.借助一个Charset,这个会自动切换到读模式ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");debugAll(buffer1);// 3.wrap,这个会自动切换到读模式ByteBuffer buffer2 = ByteBuffer.wrap("hello".getBytes());debugAll(buffer2);// 4.将一个缓冲区的字符转化为字符串,只能转化处在读模式下的缓冲区String s1 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(s1);// 5.不能转化为处在写模式下的缓冲区,会转一些奇奇怪怪的字符buffer.flip();//先转为读模式String s2 = StandardCharsets.UTF_8.decode(buffer).toString();System.out.println(s2);}
}

2.4 Scattering Reads(分散读取)

即利用多个缓冲区,一起从某个管道中读取数据,我们就不用利用一个大的缓冲区来完全存储这些数据!减少数据复制的次数,提高效率!

package com.zidu.netty;import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;import static com.zidu.netty.ByteBufferUtil.debugAll;
/*
*
* 分散读取
*
* */
public class TestScatteringReads {public static void main(String[] args) {try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()){ByteBuffer b1 = ByteBuffer.allocate(3);ByteBuffer b2 = ByteBuffer.allocate(3);ByteBuffer b3 = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{b1,b2,b3});b1.flip();b2.flip();b3.flip();debugAll(b1);debugAll(b2);debugAll(b3);}catch (IOException e){System.out.println(e);}}
}

2.5 Gathering Writes (集中写入)

即不用将所有缓冲区合并起来,就可以将缓冲区里面的内容写入到文件中!减少数据复制的次数,提高效率!

package com.zidu.netty;/*
*
* 集中写入
*
* */import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;public class TestGatheringWrites {public static void main(String[] args) {ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");ByteBuffer b3 = StandardCharsets.UTF_8.encode("I am coming!");try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {channel.write(new ByteBuffer[]{b1,b2,b3});} catch (IOException e) {}}
}

2.6 黏包半包分析

比如,客户端有三条数据。
Hello,world!\n
I'm zhangsan!\n
How are you?\n经过网络传输后,就变成了
Hello,world!\nI'm zhangsan!\nHo
w are you?\n
这就是黏包和半包现象!

模拟黏包半包,并且模拟了解决方法!

package com.zidu.netty;import java.nio.ByteBuffer;import static com.zidu.netty.ByteBufferUtil.debugAll;public class TestByteBufferExam {public static void main(String[] args){// 模拟接收到黏包和半包的数据ByteBuffer source=ByteBuffer.allocate(64);source.put("Hello,world!\nI'm zhangsan!\nHo".getBytes());split(source);source.put("w are you?\n".getBytes());split(source);}private static void split(ByteBuffer source){// 切换到读模式source.flip();for (int i=0;i< source.limit();++i){// 找到一条完整消息if (source.get(i)=='\n'){int length=i+1- source.position();// 把这条完整消息存入新的ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从 source 中读取,向 target 中写入 完整的消息,这样就可以了for (int j=0;j<length;++j){target.put(source.get());}debugAll(target);}}// 切换到写模式source.compact();}
}

可以看到,消息已经被成功分离出来了!

3.文件编程

3.1 FileChannel

FileChannel只能工作在阻塞模式下!

获取
不能直接打开FileChannel,必须通过 FileInputStream、FileOutputStream或者RandomAccessFile来获取FileChannel,他们都有getChannel的方法!

读取
会从Channel读取数据填充ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾!

int  readBytes = channel.read(buffer);

写入

ByteBuffer buffer= ....;
buffer.put(...);     // 存入数据
buffer.flip();       // 切换读写模式while(buffer.hasRemaining())
{channel.write(buffer);
}

在while中调用channel.write 是因为write方法并不能保证一次将buffer中的全部内容全部写入channel!
关闭
channel必须关闭,不过调用了FileInputStream、FileOutputStream或者RandomAccessFile的close方法间接的调用channel的close方法!
大小
使用size获取文件的大小

3.2 两个channel传输数据

数据迁移的速度很快,因为这是利用了操作系统底层的零拷贝进行优化,有最大传输上限,最大传输数据的大小是2G!

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;/*
*
* 测试两个 channel 之间的通信
*
* */
public class TestFileChannelTransferTo
{public static void main(String[] args) throws IOException {FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("newdata.txt").getChannel();from.transferTo(0,from.size(),to);}
}


传输大于2G的数据

package com.zidu.netty;import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;/*
*
* 测试两个 channel 之间的通信
*
* */
public class TestFileChannelTransferTo
{public static void main(String[] args) throws IOException {FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("newdata.txt").getChannel();long size = from.size();// left 变量代表还剩余多少字节,分多次传输大于2G的数据for (long left = size; left>0;){System.out.println("position: "+(size-left) + " left:"+left);left-=from.transferTo(size-left, left, to);}}
}

3.3 Path


3.4 Files





拷贝文件也是利用了操作系统的底层,但是是利用不同的实现,效率也比较高!

3.4.1 统计文件个数,文件夹个数,特殊后缀的文件个数

package com.zidu.netty;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {AtomicInteger dirCount=new AtomicInteger();AtomicInteger fileCount=new AtomicInteger();AtomicInteger jarCount=new AtomicInteger();Files.walkFileTree(Paths.get("C:\\Program Files\\Java\\jdk1.8.0_162"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {// 统计文件夹个数dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {// 统计文件个数fileCount.incrementAndGet();// 统计jar包个数if (file.toString().endsWith(".jar"))jarCount.incrementAndGet();return super.visitFile(file, attrs);}});log.debug(String.valueOf(dirCount));log.debug(String.valueOf(fileCount));log.debug(String.valueOf(jarCount));}
}

3.4.2 删除多级目录

思路:非空目录不能删除,必须进入到目录中,现将文件删除掉了,再回头删除目录!

假设有一个需求可以删除文件目录,删除的时候又要求目录为空,就可以利用这些方法来操作!
此操作十分危险,且删除文件不经过回收站!
此操作十分危险,且删除文件不经过回收站!
此操作十分危险,且删除文件不经过回收站!

package com.zidu.netty;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;@Slf4j
public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {Files.walkFileTree(Paths.get("无用路径"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {// 阅读文件
//               Files.delete(file); 将文件删除return super.visitFile(file, attrs);}@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {//                 退出文件夹之后删除文件夹
//                Files.delete(dir);// 退出文件夹之后做什么return super.postVisitDirectory(dir, exc);}});}
}

3.4.3 拷贝多级目录

思路:如果是目录,就到目标里面新建目录,如果是文件,那么就复制到目标文件夹!

package com.zidu.netty;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;public class TestFileCopy {public static void main(String[] args) throws IOException {String source="E:\\test";String target="E:\\testCopy";Files.walk(Paths.get(source)).forEach(file->{try {String targetName=file.toString().replace(source,target);// 如果是目录,那么就需要在目的目录新建对应的目录if (Files.isDirectory(file)) {Files.createDirectory(Paths.get(targetName));}// 是一个普通文件else  if (Files.isRegularFile(file)){Files.copy(file,Paths.get(targetName));}} catch (IOException e){e.printStackTrace();}});}
}

4. 网络编程

4.1 阻塞模式

ServerSocketChannel是用来服务器连接的,SocketChannel 是从连接中获取数据的!
模拟服务器

package com.zidu.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;import static com.zidu.netty.c2.ByteBufferUtil.debugRead;@Slf4jpublic class Server {public static void main(String[] args) throws IOException {// 利用 nio来理解阻塞模式// 1.创建缓冲区ByteBuffer buffer = ByteBuffer.allocate(16);// 2.创建服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 3.绑定监听端口ssc.bind(new InetSocketAddress(8888));//4.连接集合List<SocketChannel> channels=new ArrayList<SocketChannel>();while (true){//5.accept与客户端建立连接,SocketChannel 用来和客户端之间进行通信log.debug("connecting...");SocketChannel sc = ssc.accept();// 阻塞方法,线程停止运行log.debug("connected...{}",sc);channels.add(sc);for (SocketChannel channel : channels) {// 6、接收客户端发送的数据log.debug("before read ..{}",channel);// 7、把数据写入缓冲区channel.read(buffer);// 8、切换缓冲区到读模式buffer.flip();// 9、读出所有缓冲区的数据debugRead(buffer);// 10、切换缓冲区到写模式buffer.clear();log.debug("after read...{}",channel);}}}
}

模拟客户端

package com.zidu.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8888));System.out.println("wait data....");}
}

运行情况
1、先运行服务器端,发现到了accept()方法线程就不再往下执行了,原因是没有建立连接,accept()方法是一个阻塞方法

2、再启动客户端,发现到了read()方法也就不再往下执行了,原因是缓冲区里没有数据,read()方法是一个阻塞方法。

3、对客户端进行调试,模拟发送数据

输入调试语句,点击evaluate。

4、发现服务器端接收到了数据,并且又进行下一轮的等待(等待连接)。

总结:这个时候,如果你利用客户端再发送一次数据,服务器端是没有办法接收到的!因为现在的服务器端被阻塞在accept()方法那里,要想再发送数据,必须再次新建新建一个客户端进行连接!
单线程和阻塞模式无法很好的相结合!

4.2 非阻塞模式

模拟服务端,把ServerSocketChannel 和 SocketChannel 都设置为非阻塞模式!

package com.zidu.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import static com.zidu.netty.c2.ByteBufferUtil.debugRead;@Slf4jpublic class Server {public static void main(String[] args) throws IOException {// 1.创建缓冲区ByteBuffer buffer = ByteBuffer.allocate(16);// 2.创建服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 2.5 设置为非阻塞模式ssc.configureBlocking(false);// 3.绑定监听端口ssc.bind(new InetSocketAddress(8888));//4.连接集合List<SocketChannel> channels=new ArrayList<SocketChannel>();while (true){//5.accept与客户端建立连接,SocketChannel 用来和客户端之间进行通信SocketChannel sc = ssc.accept();// 非阻塞方法,线程继续运行,如果没有连接建立,返回一个null值if (sc != null) {log.debug("connected...{}", sc);// 5.5 设置为非阻塞模式sc.configureBlocking(false);//设置为非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {// 6、接收客户端发送的数据// 7、把数据写入缓冲区int read=channel.read(buffer); // 非阻塞方法,线程会继续运行if (read>0){// 8、切换缓冲区到读模式buffer.flip();// 9、读出所有缓冲区的数据debugRead(buffer);// 10、切换缓冲区到写模式buffer.clear();log.debug("after read...{}",channel);}}}}
}

模拟客户端(打开配置,允许有多个实例一起运行)!

package com.zidu.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8888));System.out.println("wait data....");}
}

1、运行多个客户端实例,发现服务端可以监听到这几个连接

2、利用端口为54274的客户端向服务端发送数据,客户端能够正常接收!

3、利用端口为55573和53230的客户端向服务器发送数据,服务器端能够正常接收数据!

总结:单线程和非阻塞模式能够结合得很好,但是缺点也很明显,因为线程不停地做询问。这样子会导致CPU一直在执行这个线程,消耗太多资源。虽然单线程和非阻塞模式能够结合得很好,但是这种模式在实际开发中不实用!

4.2.1 多路复用

4.3 selector

监听Channel事件

select()方法何时不阻塞

4.3.1 selector-处理accept

模拟服务器端,创建一个selector,用来管理多个channel,把channel注册到selecot里面,注册完后会获得一个指定的SelectionKey,这个SelectionKey就是来监听所注册的这个channel的事件,并且可以指定当前的SelectionKey要关注哪些事件!比如客户端的注册事件。然后一旦客户端发生注册,就会促使selector的select()方法继续往下执行!

package com.zidu.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;import static com.zidu.netty.c2.ByteBufferUtil.debugRead;@Slf4jpublic class Server {public static void main(String[] args) throws IOException {// 1.创建一个selector对象,管理多个channelSelector selector = Selector.open();ByteBuffer buffer = ByteBuffer.allocate(16);ServerSocketChannel ssc = ServerSocketChannel.open();// 设置为非阻塞模式ssc.configureBlocking(false);//2. 建立selector和channel的联系(将channel注册到selector里面)// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);/** 事件一共有四种:* accept - 会在有连接请求时触发* connect - 是客户端,连接建立后触发* read - 可读事件* write - 可写事件* */sscKey.interestOps(SelectionKey.OP_ACCEPT); // 指定sscKey要关注accept事件log.debug("register key :{}",sscKey);ssc.bind(new InetSocketAddress(8888));while (true){// 3. select 用法,没有事件发生,线程阻塞,有事件,线程才会恢复运行// select()方法在有事件未处理时是非阻塞的!selector.select();// 4. 处理事件,selectedKeys 内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();log.debug("key : {}",key);ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();log.debug("{}",sc);}}}
}

用的是channel的同一个SelectionKey来监听注册的事件,每次该channel有新的注册事件就都由SelectionKey去通知selector!

4.3.2 selector-cancel取消事件

事件一旦发生了,select()方法就会变为非阻塞状态,要么处理事件,要么取消事件,如果事件既不处理,也不取消事件,那么select()就会一直处于非阻塞状态,消耗CPU资源

 while (iter.hasNext()){SelectionKey key = iter.next();log.debug("key : {}",key);
//                // 获取这个事件其实就是对其进行处理,如果不对事件进行处理,那么select就会一直触发非阻塞状态
//                ServerSocketChannel channel = (ServerSocketChannel)key.channel();
//                SocketChannel sc = channel.accept();
//                log.debug("{}",sc);key.cancel();// 取消当前事件}

4.3.3 selector-处理read事件

package com.zidu.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;import static com.zidu.netty.c2.ByteBufferUtil.debugAll;@Slf4jpublic class Server {public static void main(String[] args) throws IOException {// 1.创建一个selector对象,管理多个channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//2. 建立selector和channel的联系(将channel注册到selector里面)// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);/** 事件一共有四种:* accept - 会在有连接请求时触发* connect - 是客户端,连接建立后触发* read - 可读事件* write - 可写事件* */sscKey.interestOps(SelectionKey.OP_ACCEPT); // 指定sscKey要关注accept事件log.debug("register key :{}",sscKey);ssc.bind(new InetSocketAddress(8888));while (true){// 3. select 用法,没有事件发生,线程阻塞,有事件,线程才会恢复运行// select()方法在有事件未处理时是非阻塞的!selector.select();// 4. 处理事件,selectedKeys 内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();log.debug("key : {}",key);// 是一个注册事件if (key.isAcceptable()){// 获取这个事件其实就是对其进行处理,如果不对事件进行处理,那么select就会一直触发非阻塞状态ServerSocketChannel channel = (ServerSocketChannel)key.channel(); // 获取SocketChannel sc = channel.accept();// selector 必须工作在非阻塞模式下sc.configureBlocking(false);// sc要想在非阻塞模式下工作,必须把管理权交给selector,即将sc注册到selector里面,获得的key,就专门用来管理sc的SelectionKey scKey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ); //关注读事件log.debug("{}",sc);log.debug("scKey:{}",scKey);
//                key.cancel();// 取消当前事件}// 是一个读事件else if (key.isReadable()){SocketChannel channel=(SocketChannel)key.channel(); // 拿到触发事件的channelByteBuffer buffer=ByteBuffer.allocate(16);channel.read(buffer);//把数据写入缓冲区buffer.flip();//切换缓冲区到读模式debugAll(buffer);}}}}
}

就是我们可以利用一个selector来管理多个channel,selector监听到有事件发生,就会触发select方法!我们就要遍历这个selector的selectedKeys()把那些事件拿出来做一个处理!还可以根据事件的类型作不同的处理,如果是连接事件,就要获得一个channel并且注册到selecor中;如果是一个读事件,那么就要将channel的数据写到缓冲区,再从缓冲区里面取出数据!
ServerSocketChannel是用来服务器连接的,SocketChannel 是从连接中获取数据的!

客户端发送完数据后,服务器端会报一个空指针异常。

4.3.4 selector-处理read事件后要将selectorkey删除

一开始,我们新建一个selector,它是这样子的!

当我们向selector里面注册了一个channel之后,会变成这个样子!

代码往下执行到select()方法就会阻塞,这个时候,客户端来了一个连接,select()方法就会继续向下运行!

然后这是一个注册事件,就会被第一个分支所捕获,事件处理后,selectedKeys中的accept就会把事件抹去,但是不会删除sscKey。

然后再往下,我们往selector里面又注册了一个chanel。

连接事件就处理完了,进入到下一轮循环!
这个时候,客户端发来新数据!select()方法监测到有事件发生,就会继续往下执行!那么这个时候,是哪个key发生的事件呢?我们发现是scKey发生的写事件,于是就会把scKey加入到selectedKeys中!

然后再继续往下执行代码,开始获取整个selectedKeys的迭代器并且进行遍历!


这个时候,第一个注册事件已经被执行了,但是没有被删除,所以,我们通过代码去获取是获取不到chanel的,返回的是一个null。!所以就会报空指针异常,所以就必须每次执行完后要将key给删除掉!

                    ServerSocketChannel channel = (ServerSocketChannel)key.channel(); // 获取SocketChannel sc = channel.accept();

所以上述代码应该作出如下修改,增加一行删除代码,一旦拿到了key,就要把它删除掉!

                SelectionKey key = iter.next();log.debug("key : {}",key);iter.remove(); // 删除已经被拿到的key

4.3.5 selector-nio处理客户端断开连接

服务端

package com.zidu.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;import static com.zidu.netty.c2.ByteBufferUtil.debugAll;@Slf4jpublic class Server {public static void main(String[] args) throws IOException {// 1.创建一个selector对象,管理多个channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//2. 建立selector和channel的联系(将channel注册到selector里面)// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);sscKey.interestOps(SelectionKey.OP_ACCEPT); // 指定sscKey要关注accept事件/** 事件一共有四种:* accept - 会在有连接请求时触发* connect - 是客户端,连接建立后触发* read - 可读事件* write - 可写事件* */log.debug("register key :{}",sscKey);ssc.bind(new InetSocketAddress(8888));while (true){// 3. select 用法,没有事件发生,线程阻塞,有事件,线程才会恢复运行// select()方法在有事件未处理时是非阻塞的!selector.select();// 4. 处理事件,selectedKeys 内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();log.debug("key : {}",key);iter.remove();// 是一个注册事件if (key.isAcceptable()){// 获取这个事件其实就是对其进行处理,如果不对事件进行处理,那么select就会一直触发非阻塞状态ServerSocketChannel channel = (ServerSocketChannel)key.channel(); // 获取SocketChannel sc = channel.accept();// selector 必须工作在非阻塞模式下sc.configureBlocking(false);// sc要想在非阻塞模式下工作,必须把管理权交给selector,即将sc注册到selector里面,获得的key,就专门用来管理sc的SelectionKey scKey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ); //关注读事件log.debug("{}",sc);log.debug("scKey:{}",scKey);
//                key.cancel();// 取消当前事件}// 是一个读事件else if (key.isReadable()){SocketChannel channel=(SocketChannel)key.channel(); // 拿到触发事件的channelByteBuffer buffer=ByteBuffer.allocate(16);channel.read(buffer);//把数据写入缓冲区buffer.flip();//切换缓冲区到读模式debugAll(buffer);}}}}
}

客户端

package com.zidu.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8888));System.out.println("wait data....");}
}

客户端建立连接后没有发送数据就断开连接报异常!

客户端断开可以分为正常断开和异常断开,每次断开后都会触发一次read事件,我们需要对这个read事件做一下处理!

else if (key.isReadable()){try {SocketChannel channel=(SocketChannel)key.channel(); // 拿到触发事件的channelByteBuffer buffer=ByteBuffer.allocate(16);// 处理客户端正常断开的情况int read = channel.read(buffer);//如果是正常断开,read方法返回值是-1;if (read==-1){key.cancel();}buffer.flip();//切换缓冲区到读模式debugAll(buffer);} catch (IOException e){// 处理客户端异常断开的情况e.printStackTrace();key.cancel(); // 客户端断开触发后,会再触发一个read事件,我们需要取消这个read事件(从selector的key中真正删除)}}

4.4 selector-nio 正确处理消息边界

服务器端设置一个缓冲区大小为4,,客户端发来“中国”两个字!

try {SocketChannel channel=(SocketChannel)key.channel(); // 拿到触发事件的channelByteBuffer buffer=ByteBuffer.allocate(4);int read = channel.read(buffer);//如果是正常断开,read方法返回值是-1;if (read==-1){key.cancel();}buffer.flip();//切换缓冲区到读模式System.out.println(Charset.defaultCharset().decode(buffer).toString());
//                        debugAll(buffer);}


我们发现,中字完整的展现了出来,国字没有。

原因:一个汉字对应三个字节(Windows操作系统默认的编码格式是UTF-8,一个汉字对应三字节),而我们的缓冲区只有4个字节,所以第一次读到的,就是“中”字的全部和“国”字的三分之一!

4.4.1 解决方法

解决方式:
1.约定一个长度,使用固定大小(可能会造成浪费)
2.使用较小的缓冲区,会出现黏包和半包现象
3.使用TLV格式,就是把消息分成两部,先传数据长度,获取到后,再根据实际内容去开辟一块缓冲区!


4.4.2 附件 attachment 与 扩容

假设服务器端的缓冲区长度只有16,但是客户端发送的数据的长度有32。这样子,服务器就没办法一次性把一条消息给读完,所以就需要添加一个buffer给对应的channel【其实这里我自己感觉有点像Thread里面的ThreadLocal!】,也就在注册channel的第三个参数就是这个附件!

拆分消息的方法

private static void split(ByteBuffer source){// 切换到读模式source.flip();for (int i=0;i< source.limit();++i){// 找到一条完整消息if (source.get(i)=='\n'){int length=i+1- source.position();// 把这条完整消息存入新的ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从 source 中读取,向 target 中写入 完整的消息,这样就可以了for (int j=0;j<length;++j){target.put(source.get());}debugAll(target);}}// 切换到写模式source.compact();}

对消息进行读取,其实这个netty已经帮我们显示了扩容与附件的功能!但是底层就是基本是这样子实现的,如果当前缓冲区不够接收消息,即上面的split方法没有分割出消息,那么就证明我们无法获得一个完整的消息,就需要对缓冲区进行扩容,而且我们这里只进行扩容,netty还有在缩小容器的情况!

 split(buffer); // 对buffer进行读取if (buffer.position()==buffer.limit()) // 即当前的position和limit在同一个位置,表示没有读到一个完整的消息{ByteBuffer newBytebuffer=ByteBuffer.allocate(buffer.capacity()*2);//按照当前的ByteBuffer的两倍长度扩容buffer.flip();// 因为上面的split方法会将buffer改为读模式newBytebuffer.put(buffer);// 把旧的附近的buffer内容复制到新的buffer里面key.attach(newBytebuffer);//重新给当前的channel绑定buffer}

4.4.3 selector-nio-ByteBuffer的大小分配

4.4.4 nio-selector 写入内容过多

服务器端一直等待发送数据完毕,很耗时间!
向客户端写入30000000个字节的数据

public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8888));while (true){selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();iter.remove();// 连接事件if (key.isAcceptable()){//其实下面这个就相当于上面这个的作用,我们只有一个ServerSocketChannel,所以通过key获取到的一定是这个Channel也就是ssc//所以也就可以直接利用ssc.accept();//ServerSocketChannel sscKey = (ServerSocketChannel)key.channel();SocketChannel sc = ssc.accept();sc.configureBlocking(false);//非阻塞模式// 1、向客户端发送大量数据StringBuilder sb = new StringBuilder();for (int i = 0; i < 30000000; i++) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());// 2、返回值代表实际写入的字节数while (buffer.hasRemaining()){int write = sc.write(buffer);System.out.println(write);}}}}}
}

接收来自服务器端的数据

public class WriteClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8888));int count=0;//3、接收数据while (true){ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);count+= sc.read(buffer);System.out.println(count);buffer.clear();}}
}

服务器端每次都打印当前发送的数据,但是我们可以看到,有一段时间,它因为没有将数据发送完毕,一直在那里自旋,很耽误CPU的效率,这个时候。可以去做别的事情,不用一直在那里等待发送数据,直到发送数据完毕!

4.4.4 nio-selector 处理写入内容过多的问题

将一个不断循环的问题改为由多个可写事件进行触发,避免了一直在那里等待buffer的清空而无法去处理别的事情,提高CPU的工作效率!

public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//开启一个selector来管理channelSelector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8888));while (true){selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();iter.remove();// 连接事件if (key.isAcceptable()) // 连接事件{SocketChannel sc = ssc.accept();sc.configureBlocking(false);//非阻塞模式SelectionKey sckey = sc.register(selector, 0, null);sckey.interestOps(SelectionKey.OP_READ);StringBuilder sb = new StringBuilder();for (int i = 0; i <5000000; i++) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());// 先写一次 int write = sc.write(buffer);System.out.println(write);// 当前数据没有被写完,关注一个可写事件,一旦buffer里面有空缺了,就会触发select()方法if (buffer.hasRemaining()){sckey.interestOps(sckey.interestOps()+SelectionKey.OP_WRITE);//  将没有写完的数据挂载到sckey上sckey.attach(buffer);}}// 触发了 可写事件else  if (key.isWritable()){//   从挂载的附件中取出可写的bufferByteBuffer buffer = (ByteBuffer)key.attachment();//  获得对应的channelSocketChannel scChannel = (SocketChannel)key.channel();int writeNum = scChannel.write(buffer);System.out.println(writeNum);// 6、清理操作if (!buffer.hasRemaining()){key.attach(null); //  需要清除buffer,不能让他占用这么多的内存key.interestOps(key.interestOps()-SelectionKey.OP_WRITE); // 不需要关注可写事件}}}}}
}

服务器端不会出现一直等待发送数据的情况了!

4.5 多线程优化

现在都是多核CPU。设计时要充分考虑别让cpu的力量被白白浪费。

模拟服务器

package com.zidu.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;import static com.zidu.netty.c2.ByteBufferUtil.debugAll;@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8888));// 1. 创建固定数量的 worker 并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}AtomicInteger index = new AtomicInteger();while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 2. 关联 selectorlog.debug("before register...{}", sc.getRemoteAddress());// round robin 轮询workers[index.getAndIncrement() % workers.length].init(sc); // boss 调用 初始化 selector , 启动 worker-0log.debug("after register...{}", sc.getRemoteAddress());}}}}static class Worker implements Runnable{private Thread thread;private Selector selector;private String name;private volatile boolean start = false; // 还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}// 初始化线程,和 selectorpublic void init(SocketChannel sc) throws IOException {if(!start) {selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}selector.wakeup(); // 唤醒 select 方法 bosssc.register(selector, SelectionKey.OP_READ, null); // boss}@Overridepublic void run() {while(true) {try {selector.select(); // worker-0  阻塞Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

模拟客户端

package com.zidu.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class TestClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8888));sc.write(Charset.defaultCharset().encode("123456789"));System.in.read();}
}

5、NIO 和 BIO

5.1 stream vs channel

stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区和接收缓冲区(更为底层)
stream 仅支持阻塞API。channel同时支持阻塞API和非阻塞API,网络channel刻配合selector实现多路复用!
二者均为全双工,即读写可以同时进行!

5.2 IO模型

同步阻塞、同步非阻塞、多路复用、异步阻塞、异步非阻塞
当调用一次channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正的数据读取,而读取又分为两个阶段,分别是:
等待数据阶段
复制数据阶段

阻塞IO:当用户发起了一次read,但是这个时候可能没有数据,线程就会停下来等待数据,等待数据到了,复制数据,再返回给用户!

阻塞IO更加详细的图:第一次read请求,阻塞,等待数据,完成,然后这个时候,又要循环到建立连接,如果这个时候连接没有建立,它就会阻塞,但这个时候,channel1还是想发送数据,这个时候就没法发送了!【必须等待channel2连接建立完了,才能发送数据!】

非阻塞IO:当用户发起了一次read,这个时候数据没有传过来,会立刻返回0.然后就不断循环,用户线程一直在执行。当某次出现有数据,那么用户线程会被阻塞住,等待内核空间复制完毕,返回!

多路复用:其实跟阻塞IO很相似,但是它的优势在哪里呢?


一个selector可以同时监听多个事件,任何一个事件来了,都可以触发它继续向下执行,而且,还可以同时对三个事件一起处理,比阻塞IO高效!

同步:线程自己去获取结果(一个线程),所以阻塞IO是同步阻塞,非阻塞IO、多路复用本质上都是同步阻塞!【因为都是自己一个线程发起请求,获取结果!】

异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)。异步肯定不是非阻塞的,异步还是阻塞的,异步就没有意义了!

5.3 零拷贝

传统IO问题

内部工作流程是这样子的:

1、java本身并不具备IO读写能力,因此read方法调用后,要从java程序的用户态切换至内核态,去调用操作系统的读能力,将数据读入内核缓冲区,这期间用户线程阻塞,操作系统边用DMA(Direct Memory Access)来实现文件读,期间也不会使用CPU
2、从内核态切换为用户态,将数据从内核缓冲区读入用户缓冲区(即byte[] buf),这期间会拷贝,无法利用DMA
3、调用wirte方法,这时将数据从用户缓冲区写入socket缓冲区,CPU会参与拷贝
4、接下来便要向网卡中写入数据,这项能力java又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,便用DMA将socket缓冲区的数据写入网卡,不会使用CPU!

5.3.1 NIO优化

通过DirectByteBuffer
BuyteBuffer.allocate(10) HeapByteBuffer【java堆内存分配缓冲区】
ByteBuffer.allocateDirect(10) DirectByteBuffer 【直接由操作系统的内存分配缓冲区,在操作系统可以访问,在java里面也可以访问】

内核缓冲区和用户缓冲区使用同一块区域,减少一次拷贝!

进一步优化
底层采用linux2.1后提供的sendFile方法,java中对应着两个channel调用transferTo和transferFrom方法拷贝数据


再进一步优化


数据的复制只需要一次,从java态切换到用户态也只需要两次

零拷贝:指的是没有数据会被拷贝到JVM的内存区域中
DMA就是专门做文件传输的硬件,下面就可以不使用CPU,而直接使用DMA传输!

5.4 AIO 异步IO

package com.zidu.netty.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;import static com.zidu.netty.c2.ByteBufferUtil.debugAll;@Slf4j
public class AioFileChannel {public static void main(String[] args) throws IOException, InterruptedException {// 获取一个文件的异步channel,后面参数表示打开哪个文件,对文件做什么操作AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ);/** 传入四个参数:* 参数一:bytebuffer* 参数二:读取的起始位置* 参数三:附件 (可能需要多次才能读完整个文件)* 参数四:回调对象 CompletionHandler* */ByteBuffer buffer=ByteBuffer.allocate(16);log.debug("read begin..");channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Override // read 读取成功public void completed(Integer result, ByteBuffer attachment) {log.debug("read completed....");attachment.flip();// 其实使用buffer和attachment都是一样的,因为上面传入的是同一个debugAll(buffer);}@Override  // 读取失败public void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});log.debug("read  end...");Thread.sleep(1000); // 这里让主线程睡眠,就是为了防止,主线程结束了,也就是守护线程结束了,但是回调线程还没有完成工作也结束了}
}

NIO基础,帮助入门Netty相关推荐

  1. Netty学习笔记一NIO基础

    Netty学习笔记一 一. NIO 基础 non-blocking io 非阻塞IO (也可称为new IO, 因为是JDK1.4加入的) 1. 三大组件 1.1 Channel 通道:数据的传输通道 ...

  2. 中间件系列「三」netty之NIO基础

    Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer).通道表示打开到 IO 设备(例如:文件.套接字)的连接.若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及 ...

  3. Java NIO框架Mina、Netty、Grizzly介绍与对比

    Java NIO框架Mina.Netty.Grizzly介绍与对比 原文地址:https://blog.csdn.net/e765741668/article/details/45234711 Min ...

  4. 【Netty】入门Netty官方例子解析(二)Time Server

    本文承接上文<[Netty]入门Netty官方例子解析(一)写个 Discard Server> ,接下来讲解官网文档中Netty入门官方例子第二个例子 Time Server 原文这个章 ...

  5. 【Netty】入门Netty官方例子解析(一)写个 Discard Server

    本文以Netty官方给出的列子来讲解Netty带你一步步进入Netty.Netty最全教程在这里 Getting Started 版本 netty4 maven依赖: <!-- https:// ...

  6. BIO、NIO、AIO、Netty面试题(总结最全面的面试题)

    BIO.NIO.AIO.Netty 什么是IO Java中I/O是以流为基础进行数据的输入输出的,所有数据被串行化(所谓串行化就是数据要按顺序进行输入输出)写入输出流.简单来说就是java通过io流方 ...

  7. BIO、NIO、AIO、Netty面试题(总结最全面的面试题!!!)

    BIO.NIO.AIO.Netty 什么是IO Java中I/O是以流为基础进行数据的输入输出的,所有数据被串行化(所谓串行化就是数据要按顺序进行输入输出)写入输出流.简单来说就是java通过io流方 ...

  8. 零基础快速入门SpringBoot2.0教程 (三)

    一.SpringBoot Starter讲解 简介:介绍什么是SpringBoot Starter和主要作用 1.官网地址:https://docs.spring.io/spring-boot/doc ...

  9. Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列

    文章目录 1 Netty线程模型 1.1 传统阻塞 I/O 服务模型 1.2 Reactor线程模型 1.2.1 单 Reactor 单线程模型 1.2.2 单Reactor多线程 1.2.3 主从 ...

最新文章

  1. java itemcf_大规模电商推荐数据分析-基于ItemCF的召回
  2. 若依管理系统——前后端分离版(二)登陆接口分析及SpringSecurity的登陆认证流程
  3. 实践 config drive - 每天5分钟玩转 OpenStack(170)
  4. 《助推》读书笔记-行为经济学
  5. hbase集群搭建,hbase单个节点重启
  6. Linux搭建私有Git服务器以及ssh免密登陆配置
  7. 超线程cpu的寄存器_一文总结 CPU 基本知识
  8. 在网络上提供资源的计算机,在计算机网络中通常把提供并管理共享资源的计算机称为...
  9. Eclipse中 Project facet jst.web.jstl has not been defined.解决方案
  10. 解决Vue的表格中,expand只有某些行需要展开的问题。
  11. Tomcat----windows系统通过命令符“强制关闭Tomcat”
  12. 如何对关键工序进行质量控制,以保证产品高可靠性
  13. java就是我的幸福,关于幸福的句子经典
  14. 《OKR工作法:谷歌、领英等公司的高绩效秘籍》读书笔记
  15. centos7 操作记录
  16. PSINS_Toolbox使用心得1
  17. 巧妙复制网页中的文本——复制网页上不能复制的文字
  18. 在DAZ3D STUDIO中使用.OBJ和.FBX三维模型
  19. Office-Tool
  20. 小程序使用腾讯地图获取当前位置和地图选取位置

热门文章

  1. 如何利用GitHub设计一个炫酷的个人网站(含代码)
  2. iphone 插电充不进电以及cydia 闪退问题
  3. 工业机器人用铸铁牌号_铸铁材料牌号国际对照表
  4. sklearn回归 预测下一期双色球
  5. 记一次Linux服务器上查杀木马经历
  6. 异星工厂机器人教程_异星工厂逻辑机器人建筑机器人使用方法
  7. opencv 去除玻璃蒙版_OpenCV实现马赛克和毛玻璃滤镜效果
  8. 在wincc中编辑语言c的另一个应用领域是关于动态向导的创建,wincc的c语言基础
  9. Fil暴力飙升,背后有哪些原因?
  10. 调用Camera API实现自己的拍照和摄像程序