Netty简介:

Netty是JBOSS 基于Java NIO开源的提供异步、事件驱动的网络通信框架,用于开发高性能网络服务器
Zookeeper RocketMq Dubbo ShardingSphere 底层通信都有用到它

服务端代码实例

public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128)          // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;if (args.length > 0) {port = Integer.parseInt(args[0]);}new DiscardServer(port).run();}
}
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

这个是netty官网上的代码,服务端的netty代码基本都是这个模板,在执行bind操作后就可以监听端口接收客户端请求了
这个模板代码里面主要有以下几个步骤

  1. 创建服务端启动引导类 ServerBootstrap
  2. ServerBootstrap 绑定两个EventLoopGroup ,第一个处理网络连接,另外一个处理业务请求(约定处理连接的叫bossGroup ,处理业务的叫workerGroup )
  3. 绑定Channel
  4. 设置workerGroup 的业务逻辑处理Handler
  5. 设置网络相关参数
  6. 执行bind操作,开始监听端口接收客户端请求

netty组件

ServerBootstrap:

服务端引导程序启动类,继承了AbstractBootstrap

内部属性

属性 描述
Map<ChannelOption<?>, Object> childOptions work线程channel相关参数
EventLoopGroup childGroup 实际处理业务的工作者 事件循环线程池
ChannelHandler childHandler 处理添加到工作者事件循环线程的Channel

AbstractBootstrap:

内部属性

属性 描述
EventLoopGroup group 事件循环线程池负责接收外部请求,一般内部只要一个NioEventLoop就可以
ChannelFactory<? extends C> channelFactory 实际处理业务的工作者 事件循环线程池
SocketAddress localAddress Socket地址可通过构造函数或者localAddress方法赋值,这个属性有值可以调用bind方法不用传端口信息
ChannelFactory<? extends C> channelFactory 实现类是ReflectiveChannelFactory ,负责创建Channel NioServerSocketChannel 或者NioSocketChannel ,channel方法传入具体Channel类构建
Map<ChannelOption<?>, Object> 定义了一些网络有关的参数
ChannelHandler handler BossGroup绑定的ChannelHandler处理类

Bootstrap

客户端引导类同样父类继承了AbstractBootstrap,只需要一个事件循环线程池

内部属性

属性 描述
AddressResolverGroup<?> 地址解析器
SocketAddress remoteAddress 服务端远程地址

NioEventLoopGroup

事件循环线程池 Bootstrap内部处理任务的线程池组

类继承体系及作用

类或者接口 作用
Executor 执行器接口 NioEventLoopGroup可以执行execute方法
ExecutorService 扩展执行器接口 定义了shutdown submit invokeAll invokeAny等方法
ScheduledExecutorService 表示NioEventLoopGroup可执行调度任务
Iterable 迭代器接口
EventExecutorGroup 定义了next方法 获取Group内的下一个事件循环线程
AbstractEventExecutorGroup 实现了一些执行submit invokeAll schedule 执行任务的方法 ,next()方法获取下一个事件循环线程实际执行
EventLoopGroup 定义next方法获取Group里面的线程 另外提供了三个register重载方法 将Channel具体绑定到一个 EventLoop
MultithreadEventExecutorGroup 有集合记录了具体的EventLoop 实现了next方法的功能
MultithreadEventLoopGroup 实现了Channel注册功能
NioEventLoopGroup newChild 方法创建一个NioEventLoop

NioEventLoop

类继承体系及作用

类或者接口 作用
EventExecutorGroup 接口 : 提供了获取下一个EventExecutor的方法 EventExecutor迭代器的方法
EventExecutor 接口 起到标记作用 定义了inEventLoop方法
AbstractEventExecutor 实现了submit提交任务方法 交由父类AbstractExecutorService 执行
AbstractExecutorService JDK并发包里面的类 实现了 submit invokeAny 提交,执行任务的方法, 包装Runnable Callable 接口成FutureTask 的newTaskFo方法
AbstractScheduledEventExecutor 实现了ScheduledExecutorService的方法 内部有一个ScheduledFutureTask 的优先级队列 提供一些获取 ScheduledFutureTask 的方法给子类调用
SingleThreadEventExecutor 提供抽象方法run 调用NioEventLoop Run方法 ,实现了线程池的大部分方法
SingleThreadEventLoop register 方法将Channel绑定到一个事件循环线程
NioEventLoop 有Nio 的Selector选择器类,run方法筛选注册在Selector上的Channel

