ByteBuf缓冲区

Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。

ByteBuf与Java NIO的byteBuffer的对比

ByteBuf的优势

  • Pooling (池化,这点减少了内存复制和GC,提升了效率)
  • 复合缓冲区类型,支持零复制
  • 不需要调用flip()方法去切换读/写模式
  • 扩展性好,例如StringBuffer· 可以自定义缓冲区类型
  • 读取和写入索引分开
  • 方法的链式调用
  • 可以进行引用计数,方便重复使用

ByteBuf的逻辑部分

ByteBuf是一个字节容器,内部是一个字节数组。从逻辑上来分,字节容器内部可以分为四个部分,具体如图6-14所示。


第一个部分是已用字节,表示已经使用完的废弃的无效字节;

第二部分是可读字节,这部分数据是ByteBuf保存的有效数据,从ByteBuf中读取的数据都来自这一部分;

第三部分是可写字节,写入到ByteBuf的数据都会写到这一部分中;

第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。

ByteBuf的重要属性

ByteBuf通过三个整型的属性有效地区分可读数据和可写数据,使得读写之间相互没有冲突。这三个属性定义在AbstractByteBuf抽象类中,分别是

  • readerIndex(读指针)

  • writerIndex(写指针)

  • maxCapacity(最大容量)

ByteBuf的这三个重要属性,如图6-15所示。

这三个属性的详细介绍如下:

  • readerIndex(读指针):指示读取的起始位置。每读取一个字节,readerIndex自动增加1。一旦readerIndex与writerIndex相等,则表示ByteBuf不可读了。
  • writerIndex(写指针):指示写入的起始位置。每写一个字节,writerIndex自动增加1。一旦增加到writerIndex与capacity()容量相等,则表示ByteBuf已经不可写了。capacity()是一个成员方法,不是一个成员属性,它表示ByteBuf中可以写入的容量。注意,它不是最大容量maxCapacity。
  • maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity的值来设定,超过maxCapacity就会报错。

ByteBuf的三组方法

ByteBuf的方法大致可以分为三组。

第一组:容量系列

  • capacity():表示ByteBuf的容量,它的值是以下三部分之和:废弃的字节数、可读字节数和可写字节数。
  • maxCapacity():表示ByteBuf最大能够容纳的最大字节数。当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到maxCapacity设定的上限。

第二组:写入系列

  • isWritable() :表示ByteBuf是否可写。如果capacity()容量大于writerIndex指针的位置,则表示可写,否则为不可写。注意:如果isWritable()返回false,并不代表不能再往ByteBuf中写数据了。如果Netty发现往ByteBuf中写数据写不进去的话,会自动扩容ByteBuf。
  • writableBytes() :取得可写入的字节数,它的值等于容量capacity()减去writerIndex。
  • maxWritableBytes() :取得最大的可写字节数,它的值等于最大容量maxCapacity减去writerIndex。
  • writeBytes(byte[] src) :把src字节数组中的数据全部写到ByteBuf。这是最为常用的一个方法。
  • writeTYPE(TYPE value):写入基础数据类型的数据。TYPE表示基础数据类型,包含了8大基础数据类型。具体如下:writeByte()、 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble()。
  • setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值,包含了8大基础数据类型的设置。具体如下:setByte()、 setBoolean()、setChar()、setShort()、setInt()、setLong()、setFloat()、setDouble()。setType系列与writeTYPE系列的不同:setType系列不改变写指针writerIndex的值;writeTYPE系列会改变写指针writerIndex的值。
  • markWriterIndex()与resetWriterIndex():这两个方法一起介绍。前一个方法表示把当前的写指针writerIndex属性的值保存在markedWriterIndex属性中;后一个方法表示把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中。markedWriterIndex属性相当于一个暂存属性,也定义在AbstractByteBuf抽象基类中。

第三组:读取系列

  • isReadable( ) :返回ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的值,则表示可读,否则为不可读。

  • readableBytes( ) :返回表示ByteBuf当前可读取的字节数,它的值等于writerIndex减去readerIndex。

  • readBytes(byte[] dst):读取ByteBuf中的数据。将数据从ByteBuf读取到dst字节数组中,这里dst字节数组的大小,通常等于readableBytes()。这个方法也是最为常用的一个方法之一。

  • readType():读取基础数据类型,可以读取8大基础数据类型。具体如下:readByte()、readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()。

  • getTYPE(TYPE value):读取基础数据类型,并且不改变指针值。具体如下:getByte()、 getBoolean()、getChar()、getShort()、getInt()、getLong()、getFloat()、getDouble()。getType系列与readTYPE系列的不同:getType系列不会改变读指针readerIndex的值;readTYPE系列会改变读指针readerIndex的值。

  • markReaderIndex( )与resetReaderIndex( ) :这两个方法一起介绍。前一个方法表示把当前的读指针ReaderIndex保存在markedReaderIndex属性中。后一个方法表示把保存在markedReaderIndex属性的值恢复到读指针ReaderIndex中。markedReaderIndex属性定义在AbstractByteBuf抽象基类中。

