文章目录

  • 概述
    • 什么是AIO?
    • AIO与NIO有什么区别?
    • AIO性能比NIO更好吗?
      • 什么是epoll?
    • AIO既然是异步的,那么如何获得操作结果?
  • Java AIO的API使用
    • 了解CompletionHandler接口
      • CompletionHandler源码
      • 解析
        • completed
        • failed
      • Demo
    • 基于AIO的简单时间服务
      • Server端实现代码
      • Client端实现代码

概述

什么是AIO?

Java 1.7升级了NIO类库,升级后的NIO类库被称为NIO 2.0,在NIO2.0中Java提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO(Asynchronous I/O),AIO是真正的异步非阻塞I/O,它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

AIO与NIO有什么区别?

  1. 按照UNIX网络编程模型区分,NIO是同步非阻塞I/O,AIO是异步非阻塞I/O;
  2. AIO与NIO的操作结果获取方式不同,NIO的操作结束后会将操作就绪的I/O放在队列中,由Selector依次循环获取处理;AIO操作结束后则会直接回调CompletionHandler的实现类的相应函数来进行处理;
  3. 处理操作结果时NIO是单线程,即由Selector依次在当前线程中进行处理,如果需要多线程处理需要自行实现,这也是为什么它是同步而非异步;而AIO在回调处理操作结果时,是多线程的,其底层设有线程池。

AIO性能比NIO更好吗?

目前的AIO与NIO底层都使用了epoll(Linux中),所以二者性能都很好,主要差异在于同步与异步,NIO是同步的,始终只有一个线程在进行结果处理,而AIO的异步回调则是基于多线程的,如果NIO结果处理中引入多线程,个人认为二者性能是相仿的。

什么是epoll?

epoll是Linux中多路复用IO接口select/poll的增强版本,select/poll模型是忙轮询,即一直不停地轮询看哪些操作已经结束可以获取操作结果了,而epoll则是将已经结束的操作的操作结果放入队列中,然后只需要遍历处理队列中的操作就可以了,避免了CPU的浪费,提升程序运行效率。

更详细具体的可以移步百科了解:

https://baike.baidu.com/item/epoll/10738144?fr=aladdin

AIO既然是异步的,那么如何获得操作结果?

AIO中提供了两种方式获取操作结果:

  1. 通过java.util.concurrent.Future类来表示异步操作的结果;
  2. 在执行异步操作时传入一个java.nio.channel,并传入CompletionHandler接口的实现类作为操作完成的回调,CompletionHandler顾名思义就是专门用来处理完成结果的。

Java AIO的API使用

了解CompletionHandler接口

CompletionHandler源码

package java.nio.channels;/*** A handler for consuming the result of an asynchronous I/O operation.** <p> The asynchronous channels defined in this package allow a completion* handler to be specified to consume the result of an asynchronous operation.* The {@link #completed completed} method is invoked when the I/O operation* completes successfully. The {@link #failed failed} method is invoked if the* I/O operations fails. The implementations of these methods should complete* in a timely manner so as to avoid keeping the invoking thread from dispatching* to other completion handlers.** @param   <V>     The result type of the I/O operation* @param   <A>     The type of the object attached to the I/O operation** @since 1.7*/public interface CompletionHandler<V,A> {/*** Invoked when an operation has completed.** @param   result*          The result of the I/O operation.* @param   attachment*          The object attached to the I/O operation when it was initiated.*/void completed(V result, A attachment);/*** Invoked when an operation fails.** @param   exc*          The exception to indicate why the I/O operation failed* @param   attachment*          The object attached to the I/O operation when it was initiated.*/void failed(Throwable exc, A attachment);
}

解析

CompletionHandler接口中有两个方法需要实现,分别是completedfailed,当操作完成时,会回调completed,出现异常失败时会回调failed。

completed

操作完成时,回调completed函数,其有result和attachment两个参数:

  • result是操作完成后的操作结果;
  • attachment是在进行回调时可以传入的附件,用于回调内的操作;

failed

操作异常时回调failed函数,其有exc和attachment两个参数:

  • exc即进行操作时出现的异常;
  • attachment和completed中的一致,为在进行回调时传入的附件,用于回调内操作;

