转自:https://blog.csdn.net/dataiyangu/article/details/87214773

什么是NIO
Buffer && Channel
Buffer
举个栗子
NIO复制文件
Buffer中有3个重要的参数:
代码展示
图解
新建
存入10byte
flip
五次读操作
flip
几个重要的函数
文件映射到内存
网络编程
多线程网络服务器的一般结构
简单案例 EchoServer
EchoServer
HandleMsg
EchoServer的客户端
问题:
解决
模拟低效的客户端
服务器输出
网络编程NIO
参考代码
AIO
特点总结:
基本思想
其他的方法
代码实现
为什么需要了解NIO和AIO?
什么是NIO
NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标 准。它是在Java 1.4中被纳入到JDK中的,并具有以下特性:
– NIO是基于块(Block,硬盘上的块)的,它以块为基本单位处理数据,传统的是基于字节的,最小单位是字节,NIO最小单位是块。
– 为所有的原始类型提供(Buffer)缓存支持
– 增加通道(Channel)对象,作为新的原始 I/O 抽象
– 支持锁和内存映射文件的文件访问接口,拿文件系统来实现锁,就是我们平常的.log文件。
– 提供了基于Selector的异步网络I/O

Buffer && Channel

文件的读写都是通过Buffer读写到Channel,然后再到文件。Channel左边就是对应的我们的文件。

Buffer
每个基本类型都有对应的buffer

