如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达, 超神之路从此展开, BTAJ不再是梦想!

概念

Channel 是java nio的一个基本构造。

​ 它代表一个到实体(如一个硬件设备,一个文件、一个网络套接字或者一个能够之行一个或者多个不同的I/O操作的程序组件)的开放链接,如读操作和写操作。

可以把Channel 看做是传送(入站)或者传出(出站)数据的载体。可以被打开或者被关闭,链接或者断开连接。

其UML图:

分类

Channel:是对网络Socket的封装,抽象了网络I/O的读、写、连接与绑定。

AbstractChannel:实现了Channel接口的大部分功能,一次连接用到的Channel、ChannelId、eventLoop、pipelIne、unsafe都会保存在这里。

AbstractNioChannel:通过select的方式对读写事件进行监听。

客户端Channel:主要注册read与write事件,关注于具体数据的读写。

服务端Channel:主要注册accept事件,关注于具体连接的接入,这也是与客户端Channel的read事件最主要的区别。

所有方法,如图:

Channel重要方法和参数

eventLoop: 返回分配给Channel 的EventLoop
pipeline: 返回分配给Channel 的ChannelPipeline
isActive: 如果Channel 是活动的,则返回true。活动的意义可能依赖于底层的传输。例如,一个Socket 传输一旦连接到了远程节点便是活动的,而一个Datagram 传输一旦被打开便是活动的。
localAddress: 返回本地的SokcetAddress
remoteAddress: 返回远程的SocketAddress
write: 将数据写到远程节点。这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷
flush: 将之前已写的数据冲刷到底层传输,如一个Socket
writeAndFlush: 一个简便的方法,等同于调用write()并接着调用flush()

生命周期状态

ChannelUnregistered :Channel 已经被创建,但还未注册到EventLoop
ChannelRegistered :Channel 已经被注册到了EventLoop
ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive :Channel 没有连接到远程节点
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。

状态的切换:

相关源码

1. AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {// 父 Channel(NioServerSocketChannel 是没有父channel的)private final Channel parent;// Channel 唯一IDprivate final ChannelId id;// Unsafe 对象,封装 ByteBuf 的读写操作private final Unsafe unsafe;// 关联的 Pipeline 对象private final DefaultChannelPipeline pipeline;private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);private final CloseFuture closeFuture = new CloseFuture(this);// 本地地址private volatile SocketAddress localAddress;//远端地址private volatile SocketAddress remoteAddress;// EventLoop 封装的 Selector,channel所注册的eventLoopprivate volatile EventLoop eventLoop;// 是否注册private volatile boolean registered;private boolean closeInitiated;/** Cache for the string representation of this channel */private boolean strValActive;private String strVal;

构造函数

protected AbstractChannel(Channel parent, ChannelId id) {this.parent = parent;this.id = id;unsafe = newUnsafe();pipeline = newChannelPipeline();
}// Unsafe 实现交给子类实现
protected abstract AbstractUnsafe newUnsafe();// 创建 DefaultChannelPipeline 对象
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}

说明

1.Unsafe类里实现了具体的连接与写数据。比如:网络的读,写,链路关闭,发起连接等。之所以命名为unsafe是不希望外部使用,并非是不安全的。

2.DefaultChannelPipeline 只是一个 Handler 的容器,也可以理解为一个Handler链,具体的逻辑由Handler处理,而每个Handler都会分配一个EventLoop,最终的请求还是要EventLoop来执行,而EventLoop中又调用Channel中的内部类Unsafe对应的方法。
新建一个channel会自动创建一个ChannelPipeline。

3.这里创建 DefaultChannelPipeline,构造中传入当前的 Channel,而读写数据都是在 ChannelPipeline 中进行的,ChannelPipeline 进行读写数据又委托给 Channel 中的 Unsafe 进行操作。