NioServerSocketChannel

服务端Channel

类继承体系及作用

类或接口 作用
AbstractChannel 比较顶层的实现,这个类定义了DefaultChannelPipeline Unsafe对象 没有涉及到Java NIO方面的概念
AbstractNioChannel 这里开始关联到jJava NIO方面的概念 有提供方法往事件循环线程池对应Selector注册SelectionKey
AbstractNioMessageChannel 实现了读方法,接收到客户端请求后交给workGroup执行

NioSocketChannel

DefaultChannelPipeline

AbstractChannel (NioSocketChannel NioServerSocketChannel 都继承了这个类) 类有一个 DefaultChannelPipeline 属性
DefaultChannelPipeline 有两个 AbstractChannelHandlerContext属性,分别是hean tail,看命名就知道 AbstractChannelHandlerContext的数据能组成一个链表结构(也就是Netty中的Channel内部有一个链表来处理请求)

HeadContext

TailContext

可以看到 HeadContext 比TailContext多继承了一个ChannelOutboundHandler接口(提供write flush方法),所以向网络的另外一端写出数据最终是通过HeadContext

服务端bind过程

bind之后可以监听端口,我们来看看具体过程,代码入口在AbstractBootstrap doBind方法

 private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}

第2-3行通过initAndRegister方法创建一个Channel (这里的Channel 模板方法channel传进来的对象,服务端启动传的是NioServerSocketChannel)跟ChannelFuture ,ChannelFuture 实现了JDK里面的Future接口,这个是在后面Channel注册到事件循环线程池结束后来回调的

我们接着看doBind 后面的代码第4到6 ,7到12行代码分别判断这个异步Future是否异常 或者完成

在netty服务端启动流程中,正常情况下都是走到13行的else分支代码的,这里创建了一个PendingRegistrationPromise对象,并在前面返回的Future对象上添加一个监听,这个监听在什么时候执行呢,就是Channel注册到事件循环线程池的逻辑完成后(无论成功注册或者异常都会回调),当然注册成功之后会执行doBind0方法

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

这个doBind0会提交一个任务到Channel绑定的事件循环线程池等待执行(如果线程池没启动的话会先启动),然后我们具体看看这个后面要被线程池执行的任务具体做了什么,调用Channel的bind方法,这里的Channel 还是Netty层面的channel,bind操作现在Channel DefaultChannelPipeline上传递,从TailContext节点开始,HeadContext 通过Unsafe bind方法最终调用到Channel的doBind方法
NioServerSocketChannel里面这个方法的实现是

// NioServerSocketChannelprotected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
// AbstractNioChannel
protected SelectableChannel javaChannel() {return ch;
}

代码执行到这就可以监听端口接收客户端请求
前面说到,bind方法是在Channel注册完成后才会调用,我们再回头看看Channel怎么注册的

服务端Channel注册流程

创建跟注册Channel的代码是AbstractBootstrap initAndRegister

  final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}    return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}   ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}
  1. 第4行创建了一个Channel
  2. 第5行子类ServerBootstrap实现,在Channel的DefaultChannelPromise处理链表上增加了一个Handler节点
  p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

ServerBootstrapAcceptor 处理读事件的方法channelRead,将接收到的消息(这个消息可以转成Channel 实际是accept接收客户端请求后包装的Channel)注册到childGroup,也就实现了将接收到的请求提交到workGroup执行,具体在哪里传递消息过来的,后面会提到

  1. 将channel注册到BossGroup
 ChannelFuture regFuture = config().group().register(channel);

是的,这里的group就是BossGroup,专门负责处理网络请求的

  // SingleThreadEventLooppublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}