Object
-Buffer
-ByteBuffer
-CharBuffer
-DoubleBuffer
-FloatBUffer
-IntBuffer
-LongBuffer
-ShortBuffer
1
2
3
4
5
6
7
8
9
举个栗子
FileInputStream fin = new FileInputStream(new File("d:\\temp_buffer.tmp"));
//通过InputStream得到channel。
FileChannel fc=fin.getChannel();
//通过ByteBuffer分配1k的大小的buffer
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
//通过channel将1k大小的数据读到buffer中
fc.read(byteBuffer);
fc.close();
//对buffer中的数据进行读写转换,后面可能要对buffer中的数据进行读取。
byteBuffer.flip();
1
2
3
4
5
6
7
8
9
10
NIO复制文件
public static void nioCopyFile(String resource, String destination) throws IOException {
FileInputStream fis = new FileInputStream(resource);
FileOutputStream fos = new FileOutputStream(destination);
FileChannel readChannel = fis.getChannel();//读文件通道
FileChannel writeChannel = fos.getChannel();//写文件通道
ByteBuffer buffer = ByteBuffer.allocate(1024); //读入数据缓存
while (true) {
buffer.clear();
int len = readChannel.read(buffer); //读入数据
//len读到数据的大小
if (len == -1) {
break;
//读取完毕
}
buffer.flip();
writeChannel.write(buffer);
//写入文件
}
readChannel.close();
writeChannel.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Buffer中有3个重要的参数:
位置(position)、容量(capactiy)和上限(limit)

参数 写模式 读模式
位置 当前缓冲区的位置,将从position的下一个位置写数据 当前缓冲区读取的位置,将从此位置后,读取数据
容量 缓存区的总容量上限 缓存区的总容量上限
上限 缓冲区的实际上线,他总是小于等于容量。通常情况下和容量相等 代表刻度的总容量,和上次写入的数据量相等
代码展示
ByteBuffer b=ByteBuffer.allocate(15); //15个字节大小的缓冲区
System.out.println("limit="+b.limit()+" capacity="+b.capacity()+" position="+b.position());
for(int i=0;i<10;i++){ //存入10个字节数据
b.put((byte)i);
}
System.out.println("limit="+b.limit()+" capacity="+b.capacity()+" position="+b.position());
b.flip(); //重置position
System.out.println("limit="+b.limit()+" capacity="+b.capacity()+" position="+b.position());
for(int i=0;i<5;i++){
System.out.print(b.get());
}
System.out.println();
System.out.println("limit="+b.limit()+" capacity="+b.capacity()+" position="+b.position()); b.flip();
System.out.println("limit="+b.limit()+" capacity="+b.capacity()+" position="+b.position());

图解
新建

存入10byte

flip

该操作会重置position,通常,将buffer从写模式转换为读 模式时需要执行此方法 flip()操作不仅重置了当前的position为0,还将limit设置到当 前position的位置

五次读操作

flip

几个重要的函数
public final Buffer rewind()
– 将position置零,并清除标志位(mark)
public final Buffer clear()
– 将position置零,同时将limit设置为capacity的大小,并清除了标志mark
public final Buffer flip()
– 先将limit设置到position所在位置,然后将position置零,并清除标志位mark – 通常在读写转换时使用

文件映射到内存
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");
FileChannel fc = raf.getChannel();
//将文件映射到内存中
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
while(mbb.hasRemaining()){
System.out.print((char)mbb.get());
}
mbb.put(0,(byte)98); //修改文件
raf.close();

网络编程
多线程网络服务器的一般结构

简单案例 EchoServer
EchoServer
public static void main(String args[]) {
ServerSocket echoServer = null;
Socket clientSocket = null;
try {
echoServer = new ServerSocket(8000);
} catch (IOException e) {
System.out.println(e); }
while (true) {
try {
clientSocket = echoServer.accept(); System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
tp.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
System.out.println(e); }
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
主线程在这里制作一个接收accept的功能。

HandleMsg
static class HandleMsg implements Runnable{
//省略部分信息
public void run(){
try {
is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintWriter(clientSocket.getOutputStream(), true);
// 从InputStream当中读取客户端所发送的数据
String inputLine = null;
long b=System.currentTimeMillis();
while ((inputLine = is.readLine()) != null) {
os.println(inputLine);
}
long e=System.currentTimeMillis();
System.out.println("spend:"+(e-b)+"ms");
} catch (IOException e) {
e.printStackTrace(); }finally{
//关闭资源
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
读到什么就回写什么

EchoServer的客户端
public static void main(String[] args) throws IOException {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.println("Hello!");
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println("from server: " + reader.readLine());
} catch{
} finally {
//省略资源关闭
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
问题:
– 为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。
– 此时,如果客户端数量众多,可能会消耗大量的系统资源

解决
– 非阻塞的NIO
– 数据准备好了在工作
注意:io真正的工作分为准备和读取两个部分。nio就是在准备玩了之后才会分配读取的操作。

模拟低效的客户端
private static ExecutorService tp=Executors.newCachedThreadPool(); private static final int sleep_time=1000*1000*1000;
public static class EchoClient implements Runnable{
public void run(){
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream(), true); writer.print("H");
LockSupport.parkNanos(sleep_time);
writer.print("e");
LockSupport.parkNanos(sleep_time);
writer.print("l");
LockSupport.parkNanos(sleep_time);
writer.print("l");
LockSupport.parkNanos(sleep_time);
writer.print("o");
LockSupport.parkNanos(sleep_time);
writer.print("!");
LockSupport.parkNanos(sleep_time);
writer.println();
writer.flush();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
服务器输出
spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms
1
2
3
4
5
6
7
8
9
10
这里的6秒花在了哪里?
注意上面的代码块

long b=System.currentTimeMillis();
while ((inputLine = is.readLine()) != null) {
os.println(inputLine);
}
long e=System.currentTimeMillis();
System.out.println("spend:"+(e-b)+"ms");
1
读和写一共花了六秒,读的时候花了很多时间读不到

网络编程NIO

把数据准备好了再通知我 Channel有点类似于流,一个Channel可以和文件或者网络Socket对应
Selector:多路复用选择器 Selector,它是NIO编程的基础,非常重要,多路复用器提供选择已经就绪的任务的能力。简单说,就是Selector会不断地轮询注册在其上的通道(Channel) 假如某个通道发生了读写操作,这个通道就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以取得就绪的Channel集合,从而进行后续的IO操作。一个Selector可以负责成千上万Channel通道,没有上线,这也是JDK使用了epoll代替了传统的select实现,获得连接句柄没有限制。这就意味着 我们只需要一个线程负责Selector的轮询,就可以接入成千上万个客户端。
Selector准备好数据后,返回SelectionKey SelectionKey表示一对Selector和Channel的关系, 从SelectionKey中可以得到Channnel(数据已经准备), 并读取数据
select()和selectNow():
select方法如果没有一个channel准备好数据的话,就会出现阻塞,selectNow不论有没有准备好,都会有一个返回值,不会出现阻塞。

参考代码
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;

public class NioTest {
private void StartServer ()throws Exception{
final Selector selector = SelectorProvider.provider().openSelector();
ServerSocketChannel ssc = ServerSocketChannel.open();
//将ServerSocketChannel设置为非阻塞的,accept不会一直等待,即数据准备好了,给个通知。
ssc.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(8000);
ssc.socket().bind(isa);
//给channel注册一个感兴趣的事件,即accept事件,如果有人accept,selector就告诉ssc
SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
for (; ; ) {
//已经有数据准备好了(读、写、accept)
selector.select();
// if (selector.selectNow() == 0) {
// continue;
// }
Set readyKeys = selector.selectedKeys();
Iterator i = readyKeys.iterator();
long e = 0;
while (i.hasNext()) {
SelectionKey sk = (SelectionKey) i.next();
//remove掉,是为了防止后面重复处理。
i.remove();
//如果是一个accept的事件
if (sk.isAcceptable()) {
doAccept(sk);
}
//如果是一个读的事件,读已经准备好了
else if(sk.isValid()&&sk.isReadable()){
// 将时间做一个记录
if (!geym_time_stat.containsKey(((SocketChannel) sk.channel()).socket())) {
geym_time_stat.put(((SocketChannel) sk.channel()).socket(),System.currentTimeMillis() );
doRead(sk);
}
}
//如果是一个写的事件
else if (sk.isValid() && sk.isWritable()) {
doWrite(sk);
e = System.currentTimeMillis();
//耗时的统计
long b = geym_time_stat.remove(((SocketChannel) sk.channel()).socket());
System.out.println("spend:"+(e-b)+"ms");
}
}
}
class HandleMsg implements Runnable{
SelectionKey sk;
ByteBuffer bb;

public HandleMsg(SelectionKey sk,ByteBuffer bb){
this.sk= sk;
this.bb = bb;
}
public void run() {
// 将之前添加的附件拿出来
EchoClient echoClient = (EchoClient) sk.attachment();
echoClient.enqueue(bb);
// 将SelectionKey感兴趣的事件修改为OP_READ和OP_WRITE;
sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// 强迫Selector立即返回
selector.wakeup();
}
}
}
private void doRead(SelectionKey sk){
SocketChannel channel = (SocketChannel) sk.channel();
ByteBuffer bb = ByteBuffer.allocate(8192);
int len;
try{
len = channel.read(bb);
if (len < 0) {
disconnect(sk);
return;
}
} catch (IOException e) {
System.out.println("Field to read from client");
e.printStackTrace();
disconnect(sk);
return;
}
bb.flip();
tp.excute(new HandleMsg(sk, bb));
}
private void doWrite(SelectionKey sk){
SelectableChannel channel = sk.channel();
EchoClient echoClient = (EchoClient) sk.attachment();
LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
ByteBuffer bb = outq.getLast();
try {
// 回写到channel中
int len = channel.write(bb);
if (len == -1) {
disconnect(sk);
return;
}
if (bb.remaining() == 0) {
outq.removeLast();
}
} catch (Exception e) {
System.out.println("Faild to write to client .");
e.printStackTrace();
disconnect(sk);
}
//队列中的数据已经回写完毕的话
if (outq.size() == 0) {
// 将对写事件感兴趣去掉。
sk.interestOps(SelectionKey.OP_READ);
}
}
private void doAccept(SelectionKey sk){
ServerSocketChannel server = (ServerSocketChannel) sk.channel();
SocketChannel clientChannel;
try {
clientChannel = server.accept();
clientChannel.configureBlocking(false);
//希望能读数据
SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
EchoClient echoClient = new EchoClient();
// 增加一个附件给Key
clientKey.attach(echoClient);
InetAddress clientAddress = clientChannel.socket().getInetAddress();
System.out.println("Accepted connection from "+clientAddress.getHostAddress());

} catch (Exception e) {
System.out.println("Faild to accept new client");
e.printStackTrace();
}
}
class EchoClient{
LinkedList<ByteBuffer> outq;
EchoClient(){
outq = new LinkedList<ByteBuffer>();
}
// 读到数据就往里面塞
public void enqueue(ByteBuffer bb){
outq.addFirst(bb);
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
以上是通过nio的方式实现上面的功能,从6s多变为了几毫秒,因为只是部分代码,所以没有运行结果。
总结:
– NIO会将数据准备好后,再交由应用进行处理,数据如果没有准备好,是不会进入到应用中的,数据的读取过程依然在应用线程中完成 ,等待的时间剥离到一个独立的线程中去等待(selector.select();)。
– 节省数据准备时间(因为Selector可以复用)

AIO
异步io
上面的NIO只是将等待的时间剥离到少数的线程中,避免大量的线程等待,造成的资源浪费,AIO等你写完了或者读完了再通知我。不需要读,也不需要写,只需要等系统把数据都处理完了之后,把回调函数放进去,就会执行,AIO不会加快io,io本身的速度并没有加快,只是改变了原来的线程对io的处理方式,看起来是变快了。

特点总结:
读完了再通知我
不会加快IO,只是在读完后进行通知
使用回调函数,进行业务处理

基本思想
用到的类

AsynchronousServerSocketChannel
1
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));
1
使用server上的accept方法

public abstract <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A>handler);
1
2
因为AIO是异步的,将accept调用了之后立即返回,但是并没有真正拿到客户端的数据,CompletionHandler<AsynchronousSocketChannel,? super A>handler这个接口的作用就是在真正accept的时候,将accept得到的数据传给handler,供后面的相关操作使用。

其他的方法
AsynchronousSocketChannel
因为是异步的,肯定不能读完了之后再返回,所以所有的方法都是立即返回的
– read

TimeUnit是超时时间
第三个函数只有一个参数,但是会返回一个future,告知读到了第几个字节
第四个函数是读到了ByteBuffer数组中,因为网络报文的前二十(举例)个字节是无用的,直接剥离掉,能直接拿到有用的数据。
– write

代码实现
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
网络编程 AIO
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println(Thread.currentThread().getName()); Future<Integer> writeResult=null;
try {
buffer.clear();
result.read(buffer).get(100, TimeUnit.SECONDS);
buffer.flip();
writeResult=result.write(buffer);
} catch (InterruptedException | ExecutionException e) { e.printStackTrace();
} catch (TimeoutException e) { e.printStackTrace();
} finally { try {
//执行完了之后再次执行accept,防止执行完了没事干了。
server.accept(null, this);
writeResult.get();
result.close();
} catch (Exception e) {
System.out.println(e.toString()); }
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("failed: " + exc);
}
});

上面是accept的代码实现,

为什么需要了解NIO和AIO?

转载于:https://www.cnblogs.com/sharpest/p/10850989.html

Java高并发程序设计学习笔记(八):NIO和AIO相关推荐

  1. Java高并发程序设计学习笔记(十一):Jetty分析

    转自:https://blog.csdn.net/dataiyangu/article/details/87894253 new Server() 初始化线程池 QueuedThreadPool ex ...

  2. Java高并发程序设计学习笔记(五):JDK并发包(各种同步控制工具的使用、并发容器及典型源码分析(Hashmap等))...

    转自:https://blog.csdn.net/dataiyangu/article/details/86491786#2__696 1. 各种同步控制工具的使用 1.1. ReentrantLoc ...

  3. 《实战 Java 高并发程序设计》笔记——第3章 JDK 并发包(二)

    文章目录 3.2 线程复用:线程池 3.2.1 什么是线程池 3.2.2 不要重复发明轮子:JDK 对线程池的支持 1. 固定大小的线程池 2. 计划任务 3.2.3 刨根究底:核心线程池的内部实现 ...

  4. 《实战Java高并发程序设计》github笔记和源码

    笔记 <实战Java高并发程序设计>中有很多代码范例,适合初学者通过实践入门并发编程,这本书有个问题就是前面的代码都用JDK7,第六章开始又用JDK8了 笔者做了相关笔记并整理源代码,欢迎 ...

  5. JAVA高并发程序设计(葛一鸣著)读书笔记

    本文为JAVA高并发程序设计(葛一鸣著)读书笔记.这本书对于刚刚入门的童鞋来讲可能有点深,我推荐可以先看看Java多线程编程核心技术(高洪岩著)一书. 第一章 走入并行世界 什么是同步和异步? 同步就 ...

  6. 实战java高并发程序设计-笔记进行中

    <JAVA并发编程实践>:出书时间太早,内容比较散,专业术语翻译较早和现在有差异 <Java并发编程的艺术>:手绘图较多文字内容较少,主要讲解并发实现的底层原理和面临的问题,底 ...

  7. java unsafe获取指针_【实战Java高并发程序设计 1】Java中的指针:Unsafe类

    是<实战Java高并发程序设计>第4章的几点. 如果你对技术有着不折不挠的追求,应该还会特别在意incrementAndGet() 方法中compareAndSet()的实现.现在,就让我 ...

  8. 【实战Java高并发程序设计6】挑战无锁算法

    我们已经比较完整得介绍了有关无锁的概念和使用方法.相对于有锁的方法,使用无锁的方式编程更加考验一个程序员的耐心和智力.但是,无锁带来的好处也是显而易见的,第一,在高并发的情况下,它比有锁的程序拥有更好 ...

  9. Java高并发程序设计入门

    转自:http://blog.csdn.net/johnstrive/article/details/50667557 说在前面 本文绝大部分参考<JAVA高并发程序设计>,类似读书笔记和 ...

  10. Java高并发程序设计(三)——JDK并发包(二)

    引言 好久没来学习Java高并发程序设计了,感觉在慢慢遗忘之前学过的内容,今天打算重新拾起. Condition Condition与前两章讲的Object.wait() 和Object.notify ...

最新文章

  1. css3学习 理论之渐变
  2. C语言中sizeof()的用法
  3. 什么是计算机的网络体系结构,什么是网络体系结构 网络体系结构介绍【详解】...
  4. httpurlconnection 封装_不要再封装各种Util工具类了,看看这个框架
  5. Win10系统装载ISO出现问题的解决方案
  6. 视频码率[百科词条]
  7. Ubuntu14下安装使用SVN RabbitVCS客户端
  8. SpringCloud系列------Config-Server
  9. 学习金字塔 理论的一个应用
  10. Unity3D -- 天空盒(图文)
  11. 天玑9200领跑背后,高端芯片掀起蝴蝶效应
  12. 三维城市建筑模型生产工具《geobuilding1.0》2022.03.25
  13. 如何使用JS实现banner图滚动
  14. 自学unity,该不该阻止?
  15. java利用正则表达式提取字符串中的整数和小数部分
  16. 利用python搭建“5433小游戏集成平台”
  17. NTC热敏电阻温度采集与adc转换
  18. rsync、gitlab、svn安装和使用
  19. pygame中人物上下左右移动及翻转,背景透明化
  20. PL/SQL语言必看书籍推荐

热门文章

  1. 强化学习研究什么?用白话讲就是……
  2. 快速了解Druid——实时大数据分析软件
  3. 万里航行总舵手——业务测试架构的设计
  4. 基于MVC的JavaScriptWeb富应用开发
  5. IT项目管理之系统规划
  6. 21世纪IT人才需要具有的5个鲜明特点
  7. 简单类型视图状态应用
  8. Keras:框架架构
  9. 对数线性模型:逻辑斯谛回归和最大熵模型
  10. PyQt5教程 - pyqt gui编程