2.AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {// 抽象了 SocketChannel 和 ServerSocketChannel 的公共的父类//Socketchannle和ServerSocketChannel的公共操作类,用来设置SelectableChannel相关参数和IO操作private final SelectableChannel ch;// SelectionKey.OP_READ 读事件protected final int readInterestOp;// 注册到 selector 上返回的 selectorKeyvolatile SelectionKey selectionKey;// 是否还有未读的数据boolean readPending;private final Runnable clearReadPendingRunnable = new Runnable() {@Overridepublic void run() {clearReadPending0();}};/*** The future of the current connection attempt.  If not null, subsequent* connection attempts will fail.*/// 连接操作的结果private ChannelPromise connectPromise;// 连接超时定时任务private ScheduledFuture<?> connectTimeoutFuture;// 客户端地址private SocketAddress requestedRemoteAddress;....

核心方法:

  //核心操作,注册操作//1) 如果当前注册返回的selectionKey已经被取消,则抛出CancelledKeyException异常,捕获该异常进行处理。
//2) 如果是第一次处理该异常,调用多路复用器的selectNow()方法将已经取消的selectionKey从多路复用器中删除掉。操作成功之后,将selected置为true, 说明之前失效的selectionKey已经被删除掉。继续发起下一次注册操作,如果成功则退出,
//3) 如果仍然发生CancelledKeyException异常,说明我们无法删除已经被取消的selectionKey,按照JDK的API说明,这种意外不应该发生。如果发生这种问题,则说明可能NIO的相关类库存在不可恢复的BUG,直接抛出CancelledKeyException异常到上层进行统一处理。protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}}

SelectionKey 常量值

public abstract class SelectionKey {public static final int OP_READ = 1 << 0;   //读操作位public static final int OP_WRITE = 1 << 2;  //写操作位public static final int OP_CONNECT = 1 << 3;  //客户端连接操作位public static final int OP_ACCEPT = 1 << 4;  //服务端接受连接操作位//如果注册的操作位为0表示只是完成注册功能,说明对任何事件都不感兴趣

doBeginRead() 读之前的准备

  @Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;   //key无效的话直接返回}readPending = true;  //表示读pending中final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {  //表示当前没有读操作位selectionKey.interestOps(interestOps | readInterestOp);  //设置读操作位}}//SelectionKey中定义的是否可读操作public final boolean isReadable() {return (readyOps() & OP_READ) != 0;}

3.AbstractNioByteChannel

doWrite操作

配置中设置循环次数是避免半包中数据量过大,IO线程一直尝试写操作,此时IO线程无法处理其他IO操作或者定时任务,比如新的消息或者定时任务,如果网络IO慢或者对方读取慢等造成IO线程假死的状态.

    @Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {int writeSpinCount = -1; //        写自选次数boolean setOpWrite = false;  //写操作位为0for (;;) {Object msg = in.current();if (msg == null) {  //从环形数组ChannelOutboundBuffer弹出一条消息,如果为null,表示消息已经发送完成,// Wrote all messages.clearOpWrite();  //清除写标志位,退出循环// Directly return here so incompleteWrite(...) is not called.return;}if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;int readableBytes = buf.readableBytes();if (readableBytes == 0) { //如果可读字节为0,则丢弃该消息,循环处理其他消息in.remove();continue;}boolean done = false;    //消息是否全部发送完毕表示long flushedAmount = 0;  //发送的字节数量if (writeSpinCount == -1) {//如果为-1的时候从配置中获取写循环次数writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {int localFlushedAmount = doWriteBytes(buf);  //由子类实现写if (localFlushedAmount == 0) {  //这里表示本次发送字节为0,发送TCP缓冲区满了,所以此时为了避免空循环一直发送,这里就将半包写表示设置为true并退出循环setOpWrite = true;break;}//发送成功就对发送的字节计数flushedAmount += localFlushedAmount;if (!buf.isReadable()) { //如果没有可读字节,表示已经发送完毕done = true; //表示发送完成,并退出循环break;}}//通知promise当前写的进度in.progress(flushedAmount); if (done) {  //如果发送完成,移除缓冲的数据in.remove();} else {如果没有完成会调用incompleteWrite方法// Break the loop and so incompleteWrite(...) is called.break;}} else if (msg instanceof FileRegion) {  //这个是文件传输和上面类似FileRegion region = (FileRegion) msg;boolean done = region.transferred() >= region.count();if (!done) {long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i--) {long localFlushedAmount = doWriteFileRegion(region);if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (region.transferred() >= region.count()) {done = true;break;}}in.progress(flushedAmount);}if (done) {in.remove();} else {// Break the loop and so incompleteWrite(...) is called.break;}} else {// Should not reach here.throw new Error();}}//如果没有完成写看看需要做的事情incompleteWrite(setOpWrite);}//未完成写操作,看看操作protected final void incompleteWrite(boolean setOpWrite) {// Did not write completely.if (setOpWrite) {  //如果当前的写操作位true,那么当前多路复用器继续轮询处理setOpWrite();} else {  //否则重新新建一个task任务,让eventLoop后面点执行flush操作,这样其他任务才能够执行// Schedule flush again later so other tasks can be picked up in the meantimeRunnable flushTask = this.flushTask;if (flushTask == null) {flushTask = this.flushTask = new Runnable() {@Overridepublic void run() {flush();}};}eventLoop().execute(flushTask);}}

4.AbstractNioMessageChannel

doWrite操作

    @Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {final SelectionKey key = selectionKey();final int interestOps = key.interestOps();for (;;) {Object msg = in.current();if (msg == null) {// Wrote all messages.if ((interestOps & SelectionKey.OP_WRITE) != 0) {key.interestOps(interestOps & ~SelectionKey.OP_WRITE);}break;}try {boolean done = false;for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {if (doWriteMessage(msg, in)) {done = true;break;}}if (done) {in.remove();} else {// Did not write all messages.if ((interestOps & SelectionKey.OP_WRITE) == 0) {key.interestOps(interestOps | SelectionKey.OP_WRITE);}break;}} catch (Exception e) {if (continueOnWriteError()) {in.remove(e);} else {throw e;}}}}

AbstractNioMessageChannel 和AbstractNioByteChannel的消息发送实现比较相似,

不同之处在于:一个发送的是ByteBuf或者FileRegion,它们可以直接被发送;另一个发送的则是POJO对象。

如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达, 超神之路从此展开, BTAJ不再是梦想!

详尽Netty(三):Channel相关推荐

  1. netty自定义channel id

    netty自定义channel id.netty custom channel id 搞搞netty时发现默认的id很长,无法直接自定义. 于是我网上搜索了search一下,发现没有相关文章,那就自己 ...

  2. (三)Netty之Channel通道

    通道(Channel)基本介绍 NIO 的通道类似于流,但有些区别: 通道可以同时进行读写,而流只能读或者只能写 通道可以实现异步读写数据 通道可以从缓冲读数据,也可以写数据到缓冲 BIO中的stre ...

  3. Netty之Channel源代码分析

    由于Netty只是一个接口,没有实现,所以这些接口的作用也只能从注释上一探究竟,具体的用法需要在实现类中研究 1.api 2.各个api的作用 2.API功能说明 - 1)Channel read() ...

  4. Netty(三) 什么是 TCP 拆、粘包?如何解决?

    前言 记得前段时间我们生产上的一个网关出现了故障. 这个网关逻辑非常简单,就是接收客户端的请求然后解析报文最后发送短信. 但这个请求并不是常见的 HTTP ,而是利用 Netty 自定义的协议. 有个 ...

  5. Netty教程-Channel

    Channel是顶层接口,继承了AttributeMap, ChannelOutboundInvoker, ChannelPropertyAccess, Comparable,它作为一个具体IO能力的 ...

  6. netty的channel配置参数详解

    参考开源项目常用配置:http://www.programcreek.com/java-api-examples/index.php?api=io.netty.channel.ChannelOptio ...

  7. netty的channel介绍

    Channel的基本方法 id():返回此通道的全局唯一标识符. isActive():如果通道处于活动状态并连接,则返回true. isOpen():如果通道打开并且可能稍后激活,则返回true. ...

  8. NIO 学习(三) channel(主要介绍channel----FileChannel详解--通道间的信息传输)

    Channel实现 Java NIO中最重要的通道的实现: FileChannel :从文件中读写数据 DatagramChannel :通过UDP读写网络中的数据 SocketChannel  :通 ...

  9. Netty之Channel的继承关系

    我们学习的顺序将是从下往上进行!

最新文章

  1. 人工智能新手入门学习路线!附学习资源合集
  2. 移动端应用类型及特点
  3. java url特殊字符处理_简单实例处理url特殊符号处理(2种方法)
  4. vue项目(webpack+mintui),使用hbuilder打包app - 小小人儿大大梦想 - 博客园
  5. top,parent,opener,iframe
  6. Python3中queue模块的使用
  7. Matlab之矩阵行列式、秩、迹的求解
  8. LSH︱python实现局部敏感哈希——LSHash(二)
  9. docker 厂商 容器_我终于不用再解释Docker了!
  10. 3月2日 雾霾图像清晰化处理,第1人称相机世界的坐标系,焦距、焦点、调焦、超焦距、视场角、滑动变焦Dolly zooming,相机内参
  11. idea2017+kemulator搭建J2ME开发环境
  12. /proc/sys/vm/drop_caches
  13. 相机视场角和焦距_镜头焦距和视场角介绍!
  14. 【毕业设计源码】基于JAVA的微信小程序直播在线教育平台
  15. 底部菜单控件BottomNavigationView的使用
  16. 百度快照劫持是什么意思?如何解决百度快照被劫持、百度劫持
  17. 一:log4j2配置文档
  18. 数据链路层-1 什么是数据链路层和工作原理
  19. uefi +gpt 系统安装 和 传统legacy + mbr 的区别
  20. 代尔夫特理工大学计算机科学排名,2019-2020代尔夫特理工大学世界排名多少【QS最新第50名】...

热门文章

  1. TUSB3410驱动
  2. python dtype=float 是32还是64_【Python】numpy数组用dtype=float16初始化的坑
  3. 红队渗透靶场之W1R3S靶场(超详细!)
  4. InstallShield 2015 打包vs2015编辑的.net程序 生成setup单文件
  5. 计算机科学计算百度云,科学计算
  6. 当ChatGPT遇到网络安全!如何入门网络安全?
  7. Whole Test Suit Generation
  8. 黑暗之光三部曲 黑暗之光三部曲小说
  9. Java 格林威治时间字符串格式化
  10. 异星探险家自建服务器,异星探险家怎么开服务器 服务器创造详解