Demo

    /*** 接收请求的结束动作处理类,当异步socket服务接收到一个请求时,会回调此handler,从而对收到的请求进行处理*/private class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SimpleTimeServer> {@Overridepublic void completed(AsynchronousSocketChannel channel, SimpleTimeServer attachment) {...}@Overridepublic void failed(Throwable exc, SimpleTimeServer attachment) {exc.printStackTrace();...}}

基于AIO的简单时间服务

Server端实现代码

package com.dongrui.study.ioserver.aioserver;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;/*** 使用AIO实现的简单时间服务*/
public class SimpleTimeServer implements Runnable {/*** 维持服务线程的门闩*/private CountDownLatch latch;/*** 异步socket服务通道*/private AsynchronousServerSocketChannel asynchronousServerSocketChannel;private SimpleTimeServer(int port) throws IOException {//开启异步socket服务asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();//绑定端口asynchronousServerSocketChannel.bind(new InetSocketAddress(port));System.out.println("simple time server start in " + port);}@Overridepublic void run() {latch = new CountDownLatch(1);//异步socket服务接收请求System.out.println("我是监听线程:" + Thread.currentThread());asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** 接收请求的结束动作处理类,当异步socket服务接收到一个请求时,会回调此handler,从而对收到的请求进行处理*/private class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SimpleTimeServer> {@Overridepublic void completed(AsynchronousSocketChannel channel, SimpleTimeServer attachment) {try {System.out.println("我是处理线程:" + Thread.currentThread());//循环监听,进行监听操作的是SimpleTimeServer运行的线程attachment.asynchronousServerSocketChannel.accept(attachment, this);//这里休眠20秒,可以看到当处理线程没有处理完成时,会启用新的线程来处理后面的请求Thread.sleep(20);ByteBuffer buffer = ByteBuffer.allocate(1024);//从请求通道中读取数据channel.read(buffer, buffer, new ReadCompletionHandler(channel));} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, SimpleTimeServer attachment) {//接收请求失败,打印异常信息,将门闩减一,服务线程终止exc.printStackTrace();attachment.latch.countDown();}}/*** 读取数据的结束动作处理类,当系统将数据读取到buffer中,会回调此handler*/private class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel channel;ReadCompletionHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void completed(Integer byteNum, ByteBuffer readBuffer) {if (byteNum <= 0)return;readBuffer.flip();byte[] body = new byte[byteNum];readBuffer.get(body);String req = new String(body, StandardCharsets.UTF_8);System.out.println("the time server received order: " + req);SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = "query time order".equalsIgnoreCase(req) ? format.format(new Date()) + "" : "bad order";doWrite(currentTime);}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {//读取失败,关闭通道channel.close();} catch (IOException e) {e.printStackTrace();}}private void doWrite(String msg) {if (null != msg) {byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer, writeBuffer,//写操作结束的回调handlernew CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {if (buffer.hasRemaining()) {channel.write(buffer, buffer, this);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {channel.close();} catch (IOException e) {e.printStackTrace();}}});}}}public static void main(String[] args) {try {System.out.println("我是主线程:" + Thread.currentThread());new SimpleTimeServer(8088).run();System.out.println("监听线程已挂");} catch (IOException e) {e.printStackTrace();}}
}

Client端实现代码

package com.dongrui.study.ioclient.aioclient;import com.google.common.primitives.Bytes;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 基于AIO的简单时间服务客户端*/
public class SimpleTimeClient {private String host;private int port;private CountDownLatch latch;/*** 异步socket通道*/private AsynchronousSocketChannel channel;private SimpleTimeClient(String host, int port) throws IOException {this.host = host;this.port = port;this.latch = new CountDownLatch(1);initChannel();}private void initChannel() throws IOException {// 打开异步socket通道channel = AsynchronousSocketChannel.open();// 异步连接指定地址,连接完成后会回调ConnectionCompletionHandlerchannel.connect(new InetSocketAddress(host, port), null, new ConnectionCompletionHandler());}private class ConnectionCompletionHandler implements CompletionHandler<Void, Void> {@Overridepublic void completed(Void result, Void attachment) {System.out.println("connection thread: " + Thread.currentThread());String msg = "query time order";ByteBuffer writeBuffer = ByteBuffer.allocate(msg.length());writeBuffer.put(msg.getBytes(StandardCharsets.UTF_8)).flip();// 异步写入发送数据,写入完成后会回调WriteCompletionHandlerchannel.write(writeBuffer, null, new WriteCompletionHandler());}@Overridepublic void failed(Throwable exc, Void attachment) {exc.printStackTrace();latch.countDown();}}/*** 读取到的byte集合和缓存*/private class BufferAndArr {public BufferAndArr(List<Byte> bytesArr, ByteBuffer buffer) {this.bytesArr = bytesArr;this.buffer = buffer;}List<Byte> bytesArr;ByteBuffer buffer;}/*** 写数据完成回调处理类*/private class WriteCompletionHandler implements CompletionHandler<Integer, Void> {@Overridepublic void completed(Integer result, Void attachment) {System.out.println("write thread: " + Thread.currentThread());List<Byte> rtnBytesArr = new ArrayList<>();ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 异步读取返回的数据,读取结束后会回调ReadCompletionHandlerchannel.read(readBuffer, 1000, TimeUnit.MILLISECONDS, new BufferAndArr(rtnBytesArr, readBuffer), new ReadCompletionHandler());}@Overridepublic void failed(Throwable exc, Void attachment) {exc.printStackTrace();latch.countDown();}}/*** 读数据完成回调处理类*/private class ReadCompletionHandler implements CompletionHandler<Integer, BufferAndArr> {@Overridepublic void completed(Integer bytesNum, BufferAndArr attachment) {System.out.println("read thread: " + Thread.currentThread());int size = attachment.buffer.limit();attachment.buffer.flip();byte[] tempBytes = new byte[bytesNum];attachment.buffer.get(tempBytes);attachment.bytesArr.addAll(Bytes.asList(tempBytes));// 根据读取到的数据长度与缓存总长度比较,相等则继续读取,否则读取结束if (bytesNum >= size) {attachment.buffer.clear();// 继续读取时加入超时时间,如果已经读取完,则会触发超时异常,转到fail中channel.read(attachment.buffer, 1000, TimeUnit.MILLISECONDS, attachment, new ReadCompletionHandler());} else {completionAction(attachment.bytesArr);}}@Overridepublic void failed(Throwable exc, BufferAndArr attachment) {// 当没有数据时会超时抛出InterruptedByTimeoutException异常,然后在这里处理读取结果,因为暂时没有发现更好的方法if (exc instanceof InterruptedByTimeoutException) {completionAction(attachment.bytesArr);} else {exc.printStackTrace();}}private void completionAction(List<Byte> bytesArr) {System.out.println("当前时间:" + new String(Bytes.toArray(bytesArr), StandardCharsets.UTF_8));latch.countDown();}}public static void main(String[] args) {while (true) {new Thread(() -> {try {System.out.println("time client thread: " + Thread.currentThread());SimpleTimeClient client = new SimpleTimeClient("localhost", 8088);client.latch.await();} catch (IOException | InterruptedException e) {e.printStackTrace();}}).start();try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

Java AIO知识总结相关推荐