ByteBuf基本使用的实践案例

ByteBuf的基本使用分为三部分:

(1)分配一个ByteBuf实例;

(2)向ByteBuf写数据;

(3)从ByteBuf读数据。

这里用了默认的分配器,分配了一个初始容量为9,最大限制为100个字节的缓冲区。关于ByteBuf实例的分配,稍候具体详细介绍。

实战代码很简单,具体如下:

        package com.crazymakercircle.netty.bytebuf;//....public class WriteReadTest {@Testpublic void testWriteRead() {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);print("动作:分配ByteBuf(9, 100)", buffer);buffer.writeBytes(new byte[]{1, 2, 3, 4});print("动作:写入4个字节 (1,2,3,4)", buffer);Logger.info("start==========:get==========");getByteBuf(buffer);print("动作:取数据ByteBuf", buffer);Logger.info("start==========:read==========");readByteBuf(buffer);print("动作:读完ByteBuf", buffer);}//取字节private void readByteBuf(ByteBuf buffer) {while (buffer.isReadable()) {Logger.info("取一个字节:" + buffer.readByte());}}//读字节,不改变指针private void getByteBuf(ByteBuf buffer) {for (int i = 0; i<buffer.readableBytes(); i++) {Logger.info("读一个字节:" + buffer.getByte(i));}}}

运行的结果,节选如下:

        //...[main|PrintAttribute:print]:after ===========动作:分配ByteBuf(9,100)============[main|PrintAttribute:print]:1.0 isReadable(): false[main|PrintAttribute:print]:1.1 readerIndex(): 0[main|PrintAttribute:print]:1.2 readableBytes(): 0[main|PrintAttribute:print]:2.0 isWritable(): true[main|PrintAttribute:print]:2.1 writerIndex(): 0[main|PrintAttribute:print]:2.2 writableBytes(): 9[main|PrintAttribute:print]:3.0 capacity(): 9[main|PrintAttribute:print]:3.1 maxCapacity(): 100[main|PrintAttribute:print]:3.2 maxWritableBytes(): 100//...[main|PrintAttribute:print]:after ===========动作:写入4个字节(1,2,3,4)===========[main|PrintAttribute:print]:1.0 isReadable(): true[main|PrintAttribute:print]:1.1 readerIndex(): 0[main|PrintAttribute:print]:1.2 readableBytes(): 4[main|PrintAttribute:print]:2.0 isWritable(): true[main|PrintAttribute:print]:2.1 writerIndex(): 4[main|PrintAttribute:print]:2.2 writableBytes(): 5[main|PrintAttribute:print]:3.0 capacity(): 9[main|PrintAttribute:print]:3.1 maxCapacity(): 100[main|PrintAttribute:print]:3.2 maxWritableBytes(): 96//...[main|PrintAttribute:print]:after ===========动作:取数据ByteBuf============[main|PrintAttribute:print]:1.0 isReadable(): true[main|PrintAttribute:print]:1.1 readerIndex(): 0[main|PrintAttribute:print]:1.2 readableBytes(): 4[main|PrintAttribute:print]:2.0 isWritable(): true[main|PrintAttribute:print]:2.1 writerIndex(): 4[main|PrintAttribute:print]:2.2 writableBytes(): 5[main|PrintAttribute:print]:3.0 capacity(): 9[main|PrintAttribute:print]:3.1 maxCapacity(): 100[main|PrintAttribute:print]:3.2 maxWritableBytes(): 96//...[main|PrintAttribute:print]:after ===========动作:读完ByteBuf============[main|PrintAttribute:print]:1.0 isReadable(): false[main|PrintAttribute:print]:1.1 readerIndex(): 4[main|PrintAttribute:print]:1.2 readableBytes(): 0[main|PrintAttribute:print]:2.0 isWritable(): true[main|PrintAttribute:print]:2.1 writerIndex(): 4[main|PrintAttribute:print]:2.2 writableBytes(): 5[main|PrintAttribute:print]:3.0 capacity(): 9[main|PrintAttribute:print]:3.1 maxCapacity(): 100[main|PrintAttribute:print]:3.2 maxWritableBytes(): 96

可以看到,使用get取数据是不会影响ByteBuf的指针属性值的。由于篇幅原因,这里不仅省略了很多的输出结果,还省略了print方法的源代码,它的作用是打印ByteBuf的属性值。建议打开源代码工程,查看和运行本案例的代码。