netty 里面的这些 register bind write最后都是通过Channel内部的Unsafe来处理

  // AbstractChannel 的 AbstractUnsafe内部类 public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}

注册方法掉到了AbstractCannel register方法,
第15至16行如果当前线程是事件循环线程里面的线程,直接调用register0方法,这里执行到这里的时候是main线程,所以走的是else分支我们把register0包装成一个任务取异步执行,
这里的register0 调用了doRegister方法,实现了在Selector中注册一个Key的功能

    // AbstractNioChannelselectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
   // SingleThreadEventExecutorprivate void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}
  // SingleThreadEventExecutorprivate void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}

如果NioEventLoop 没启动调用doStartThread方法

在startThread 方法里面提交了个异步任务调用到了NioEventloop的run方法
总结下Channel注册到事件循环线程池的流程

NioEventloop run方法

   protected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}} catch (CancelledKeyException e) {// Harmless exception - log anywayif (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}} catch (Error e) {throw (Error) e;} catch (Throwable t) {handleLoopException(t);} finally {// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Error e) {throw (Error) e;} catch (Throwable t) {handleLoopException(t);}}}}

第7行代码 调用到selector.selectNow() 返回是0,这个方法非阻塞往下执行到54行,调用runAllTasks 执行提交到线程池队列中的任务,前面ServerSocketChannel注册到NioEventLoop的流程中就提交了一个register0任务

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());


前面的runAllTasks方法就会执行到register0
doRegister 向事件循环线程池的Selector注册了一个Key(AbstractNioChannel实现)

private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}

register0执行完后再次进入NioEventLoop的run方法,这时候第7行代码会返回-1,代码就执行到23行,这里面会调用到selector.select(没有添加调度任务)这个方法会阻塞,客户端有请求进来后会解除阻塞,
客户端有请求进来后解除阻塞,代码往下执行到processSelectedKeys

  private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}

对于这个selectedKeys openSelector又给它赋值,所有我们直接关注下processSelectedKeysOptimized方法

 private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}

前面提到在AbstractNioChannel doRegister方法里面往NioEventLoop的Selector注册了一个selectionKey,第三个参数就是attachment,这里是NioServerSocketChannel

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

