Netty框架Bind流程浅析
Netty简介:
Netty是JBOSS 基于Java NIO开源的提供异步、事件驱动的网络通信框架,用于开发高性能网络服务器
Zookeeper RocketMq Dubbo ShardingSphere 底层通信都有用到它
服务端代码实例
public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;if (args.length > 0) {port = Integer.parseInt(args[0]);}new DiscardServer(port).run();}
}
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
这个是netty官网上的代码,服务端的netty代码基本都是这个模板,在执行bind操作后就可以监听端口接收客户端请求了
这个模板代码里面主要有以下几个步骤
- 创建服务端启动引导类 ServerBootstrap
- ServerBootstrap 绑定两个EventLoopGroup ,第一个处理网络连接,另外一个处理业务请求(约定处理连接的叫bossGroup ,处理业务的叫workerGroup )
- 绑定Channel
- 设置workerGroup 的业务逻辑处理Handler
- 设置网络相关参数
- 执行bind操作,开始监听端口接收客户端请求
netty组件
ServerBootstrap:
服务端引导程序启动类,继承了AbstractBootstrap
内部属性
属性 | 描述 |
---|---|
Map<ChannelOption<?>, Object> childOptions | work线程channel相关参数 |
EventLoopGroup childGroup | 实际处理业务的工作者 事件循环线程池 |
ChannelHandler childHandler | 处理添加到工作者事件循环线程的Channel |
AbstractBootstrap:
内部属性
属性 | 描述 |
---|---|
EventLoopGroup group | 事件循环线程池负责接收外部请求,一般内部只要一个NioEventLoop就可以 |
ChannelFactory<? extends C> channelFactory | 实际处理业务的工作者 事件循环线程池 |
SocketAddress localAddress | Socket地址可通过构造函数或者localAddress方法赋值,这个属性有值可以调用bind方法不用传端口信息 |
ChannelFactory<? extends C> channelFactory | 实现类是ReflectiveChannelFactory ,负责创建Channel NioServerSocketChannel 或者NioSocketChannel ,channel方法传入具体Channel类构建 |
Map<ChannelOption<?>, Object> | 定义了一些网络有关的参数 |
ChannelHandler handler | BossGroup绑定的ChannelHandler处理类 |
Bootstrap
客户端引导类同样父类继承了AbstractBootstrap,只需要一个事件循环线程池
内部属性
属性 | 描述 |
---|---|
AddressResolverGroup<?> | 地址解析器 |
SocketAddress remoteAddress | 服务端远程地址 |
NioEventLoopGroup
事件循环线程池 Bootstrap内部处理任务的线程池组
类继承体系及作用
类或者接口 | 作用 |
---|---|
Executor | 执行器接口 NioEventLoopGroup可以执行execute方法 |
ExecutorService | 扩展执行器接口 定义了shutdown submit invokeAll invokeAny等方法 |
ScheduledExecutorService | 表示NioEventLoopGroup可执行调度任务 |
Iterable | 迭代器接口 |
EventExecutorGroup | 定义了next方法 获取Group内的下一个事件循环线程 |
AbstractEventExecutorGroup | 实现了一些执行submit invokeAll schedule 执行任务的方法 ,next()方法获取下一个事件循环线程实际执行 |
EventLoopGroup | 定义next方法获取Group里面的线程 另外提供了三个register重载方法 将Channel具体绑定到一个 EventLoop |
MultithreadEventExecutorGroup | 有集合记录了具体的EventLoop 实现了next方法的功能 |
MultithreadEventLoopGroup | 实现了Channel注册功能 |
NioEventLoopGroup | newChild 方法创建一个NioEventLoop |
NioEventLoop
类继承体系及作用
类或者接口 | 作用 |
---|---|
EventExecutorGroup | 接口 : 提供了获取下一个EventExecutor的方法 EventExecutor迭代器的方法 |
EventExecutor | 接口 起到标记作用 定义了inEventLoop方法 |
AbstractEventExecutor | 实现了submit提交任务方法 交由父类AbstractExecutorService 执行 |
AbstractExecutorService | JDK并发包里面的类 实现了 submit invokeAny 提交,执行任务的方法, 包装Runnable Callable 接口成FutureTask 的newTaskFo方法 |
AbstractScheduledEventExecutor | 实现了ScheduledExecutorService的方法 内部有一个ScheduledFutureTask 的优先级队列 提供一些获取 ScheduledFutureTask 的方法给子类调用 |
SingleThreadEventExecutor | 提供抽象方法run 调用NioEventLoop Run方法 ,实现了线程池的大部分方法 |
SingleThreadEventLoop | register 方法将Channel绑定到一个事件循环线程 |
NioEventLoop | 有Nio 的Selector选择器类,run方法筛选注册在Selector上的Channel |
NioServerSocketChannel
服务端Channel
类继承体系及作用
类或接口 | 作用 |
---|---|
AbstractChannel | 比较顶层的实现,这个类定义了DefaultChannelPipeline Unsafe对象 没有涉及到Java NIO方面的概念 |
AbstractNioChannel | 这里开始关联到jJava NIO方面的概念 有提供方法往事件循环线程池对应Selector注册SelectionKey |
AbstractNioMessageChannel | 实现了读方法,接收到客户端请求后交给workGroup执行 |
NioSocketChannel
DefaultChannelPipeline
AbstractChannel (NioSocketChannel NioServerSocketChannel 都继承了这个类) 类有一个 DefaultChannelPipeline 属性
DefaultChannelPipeline 有两个 AbstractChannelHandlerContext属性,分别是hean tail,看命名就知道 AbstractChannelHandlerContext的数据能组成一个链表结构(也就是Netty中的Channel内部有一个链表来处理请求)
HeadContext
TailContext
可以看到 HeadContext 比TailContext多继承了一个ChannelOutboundHandler接口(提供write flush方法),所以向网络的另外一端写出数据最终是通过HeadContext
服务端bind过程
bind之后可以监听端口,我们来看看具体过程,代码入口在AbstractBootstrap doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
第2-3行通过initAndRegister方法创建一个Channel (这里的Channel 模板方法channel传进来的对象,服务端启动传的是NioServerSocketChannel)跟ChannelFuture ,ChannelFuture 实现了JDK里面的Future接口,这个是在后面Channel注册到事件循环线程池结束后来回调的
我们接着看doBind 后面的代码第4到6 ,7到12行代码分别判断这个异步Future是否异常 或者完成
在netty服务端启动流程中,正常情况下都是走到13行的else分支代码的,这里创建了一个PendingRegistrationPromise对象,并在前面返回的Future对象上添加一个监听,这个监听在什么时候执行呢,就是Channel注册到事件循环线程池的逻辑完成后(无论成功注册或者异常都会回调),当然注册成功之后会执行doBind0方法
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
这个doBind0会提交一个任务到Channel绑定的事件循环线程池等待执行(如果线程池没启动的话会先启动),然后我们具体看看这个后面要被线程池执行的任务具体做了什么,调用Channel的bind方法,这里的Channel 还是Netty层面的channel,bind操作现在Channel DefaultChannelPipeline上传递,从TailContext节点开始,HeadContext 通过Unsafe bind方法最终调用到Channel的doBind方法
在NioServerSocketChannel
里面这个方法的实现是
// NioServerSocketChannelprotected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
// AbstractNioChannel
protected SelectableChannel javaChannel() {return ch;
}
代码执行到这就可以监听端口接收客户端请求
前面说到,bind方法是在Channel注册完成后才会调用,我们再回头看看Channel怎么注册的
服务端Channel注册流程
创建跟注册Channel的代码是AbstractBootstrap initAndRegister
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);} return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);} ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}
- 第4行创建了一个Channel
- 第5行子类ServerBootstrap实现,在Channel的DefaultChannelPromise处理链表上增加了一个Handler节点
p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}
ServerBootstrapAcceptor 处理读事件的方法channelRead,将接收到的消息(这个消息可以转成Channel 实际是accept接收客户端请求后包装的Channel)注册到childGroup,也就实现了将接收到的请求提交到workGroup执行,具体在哪里传递消息过来的,后面会提到
- 将channel注册到BossGroup
ChannelFuture regFuture = config().group().register(channel);
是的,这里的group就是BossGroup,专门负责处理网络请求的
// SingleThreadEventLooppublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}
netty 里面的这些 register bind write最后都是通过Channel内部的Unsafe来处理
// AbstractChannel 的 AbstractUnsafe内部类 public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
注册方法掉到了AbstractCannel register方法,
第15至16行如果当前线程是事件循环线程里面的线程,直接调用register0方法,这里执行到这里的时候是main线程,所以走的是else分支我们把register0包装成一个任务取异步执行,
这里的register0 调用了doRegister方法,实现了在Selector中注册一个Key的功能
// AbstractNioChannelselectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// SingleThreadEventExecutorprivate void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}
// SingleThreadEventExecutorprivate void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}
如果NioEventLoop 没启动调用doStartThread
方法
在startThread 方法里面提交了个异步任务调用到了NioEventloop的run方法
总结下Channel注册到事件循环线程池的流程
NioEventloop run方法
protected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}} catch (CancelledKeyException e) {// Harmless exception - log anywayif (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}} catch (Error e) {throw (Error) e;} catch (Throwable t) {handleLoopException(t);} finally {// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Error e) {throw (Error) e;} catch (Throwable t) {handleLoopException(t);}}}}
第7行代码 调用到selector.selectNow()
返回是0,这个方法非阻塞往下执行到54行,调用runAllTasks
执行提交到线程池队列中的任务,前面ServerSocketChannel注册到NioEventLoop的流程中就提交了一个register0任务
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
前面的runAllTasks
方法就会执行到register0
doRegister
向事件循环线程池的Selector注册了一个Key(AbstractNioChannel
实现)
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
register0
执行完后再次进入NioEventLoop的run方法,这时候第7行代码会返回-1,代码就执行到23行,这里面会调用到selector.select
(没有添加调度任务)这个方法会阻塞,客户端有请求进来后会解除阻塞,
客户端有请求进来后解除阻塞,代码往下执行到processSelectedKeys
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}
对于这个selectedKeys openSelector
又给它赋值,所有我们直接关注下processSelectedKeysOptimized
方法
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}
前面提到在AbstractNioChannel
doRegister方法里面往NioEventLoop的Selector注册了一个selectionKey,第三个参数就是attachment,这里是NioServerSocketChannel
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
我们接下来看看processSelectedKey
方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop == this) {// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// &都为1时为1 !=0 说明当前是OP_READ 或者OP_ACCEPT NioServerSocket注册的是0// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
NioMessageUnsafe 第25行拿到的值就是AbstractNioChannel doRegister方法里面注册selectionKey传的第二个参数,也就是0,进入到第48行代码 unsafe.read()
;
第41行在当前Channel(NioServerSocketChannel)的ChannelPipeline处理链表上传递读事件,参数是NioSocketChannel
(这个是Netty的Channel 内部有一个java nio的SocketChannel
),这个读事件传递下去后续的逻辑是怎么处理的呢,这也就是前面说的在ServerSocketChannel初始化过程中在它的处理链表中加了一个ServerBootstrapAcceptor ,把客户端SocketChannel包装后的NioSocketChannel注册到了workGroup当中,绑定了具体的业务逻辑处理Handler,这样Netty就实现了bossGroup负责接入请求,workGroup负责处理请求的逻辑
在AbstractNioMessageChannel(NioServerSocketChannel)实现了read
private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();// ServerSocketChannel创建时 创建了一个ServerSocketChannelConfig 传入了一个AdaptiveRecvByteBufAllocator对象,unsafe().recvBufAllocHandle()拿到的就是这个Allocator的一个内部类HandleImplfinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// 这里ServerSocketChannel重写方法,accept等待客户端请求接入拿到一个SocketChannel客户端对象int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;/*** readBuf 包装了Nio 的NioSocketChannel*/pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
protected int doReadMessages(List<Object> buf) throws Exception {/*** 对于非阻塞(设置为false) 没有请求进来 accept也会往下走*/SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}
Netty框架Bind流程浅析相关推荐
- Netty框架架构解析+API+运行流程+网络编程文章集锦
新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析 <!-- 作者区域 --><div class="author"><a class=& ...
- Netty框架整体架构及源码知识点
Netty概述 Netty是一个高性能.异步事件驱动的NIO框架,它提供了对TCP.UDP和文件传输的支持.作为当前最流行的NIO框架,Netty在互联网领域.大数据分布式计算领域.游戏行业.通信行业 ...
- Netty框架之责任链模式及其应用
Netty框架之概述及基本组件介绍 Reactor网络编程模型解析 前言 在上篇博客介绍完netty框架的基本组件介绍和概述,也跟着代码看了下NioEventLoopGroup的启动过程,以及基于Re ...
- netty框架及原理解析
本篇首发于橙寂博客转载请加上此标示. 正式开始了netty的学习,netty是基于nio上的一个框架.期间翻阅了很多文档以及资料.先推荐给大家. 相关文档 Netty源码在线阅读: Netty-4.1 ...
- Android R WiFi热点流程浅析
Android R WiFi热点流程浅析 Android上的WiFi SoftAp功能是用户常用的功能之一,它能让我们分享手机的网络给其他设备使用. 那Android系统是如何实现SoftAp的呢,这 ...
- Android中measure过程、WRAP_CONTENT详解以及xml布局文件解析流程浅析(下)
本文原创, 转载请注明出处:http://blog.csdn.net/qinjuning 上篇文章<<Android中measure过程.WRAP_CONTENT详解以及xml布局文 ...
- Netty框架多人聊天案例,代码示例
Netty框架多人聊天案例,代码示例 pom <?xml version="1.0" encoding="UTF-8"?> <project ...
- Netty框架入门案例,代码示例
Netty框架入门案例 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ...
- netty框架实现websocket达到高并发
websocket(三) 进阶!netty框架实现websocket达到高并发 引言: 在前面两篇文章中,我们对原生websocket进行了了解,且用demo来简单的讲解了其用法.但是在实际项目中,那 ...
最新文章
- 前端技术选型的遗憾和经验教训
- Android — 长按ListView 利用上下文菜单(ActionMode) 进行批量事件处理
- 数据库被挂马的ASP处理方法
- java.lang.IllegalStateException: Context namespace element ‘annotation-config’ and its parser class
- c语言中abc是什么类型,基金分为ABC三类,分别代表什么意思,哪一类适合普通投资者?...
- 里氏替换原则_代码需要有单一职责,还要开闭,里氏替换又是什么鬼?
- java vbs_VBS基础篇 - vbscript Dictionary对象
- 我希望早几年知道的5个Unix命令
- 项目合作 | 室内场景三维重建
- mvc @html.action() 跨area调用controller 中的action
- Swing超市收银系统附图
- python学习笔记(对象)
- MyEclipse添加tomcat7出现“Value must be an existing directory”解决方案
- python入门教程pdf-python基础教程第4版pdf
- python表白代码-如何用Python代码向心爱的姑娘花式表白?
- 计算机课程联合考试是什么意思,计算机技术在职研究生能否通过一月联考的方式学习课程内容...
- sql 安装程序文件_【病毒文件分析】MedusaLocker勒索病毒,小心全网被加密
- AHCI 与 IDE
- e480Linux无法发现无线网卡,ThinkPad无线不能用无法连接无线网络的具体排查流程图解...
- Python库详解。python有那些库你都知道了嘛?