ByteBuf的引用计数

Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用“计数器”(一种GC算法)来标记对象是否“不可达”进而收回(注:GC是Garbage Collection的缩写,即Java中的垃圾回收机制), Netty也使用了这种手段来对ByteBuf的引用进行计数。Netty采用“计数器”来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地“发现”那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。

插个题外话:什么是Pooled(池化)的ByteBuf缓冲区呢?在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。大家都知道,频繁创建对象、内存分配、释放内存,系统的开销大、性能低,如何提升性能、提高Buffer实例的使用率呢?从Netty4版本开始,新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓存池中;当需要时,则重新从对象缓存池中取出,而不需要重新创建。

回到正题。引用计数的大致规则如下:在默认情况下,当创建完一个ByteBuf时,它的引用为1;每次调用retain()方法,它的引用就加1;每次调用release()方法,就是将引用计数减1;如果引用为0,再次访问这个ByteBuf对象,将会抛出异常;如果引用为0,表示这个ByteBuf没有哪个进程引用它,它占用的内存需要回收。在下面的例子中,多次用到了retain()和release()方法,运行后可以看效果:

        package com.crazymakercircle.netty.bytebuf;//....public class ReferenceTest {@Testpublic  voidtestRef(){ByteBufbuffer  =ByteBufAllocator.DEFAULT.buffer();Logger.info("after create:"+buffer.refCnt());buffer.retain();Logger.info("after retain:"+buffer.refCnt());buffer.release();Logger.info("after release:"+buffer.refCnt());buffer.release();Logger.info("after release:"+buffer.refCnt());//错误:refCnt: 0,不能再retainbuffer.retain();Logger.info("after retain:"+buffer.refCnt());}}

运行后我们会发现:最后一次retain方法抛出了IllegalReferenceCountException异常。原因是:在此之前,缓冲区buffer的引用计数已经为0,不能再retain了。也就是说:在Netty中,引用计数为0的缓冲区不能再继续使用。为了确保引用计数不会混乱,在Netty的业务处理器开发过程中,应该坚持一个原则:retain和release方法应该结对使用。简单地说,在一个方法中,调用了retain,就应该调用一次release。

        public void handlMethodA(ByteBufbyteBuf) {byteBuf.retain();try {handlMethodB(byteBuf);} finally {byteBuf.release();}}

如果retain和release这两个方法,一次都不调用呢?则在缓冲区使用完成后,调用一次release,就是释放一次。例如在Netty流水线上,中间所有的Handler业务处理器处理完ByteBuf之后直接传递给下一个,由最后一个Handler负责调用release来释放缓冲区的内存空间。当引用计数已经为0, Netty会进行ByteBuf的回收。分为两种情况:(1)Pooled池化的ByteBuf内存,回收方法是:放入可以重新分配的ByteBuf池子,等待下一次分配。(2)Unpooled未池化的ByteBuf缓冲区,回收分为两种情况:如果是堆(Heap)结构缓冲,会被JVM的垃圾回收机制回收;如果是Direct类型,调用本地方法释放外部内存(unsafe.freeMemory)。

ByteBuf的Allocator分配器

Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了ByteBufAllocator的两种实现:PoolByteBufAllocatorUnpooledByteBufAllocatorPoolByteBufAllocator(池化ByteBuf分配器)将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小;这个池化分配器采用了jemalloc高效内存分配的策略,该策略被好几种现代操作系统所采用。UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,它没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例;通过Java的垃圾回收机制回收。

为了验证两者的性能,大家可以做一下对比试验:

(1)使用UnpooledByteBufAllocator的方式分配ByteBuf缓冲区,开启10000个长连接,每秒所有的连接发一条消息,再看看服务器的内存使用量的情况。实验的参考结果:在短时间内,可以看到占到10GB多的内存空间,但随着系统的运行,内存空间不断增长,直到整个系统内存被占满而导致内存溢出,最终系统宕机。

(2)把UnpooledByteBufAllocator换成PooledByteBufAllocator,再进行试验,看看服务器的内存使用量的情况。实验的参考结果:内存使用量基本能维持在一个连接占用1MB左右的内存空间,内存使用量保持在10GB左右,经过长时间的运行测试,我们会发现内存使用量都能维持在这个数量附近,系统不会因为内存被耗尽而崩溃。

在Netty中,默认的分配器为ByteBufAllocator.DEFAULT,可以通过Java系统参数(System Property)的选项io.netty.allocator.type进行配置,配置时使用字符串值:“unpooled”, “pooled”。

不同的Netty版本,对于分配器的默认使用策略是不一样的。在Netty 4.0版本中,默认的分配器为UnpooledByteBufAllocator。而在Netty 4.1版本中,默认的分配器为PooledByteBufAllocator。现在PooledByteBufAllocator已经广泛使用了一段时间,并且有了增强的缓冲区泄漏追踪机制。因此,可以在Netty程序中设置启动器Bootstrap的时候,将PooledByteBufAllocator设置为默认的分配器。

        ServerBootstrap b = new ServerBootstrap()//....//4 设置通道的参数b.option(ChannelOption.SO_KEEPALIVE, true);b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);//....