  1. Java基础知识回顾之七 ----- 总结篇

    前言 在之前Java基础知识回顾中,我们回顾了基础数据类型.修饰符和String.三大特性.集合.多线程和IO.本篇文章则对之前学过的知识进行总结.除了简单的复习之外,还会增加一些相应的理解. 基础数 ...

  2. JAVA基础知识学习全覆盖

    文章目录 一.JAVA基础知识 1.一些基本概念 1.Stringbuffer 2.局部变量成员变量 3.反射机制 4.protect 5.pow(x,y) 6.final ,finally,fina ...

  3. (Java实习生)每日10道面试题打卡——Java基础知识篇2

    临近秋招,备战暑期实习,祝大家每天进步亿点点! 本篇总结的是Java基础知识相关的面试题,后续会每日更新~ 1.请你说一下Java中的IO流?以及他们的分类和作用? IO 流的分类: 按照数据流的方向 ...

  4. java aio实现_深入理解Java AIO(三)—— Linux中的AIO实现

    我们调用的Java AIO底层也是要调用OS的AIO实现,而OS主要也就Windows和Linux这两大类,当然还有Solaris和mac这些小众的. 在 Windows 操作系统中,提供了一个叫做 ...

  5. java aio nio bio_3. 彤哥说netty系列之Java BIO NIO AIO进化史