我们接下来看看processSelectedKey方法

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop == this) {// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// &都为1时为1 !=0 说明当前是OP_READ 或者OP_ACCEPT NioServerSocket注册的是0// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

NioMessageUnsafe 第25行拿到的值就是AbstractNioChannel doRegister方法里面注册selectionKey传的第二个参数,也就是0,进入到第48行代码 unsafe.read();
第41行在当前Channel(NioServerSocketChannel)的ChannelPipeline处理链表上传递读事件,参数是NioSocketChannel(这个是Netty的Channel 内部有一个java nio的SocketChannel),这个读事件传递下去后续的逻辑是怎么处理的呢,这也就是前面说的在ServerSocketChannel初始化过程中在它的处理链表中加了一个ServerBootstrapAcceptor ,把客户端SocketChannel包装后的NioSocketChannel注册到了workGroup当中,绑定了具体的业务逻辑处理Handler,这样Netty就实现了bossGroup负责接入请求,workGroup负责处理请求的逻辑

在AbstractNioMessageChannel(NioServerSocketChannel)实现了read

    private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();// ServerSocketChannel创建时 创建了一个ServerSocketChannelConfig 传入了一个AdaptiveRecvByteBufAllocator对象,unsafe().recvBufAllocHandle()拿到的就是这个Allocator的一个内部类HandleImplfinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// 这里ServerSocketChannel重写方法,accept等待客户端请求接入拿到一个SocketChannel客户端对象int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;/*** readBuf 包装了Nio 的NioSocketChannel*/pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
protected int doReadMessages(List<Object> buf) throws Exception {/*** 对于非阻塞(设置为false) 没有请求进来 accept也会往下走*/SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

Netty框架Bind流程浅析相关推荐

  1. Netty框架架构解析+API+运行流程+网络编程文章集锦

    新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析 <!-- 作者区域 --><div class="author"><a class=& ...

  2. Netty框架整体架构及源码知识点

    Netty概述 Netty是一个高性能.异步事件驱动的NIO框架,它提供了对TCP.UDP和文件传输的支持.作为当前最流行的NIO框架,Netty在互联网领域.大数据分布式计算领域.游戏行业.通信行业 ...

  3. Netty框架之责任链模式及其应用

    Netty框架之概述及基本组件介绍 Reactor网络编程模型解析 前言 在上篇博客介绍完netty框架的基本组件介绍和概述,也跟着代码看了下NioEventLoopGroup的启动过程,以及基于Re ...

  4. netty框架及原理解析

    本篇首发于橙寂博客转载请加上此标示. 正式开始了netty的学习,netty是基于nio上的一个框架.期间翻阅了很多文档以及资料.先推荐给大家. 相关文档 Netty源码在线阅读: Netty-4.1 ...

  5. Android R WiFi热点流程浅析

    Android R WiFi热点流程浅析 Android上的WiFi SoftAp功能是用户常用的功能之一,它能让我们分享手机的网络给其他设备使用. 那Android系统是如何实现SoftAp的呢,这 ...

  6. Android中measure过程、WRAP_CONTENT详解以及xml布局文件解析流程浅析(下)

       本文原创, 转载请注明出处:http://blog.csdn.net/qinjuning 上篇文章<<Android中measure过程.WRAP_CONTENT详解以及xml布局文 ...

  7. Netty框架多人聊天案例,代码示例

    Netty框架多人聊天案例,代码示例 pom <?xml version="1.0" encoding="UTF-8"?> <project ...

  8. Netty框架入门案例,代码示例

    Netty框架入门案例 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ...

  9. netty框架实现websocket达到高并发

    websocket(三) 进阶!netty框架实现websocket达到高并发 引言: 在前面两篇文章中,我们对原生websocket进行了了解,且用demo来简单的讲解了其用法.但是在实际项目中,那 ...

最新文章

  1. 前端技术选型的遗憾和经验教训
  2. Android — 长按ListView 利用上下文菜单(ActionMode) 进行批量事件处理
  3. 数据库被挂马的ASP处理方法
  4. java.lang.IllegalStateException: Context namespace element ‘annotation-config’ and its parser class
  5. c语言中abc是什么类型,基金分为ABC三类,分别代表什么意思,哪一类适合普通投资者?...
  6. 里氏替换原则_代码需要有单一职责,还要开闭,里氏替换又是什么鬼?
  7. java vbs_VBS基础篇 - vbscript Dictionary对象
  8. 我希望早几年知道的5个Unix命令
  9. 项目合作 | 室内场景三维重建
  10. mvc @html.action() 跨area调用controller 中的action
  11. Swing超市收银系统附图
  12. python学习笔记(对象)
  13. MyEclipse添加tomcat7出现“Value must be an existing directory”解决方案
  14. python入门教程pdf-python基础教程第4版pdf
  15. python表白代码-如何用Python代码向心爱的姑娘花式表白?
  16. 计算机课程联合考试是什么意思,计算机技术在职研究生能否通过一月联考的方式学习课程内容...
  17. sql 安装程序文件_【病毒文件分析】MedusaLocker勒索病毒,小心全网被加密
  18. AHCI 与 IDE
  19. e480Linux无法发现无线网卡,ThinkPad无线不能用无法连接无线网络的具体排查流程图解...
  20. Python库详解。python有那些库你都知道了嘛?

热门文章

  1. noj14求广义表深度
  2. c语言联合有什么作用,C语言union共用体(联合体)基础知识及实际用途
  3. C语言union占用空间知识
  4. 深入浅出 JIT 编译器
  5. vscode搭建前端开发环境
  6. excel 右键打不开表格修复以及excel打开独立窗口的修复
  7. 马的Hamilton周游路线
  8. Sugeno型(TS型)模糊推理系统及自适应神经网络的模糊推理系统(anfis)应用
  9. 简约至上的产品设计(4)百试不爽的三条法则!
  10. 用R进行gwas meta分析,原来如此简单