内存管理的策略可以灵活调整,这是使用Netty所带来的又一个好处。只需一行简单的配置,就能获得到池化缓冲区带来的好处。在底层,Netty为我们干了所有“脏活、累活”!这主要是因为Netty用到了Java的Jemalloc内存管理库。

使用分配器分配ByteBuf的方法有多种。下面列出主要的几种:

        package com.crazymakercircle.netty.bytebuf;//...public class AllocatorTest {@Testpublic void showAlloc() {ByteBuf buffer = null;//方法一:分配器默认分配初始容量为9,最大容量100的缓冲区buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);//方法二:分配器默认分配初始容量为256,最大容量Integer.MAX_VALUE的缓冲区buffer = ByteBufAllocator.DEFAULT.buffer();//方法三:非池化分配器,分配基于Java的堆(Heap)结构内存缓冲区buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();//方法四:池化分配器,分配基于操作系统管理的直接内存缓冲区buffer = PooledByteBufAllocator.DEFAULT.directBuffer();//…..其他方法}}

如果没有特别的要求,使用第一种或者第二种分配方法分配缓冲区即可。

ByteBuf缓冲区的类型

介绍完了分配器的类型,再来说一下缓冲区的类型,如表6-2所示。根据内存的管理方不同,分为堆缓存区和直接缓存区,也就是Heap ByteBuf和Direct ByteBuf。另外,为了方便缓冲区进行组合,提供了一种组合缓存区。

上面三种缓冲区的类型,无论哪一种,都可以通过池化(Pooled)、非池化(Unpooled)两种分配器来创建和分配内存空间。

下面对Direct Memory(直接内存)进行一下特别的介绍:

  • Direct Memory不属于Java堆内存,所分配的内存其实是调用操作系统malloc()函数来获得的;由Netty的本地内存堆Native堆进行管理。
  • Direct Memory容量可通过-XX:MaxDirectMemorySize来指定,如果不指定,则默认与Java堆的最大值(-Xmx指定)一样。注意:并不是强制要求,有的JVM默认Direct Memory与-Xmx无直接关系。
  • Direct Memory的使用避免了Java堆和Native堆之间来回复制数据。在某些应用场景中提高了性能。
  • 在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer(直接缓冲区)的代价比较高昂,因此不宜使用Direct Buffer。也就是说,Direct Buffer尽量在池化分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下,就可以大幅度改善性能。
  • 对Direct Buffer的读写比Heap Buffer快,但是它的创建和销毁比普通Heap Buffer慢。
  • 在Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的Direct Buffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机(JVM)带来压力。注意一下垃圾回收的应用场景:(1)垃圾回收仅在Java堆被填满,以至于无法为新的堆分配请求提供服务时发生;(2)在Java应用程序中调用System.gc()函数来释放内存。

三类ByteBuf使用的实践案例

首先对比介绍一下,Heap ByteBuf和Direct ByteBuf两类缓冲区的使用。它们有以下几点不同:

  • 创建的方法不同:Heap ByteBuf通过调用分配器的buffer()方法来创建;而Direct ByteBuf的创建,是通过调用分配器的directBuffer()方法。
  • Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组;而Direct ByteBuf缓冲区不能读取内部数组。
  • 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区;如果hasArray()返回值为true,则表示是Heap堆缓冲,否则就不是。
  • Direct ByteBuf要读取缓冲数据进行业务处理,相对比较麻烦,需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存,然后进行其他的计算。

Heap ByteBuf和Direct ByteBuf这两类缓冲区的使用对比,实践案例的代码如下:

        package com.crazymakercircle.netty.bytebuf;//...public class BufferTypeTest {final static Charset UTF_8 = Charset.forName("UTF-8");//堆缓冲区@Testpublic  void testHeapBuffer() {//取得堆内存ByteBuf heapBuf = ByteBufAllocator.DEFAULT.buffer();heapBuf.writeBytes("疯狂创客圈:高性能学习社群".getBytes(UTF_8));if (heapBuf.hasArray()) {//取得内部数组byte[] array = heapBuf.array();int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();int length = heapBuf.readableBytes();Logger.info(new String(array, offset, length, UTF_8));}heapBuf.release();}//直接缓冲区@Testpublic  void testDirectBuffer() {ByteBuf directBuf=  ByteBufAllocator.DEFAULT.directBuffer();directBuf.writeBytes("疯狂创客圈:高性能学习社群".getBytes(UTF_8));if (! directBuf.hasArray()) {int length = directBuf.readableBytes();byte[] array = new byte[length];//把数据读取到堆内存directBuf.getBytes(directBuf.readerIndex(), array);Logger.info(new String(array, UTF_8));}directBuf.release();}}