    你好,我是彤哥,本篇是netty系列的第三篇. 欢迎来我的公从号彤哥读源码系统地学习源码&架构的知识. 简介 上一章我们介绍了IO的五种模型,实际上Java只支持其中的三种,即BIO/NIO/ ...

  6. 内核aio_深入理解Java AIO(三)—— Linux中的AIO实现

    我们调用的Java AIO底层也是要调用OS的AIO实现,而OS主要也就Windows和Linux这两大类,当然还有Solaris和mac这些小众的. 在 Windows 操作系统中,提供了一个叫做 ...

  7. 计算机语言之java基础知识一

    在家已经待了一个多星期了,最近学到的东西一直没有梳理,这次变梳理边分享出来,说是分享其实就是搬运一下. Java 基础知识 基本数据类型 问:7 种基本数据类型:整型.浮点型.布尔型.字符型? 答:四 ...

  8. Java基础看jvm,JAVA基础知识|java虚拟机(JVM)

    一.JVM简介 java语言是跨平台的,兼容各种操作系统.实现跨平台的基石就是虚拟机(JVM),虚拟机不是跨平台的,所以不同的操作系统需要安装不同的jdk版本(jre=jvm+类库:jdk=jre+开 ...

  9. Java基础知识强化之集合框架笔记76:ConcurrentHashMap之 ConcurrentHashMap简介

    1. ConcurrentHashMap简介: ConcurrentHashMap是一个线程安全的Hash Table,它的主要功能是提供了一组和Hashtable功能相同但是线程安全的方法.Conc ...

最新文章

  1. 《C和指针》一1.7 问题
  2. python中创建对象的七种方式
  3. c3074 无法使用带圆括号的_如何选择路面裂缝贴缝带?
  4. Rosenbrock函数到底什么用
  5. codeforces 808d
  6. C语言中 用选择结构编译算法,C语言程序设计立体化教程(高等教育立体化精品系列规划教材)...
  7. LAMP-各PHP加速器性能剖析
  8. 阶段1 语言基础+高级_1-3-Java语言高级_06-File类与IO流_08 转换流_6_练习_转换文件编码...
  9. vncviewer远程连linux白屏,vncviewer远程链接桌面linux
  10. 现代操作系统(原书第4版)英文中文答案
  11. win10怎么手动修改自己的IP地址
  12. P2P直播软件设计的技术原理和改进
  13. doc 问卷调查模板表_调查问卷模板.doc
  14. 【SSL证书】如何使用 FreeSSL (V2.8.0) 申请免费证书及安装
  15. edge浏览器打开html文件路径被拆分,Win10默认浏览器被强制修改为Edge的两种解决方法...
  16. Ubuntu18.04安装Nvidia显卡驱动教程
  17. 爬虫学习笔记(六)——Scrapy框架(一):安装、运行流程及简单使用
  18. 【微信H5】分享出去是链接,不是卡片的原因及解决方案
  19. 展锐服务器芯片,芯片“代号”背后的故事,你了解多少?
  20. JavaScript实现H5游戏断线自动重连的技术

热门文章

  1. 逗号运算符的简单分析和用法
  2. bzoj1663 [Usaco2006 Open]赶集 (最短路)
  3. 发现一个很好的图片占位工具网。placehold.it
  4. windows 多线程(五) 互斥量(Mutex)
  5. php实现批量导入商品,destoon批量导入产品方法
  6. 全流程重构京东服务市场系统
  7. onenote离线无法同步解决办法
  8. 设计师如何放飞提升自己广告创意
  9. 2023年第三届智能制造与自动化前沿国际会议(CFIMA 2023)
  10. 以太网PHY寄存器分析【转】