注意,如果hasArray()返回false,不一定代表缓冲区一定就是Direct ByteBuf直接缓冲区,也有可能是CompositeByteBuf缓冲区。

下面演示一下通过CompositeByteBuf来复用Header,代码如下:

        package com.crazymakercircle.netty.bytebuf;//...public class CompositeBufferTest {static Charset utf8 = Charset.forName("UTF-8");@Testpublic void byteBufComposite() {CompositeByteBufcbuf = ByteBufAllocator.DEFAULT.compositeBuffer();//消息头ByteBufheaderBuf = Unpooled.copiedBuffer("疯狂创客圈:", utf8);//消息体1ByteBufbodyBuf = Unpooled.copiedBuffer("高性能Netty", utf8);cbuf.addComponents(headerBuf, bodyBuf);sendMsg(cbuf);//在refCnt为0前,retainheaderBuf.retain();cbuf.release();cbuf = ByteBufAllocator.DEFAULT.compositeBuffer();//消息体2bodyBuf = Unpooled.copiedBuffer("高性能学习社群", utf8);cbuf.addComponents(headerBuf, bodyBuf);sendMsg(cbuf);cbuf.release();}private void sendMsg(CompositeByteBufcbuf) {//处理整个消息for (ByteBufb :cbuf) {int length = b.readableBytes();byte[] array = new byte[length];//将CompositeByteBuf中的数据复制到数组中b.getBytes(b.readerIndex(), array);//处理一下数组中的数据System.out.print(new String(array, utf8));}System.out.println();}}

在上面的程序中,向CompositeByteBuf对象中增加ByteBuf对象实例,这里调用了addComponents方法。Heap ByteBuf和Direct ByteBuf两种类型都可以增加。如果内部只存在一个实例,则CompositeByteBuf中的hasArray()方法,将返回这个唯一实例的hasArray()方法的值;如果有多个实例,CompositeByteBuf中的hasArray()方法返回false。

调用nioBuffer()方法可以将CompositeByteBuf实例合并成一个新的Java NIO ByteBuffer缓冲区(注意:不是ByteBuf)。演示代码如下:

        package com.crazymakercircle.netty.bytebuf;//...public class CompositeBufferTest {@Testpublic void intCompositeBufComposite() {CompositeByte Bufcbuf = Unpooled.compositeBuffer(3);cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}));cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{4}));cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{5, 6}));//合并成一个的缓冲区ByteBuffer nioBuffer = cbuf.nioBuffer(0, 6);byte[] bytes = nioBuffer.array();System.out.print("bytes = ");for (byte b : bytes) {System.out.print(b);}cbuf.release();}}

在以上代码中,使用到了Netty中一个非常方便的类——Unpooled帮助类,用它来创建和使用非池化的缓冲区。另外,还可以在Netty程序之外独立使用Unpooled帮助类。

ByteBuf的自动释放

查看Netty源代码,我们可以看到,Netty的Reactor反应器线程会在底层的Java NIO通道读数据时,也就是AbstractNioByteChannel.NioByteUnsafe.read()处,调用ByteBufAllocator方法,创建ByteBuf实例,从操作系统缓冲区把数据读取到Bytebuf实例中,然后调用pipeline.fireChannelRead(byteBuf)方法将读取到的数据包送入到入站处理流水线中。再看看入站处理时,入站的ByteBuf是如何自动释放的。

方式一:TailHandler自动释放

Netty默认会在ChannelPipline通道流水线的最后添加一个TailHandler末尾处理器,它实现了默认的处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作。在默认情况下,如果每个InboundHandler入站处理器,把最初的ByteBuf数据包一路往下传,那么TailHandler末尾处理器会自动释放掉入站的ByteBuf实例。

如何让ByteBuf数据包通过流水线一路向后传递呢?如果自定义的InboundHandler入站处理器继承自ChannelInboundHandlerAdapter适配器,那么可以在InboundHandler的入站处理方法中调用基类的入站处理方法,演示代码如下:

        public class DomoHandler extends ChannelInboundHandlerAdapter {/*** 出站处理方法* @param ctx上下文* @param msg  入站数据包* @throws Exception可能抛出的异常*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {ByteBuf byteBuf = (ByteBuf) msg;//...省略ByteBuf的业务处理//自动释放ByteBuf的方法:调用父类的入站方法,将msg向后传递// super.channelRead(ctx, msg);}}

总体来说,如果自定义的InboundHandler入站处理器继承自ChannelInboundHandlerAdapter适配器,那么可以调用以下两种方法来释放ByteBuf内存:(1)手动释放ByteBuf。具体的方式为调用byteBuf.release()。(2)调用父类的入站方法将msg向后传递,依赖后面的处理器释放ByteBuf。具体的方式为调用基类的入站处理方法super.channelRead(ctx, msg)。

        public class DomoHandler extends ChannelInboundHandlerAdapter {/*** 出站处理方法* @param ctx上下文* @param msg  入站数据包* @throws Exception  可能抛出的异常*/@Overridepublic void channelRead(ChannelHandlerContextctx, Object msg) throwsException {ByteBufbyteBuf = (ByteBuf) msg;//...省略ByteBuf的业务处理//释放ByteBuf的两种方法// 方法一:手动释放ByteBufbyteBuf.release();//方法二:调用父类的入站方法,将msg向后传递// super.channelRead(ctx, msg);}}

方式二:SimpleChannelInboundHandler自动释放

如果Handler业务处理器需要截断流水线的处理流程,不将ByteBuf数据包送入后边的InboundHandler入站处理器,这时,流水线末端的TailHandler末尾处理器自动释放缓冲区的工作自然就失效了。在这种场景下,Handler业务处理器有两种选择:

  • 手动释放ByteBuf实例。

  • 继承SimpleChannelInboundHandler,利用它的自动释放功能。

这里,我们聚焦的是第二种选择:看看SimpleChannelInboundHandler是如何自动释放的。

以入站读数据为例,Handler业务处理器必须继承自SimpleChannelInboundHandler基类。并且,业务处理器的代码必须移动到重写的channelRead0(ctx, msg)方法中。SimpleChannelInboundHandle类的channelRead等入站处理方法,会在调用完实际的channelRead0方法后,帮忙释放ByteBuf实例。如果大家好奇,想看看SimpleChannelInboundHandler是如何释放ByteBuf的,那么就一起来看看Netty源代码。截取部分的代码如下所示:

              public abstract class SimpleChannelInboundHandler<I> extendsChannelInboundHandlerAdapter{//基类的入站方法@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {boolean release = true;try {if (acceptInboundMessage(msg)) {@SuppressWarnings("unchecked")I imsg = (I) msg;//调用实际的业务代码,必须由子类继承,并且提供实现channelRead0(ctx, imsg);} else {release = false;ctx.fireChannelRead(msg);}} finally {if (autoRelease&& release) {//释放ByteBufReferenceCountUtil.release(msg);}}}}

在Netty的SimpleChannelInboundHandler类的源代码中,执行完由子类继承的channelRead0()业务处理后,在finally语句代码段中,ByteBuf被释放了一次,如果ByteBuf计数器为零,将被彻底释放掉。再看看出站处理时,Netty是何时释放出站的ByteBuf的呢?出站缓冲区的自动释放方式:HeadHandler自动释放。在出站处理流程中,申请分配到的ByteBuf主要是通过HeadHandler完成自动释放的。出站处理用到的Bytebuf缓冲区,一般是要发送的消息,通常由Handler业务处理器所申请而分配的。例如,在write出站写入通道时,通过调用ctx.writeAndFlush(Bytebufmsg), Bytebuf缓冲区进入出站处理的流水线。在每一个出站Handler业务处理器中的处理完成后,最后数据包(或消息)会来到出站的最后一棒HeadHandler,在数据输出完成后,Bytebuf会被释放一次,如果计数器为零,将被彻底释放掉。

在Netty开发中,必须密切关注Bytebuf缓冲区的释放,如果释放不及时,会造成Netty的内存泄露(Memory Leak),最终导致内存耗尽。

ByteBuf浅层复制的高级使用方式

首先说明一下,浅层复制是一种非常重要的操作。可以很大程度地避免内存复制。这一点对于大规模消息通信来说是非常重要的。ByteBuf的浅层复制分为两种,有切片(slice)浅层复制和整体(duplicate)浅层复制。

slice切片浅层复制

ByteBuf的slice方法可以获取到一个ByteBuf的一个切片。一个ByteBuf可以进行多次的切片浅层复制;多次切片后的ByteBuf对象可以共享一个存储区域。

slice方法有两个重载版本:

(1)public ByteBuf slice()(2)public ByteBuf slice(int index, int length)

第一个是不带参数的slice方法,在内部是调用了第二个带参数的slice方法,调用大致方式为:buf.slice(buf.readerIndex(), buf.readableBytes())。也就是说,第一个无参数slice方法的返回值是ByteBuf实例中可读部分的切片。第二个带参数的slice(int index, int length) 方法,可以通过灵活地设置不同起始位置和长度,来获取到ByteBuf不同区域的切片。

一个简单的slice的使用示例代码如下:

        package com.crazymakercircle.netty.bytebuf;//....public class SliceTest {@Testpublic  voidtestSlice() {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);print("动作:分配ByteBuf(9, 100)", buffer);buffer.writeBytes(new byte[]{1, 2, 3, 4});print("动作:写入4个字节 (1,2,3,4)", buffer);ByteBuf slice = buffer.slice();print("动作:切片slice", slice);}}

在上面代码中,输出了源ByteBuf和调用slice方法后的切片ByteBuf的三组属性值,运行结果如下:

        //…篇幅原因,省略了ByteBuf刚分配后的属性值输出[main|SliceTest:print]:after ===========动作:写入4个字节(1,2,3,4)============[main|SliceTest:print]:1.0 isReadable(): true[main|SliceTest:print]:1.1 readerIndex(): 0[main|SliceTest:print]:1.2 readableBytes(): 4[main|SliceTest:print]:2.0 isWritable(): true[main|SliceTest:print]:2.1 writerIndex(): 4[main|SliceTest:print]:2.2 writableBytes(): 5[main|SliceTest:print]:3.0 capacity(): 9[main|SliceTest:print]:3.1 maxCapacity(): 100[main|SliceTest:print]:3.2 maxWritableBytes(): 96[main|SliceTest:print]:after ===========动作:切片slice============[main|SliceTest:print]:1.0 isReadable(): true[main|SliceTest:print]:1.1 readerIndex(): 0[main|SliceTest:print]:1.2 readableBytes(): 4[main|SliceTest:print]:2.0 isWritable(): false[main|SliceTest:print]:2.1 writerIndex(): 4[main|SliceTest:print]:2.2 writableBytes(): 0[main|SliceTest:print]:3.0 capacity(): 4[main|SliceTest:print]:3.1 maxCapacity(): 4[main|SliceTest:print]:3.2 maxWritableBytes(): 0

调用slice()方法后,返回的切片是一个新的ByteBuf对象,该对象的几个重要属性值,大致如下:

  • readerIndex(读指针)的值为0。

  • writerIndex(写指针)的值为源Bytebuf的readableBytes()可读字节数。·

  • maxCapacity(最大容量)的值为源Bytebuf的readableBytes( )可读字节数。

切片后的新Bytebuf有两个特点:

  • 切片不可以写入,原因是:maxCapacity与writerIndex值相同。
  • 切片和源ByteBuf的可读字节数相同,原因是:切片后的可读字节数为自己的属性writerIndex - readerIndex,也就是源ByteBuf的readableBytes() -0。切片后的新ByteBuf和源ByteBuf的关联性:
  • 切片不会复制源ByteBuf的底层数据,底层数组和源ByteBuf的底层数组是同一个。
  • 切片不会改变源ByteBuf的引用计数。

从根本上说,slice()无参数方法所生成的切片就是源ByteBuf可读部分的浅层复制。

duplicate整体浅层复制

和slice切片不同,duplicate() 返回的是源ByteBuf的整个对象的一个浅层复制,包括如下内容:

  • duplicate的读写指针、最大容量值,与源ByteBuf的读写指针相同。
  • duplicate() 不会改变源ByteBuf的引用计数。
  • duplicate() 不会复制源ByteBuf的底层数据。

duplicate() 和slice() 方法都是浅层复制。不同的是,slice()方法是切取一段的浅层复制,而duplicate( )是整体的浅层复制。

浅层复制的问题

leBytes( )可读字节数。

切片后的新Bytebuf有两个特点:

  • 切片不可以写入,原因是:maxCapacity与writerIndex值相同。
  • 切片和源ByteBuf的可读字节数相同,原因是:切片后的可读字节数为自己的属性writerIndex - readerIndex,也就是源ByteBuf的readableBytes() -0。切片后的新ByteBuf和源ByteBuf的关联性:
  • 切片不会复制源ByteBuf的底层数据,底层数组和源ByteBuf的底层数组是同一个。
  • 切片不会改变源ByteBuf的引用计数。

从根本上说,slice()无参数方法所生成的切片就是源ByteBuf可读部分的浅层复制。

duplicate整体浅层复制

和slice切片不同,duplicate() 返回的是源ByteBuf的整个对象的一个浅层复制,包括如下内容:

  • duplicate的读写指针、最大容量值,与源ByteBuf的读写指针相同。
  • duplicate() 不会改变源ByteBuf的引用计数。
  • duplicate() 不会复制源ByteBuf的底层数据。

duplicate() 和slice() 方法都是浅层复制。不同的是,slice()方法是切取一段的浅层复制,而duplicate( )是整体的浅层复制。

浅层复制的问题

浅层复制方法不会实际去复制数据,也不会改变ByteBuf的引用计数,这就会导致一个问题:在源ByteBuf调用release() 之后,一旦引用计数为零,就变得不能访问了;在这种场景下,源ByteBuf的所有浅层复制实例也不能进行读写了;如果强行对浅层复制实例进行读写,则会报错。因此,在调用浅层复制实例时,可以通过调用一次retain() 方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为2。在浅层复制实例用完后,需要调用两次release()方法,将引用计数减一,这样就不影响源ByteBuf的内存释放。

Netty-bytebuf相关推荐

  1. Netty ByteBuf(图解之 2)| 秒懂

    目录 Netty ByteBuf(图解二):API 图解 源码工程 写在前面 ByteBuf 的四个逻辑部分 ByteBuf 的三个指针 ByteBuf 的三组方法 ByteBuf 的引用计数 Byt ...

  2. 对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解

    根据 Wiki 对 Zero-copy 的定义: "Zero-copy" describes computer operations in which the CPU does n ...

  3. 【Netty4】netty ByteBuf (二) 引用计数对象(reference counted objects)

    原文出处:http://netty.io/wiki/reference-counted-objects.html 相关文章: netty ByteBuf (一)如何创建ByteBuf对象 netty ...

  4. Netty - ByteBuf

    2019独角兽企业重金招聘Python工程师标准>>> 1.ByteBuf类 - Netty的数据容器 ByteBuf维护了两个不同的索引: readerIndex:用于读取 wri ...

  5. Netty——ByteBuf的API

    ByteBuf 正如前面所提到的,网络数据的基本单位总是字节.Java NIO 提供了 ByteBuffer 作为它 的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐. Netty 的 Byte ...

  6. Netty - ByteBuf索引管理

    2019独角兽企业重金招聘Python工程师标准>>> 之前写过一篇文章介绍过JDK里面InputStream的mark(readlimit)和reset()方法,Java IO:使 ...

  7. netty ByteBuf对象池和内存泄漏检测实现走读

    该文已重写,请移步: https://blog.csdn.net/weixin_40318210/article/details/108970208 https://blog.csdn.net/wei ...

  8. Netty中ByteBuf 的零拷贝

    转载:https://www.jianshu.com/p/1d1fa2fe1ed9 此文章已同步发布在我的 segmentfault 专栏. 根据 Wiki 对 Zero-copy 的定义: &quo ...

  9. Netty 学习之旅:ByteBuf 篇之 ByteBuf 内部结构与 API 学习

    1.原生 ByteBuffer.ByteBuf 内部结构设计 首先我们来回顾一下 java.nio.ByteBuffe r的内部结构设计. ByteBuffer 内部持有一个 byte[] bt, 再 ...

  10. Netty(九)——ByteBuf源码之析

    ByteBuf在Netty占据着中重要的位置,上篇<Netty--ByteBuf功能之说>讲了ByteBuf的工作原理和重要功能介绍.这篇从源码的角度来看ByteBuf.首先,来看一下主要 ...

最新文章

  1. Android:ViewPager为页卡内视图组件添加事件
  2. File 类型的文本框,选择文件时响应很慢解决方法
  3. bsp模型适用于图计算_CODE V微信简报:光束合成传播算法(BSP)
  4. SkinMagic使用后按钮加自定义图标或菜单GetMneu返回NULL的解决方法
  5. Java ObjectStreamClass getSerialVersionUID()方法(带示例)
  6. 保存多序列tiff文件_解码TIFF文件
  7. 解决hash冲突的三种方法
  8. 【硬件】详解电流采样电路
  9. 无法启动程序因为计算机中丢失msvcr100,win7系统无法启动程序提示计算机中丢失msvcr100.dll的解决方法...
  10. 萝卜内存对比工具使用说明
  11. 39《黑客与画家 : 硅谷创业之父Paul Graham文集》 -豆瓣评分8.8
  12. Java游戏项目之坦克大战
  13. android获取全部ram和rom,ROM与RAM的那点事,超详细解说
  14. sts工具如何配置mvn,解决settings.xml文件配置问题
  15. 两位数码管秒表c语言,单片机C语言编程实现双数码管可调秒表
  16. 计算机集中控制系统结构上和DCS基本一致,dcs集中控制系统
  17. 租车APP开发的市场优势和主要功能
  18. Free Code Camp 练习总结(五) ——Introduction to the ES6 Challenges ES6基本用法和基础知识
  19. 《你不可不知的人性》刘墉 读书笔记(二)
  20. IOS 如何排除闪退和内存泄漏问题

热门文章

  1. 如何免费在线把Figma转成Sketch
  2. Java的API进制转换
  3. PTA 最大子序列和
  4. Vue 获取URL中的参数
  5. 【微信小程序】理论学习笔记
  6. JS中DOM元素的操作
  7. 万用表可以测量自身的电池电压吗?
  8. 微信服务商商户进件php,关于微信服务商特约商户进件的问题
  9. Ubuntu 18.04 右上角音量图标消失问题解决方法
  10. js面向对象,实现星星(萤火虫)闪