Tomcat NIO(9)-IO线程-Overall流程和关键类
在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。
对于tomcat io线程我们主要介绍:
IO 线程 overall 流程
IO 线程主要涉及的类以及作用
IO线程overall流程
对于 tomcat io 线程来说,overall 调用序列图如下 :
SocketProcessor.run()-->
ConnectionHandler.process()-->
AbstractProcessorLight.process()-->
Http11Processor.service-->
CoyoteAdapter.service() 一直到 container 调用标准 servlet API。
涉及的关键类有
SocketProcessor,ConnectionHandler
AbstractProcessorLight,
Http11Processor,CoyoteAdapter
SocketProcessor的核心代码逻辑如下
protected void doRun() {NioChannel socket = socketWrapper.getSocket();SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());Poller poller = NioEndpoint.this.poller;if (poller == null) {socketWrapper.close();return;}try {int handshake = -1;try {if (key != null) {if (socket.isHandshakeComplete()) {handshake = 0;} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {handshake = -1;} else {handshake = socket.handshake(key.isReadable(), key.isWritable());event = SocketEvent.OPEN_READ;}}} catch (IOException x) {handshake = -1;if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);} catch (CancelledKeyException ckx) {handshake = -1;}if (handshake == 0) {SocketState state = SocketState.OPEN;if (event == null) {state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);} else {state = getHandler().process(socketWrapper, event);}if (state == SocketState.CLOSED) {poller.cancelledKey(key, socketWrapper);}} else if (handshake == -1 ) {poller.cancelledKey(key, socketWrapper);} else if (handshake == SelectionKey.OP_READ){socketWrapper.registerReadInterest();} else if (handshake == SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}} catch (CancelledKeyException cx) {poller.cancelledKey(key, socketWrapper);} catch (VirtualMachineError vme) {ExceptionUtils.handleThrowable(vme);} catch (Throwable t) {log.error(sm.getString("endpoint.processing.fail"), t);poller.cancelledKey(key, socketWrapper);} finally {socketWrapper = null;event = null;if (running && !paused && processorCache != null) {processorCache.push(this);}}
}
首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。
如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。
如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。
最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。
另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。
ConnectionHandler的核心代码逻辑如下
private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.process",wrapper.getSocket(), status));}if (wrapper == null) {return SocketState.CLOSED;}S socket = wrapper.getSocket();Processor processor = connections.get(socket);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",processor, socket));}if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {return SocketState.OPEN;}if (processor != null) {getProtocol().removeWaitingProcessor(processor);} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {return SocketState.CLOSED;}ContainerThreadMarker.set();try {if (processor == null) {String negotiatedProtocol = wrapper.getNegotiatedProtocol();if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {UpgradeProtocol upgradeProtocol =getProtocol().getNegotiatedProtocol(negotiatedProtocol);if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());} else if (negotiatedProtocol.equals("http/1.1")) {// Explicitly negotiated the default protocol.// Obtain a processor below.} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",negotiatedProtocol));}return SocketState.CLOSED;}}}if (processor == null) {processor = recycledProcessors.pop();if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.processorPop",processor));}}if (processor == null) {processor = getProtocol().createProcessor();register(processor);}processor.setSslSupport(wrapper.getSslSupport(getProtocol().getClientCertProvider()));connections.put(socket, processor);SocketState state = SocketState.CLOSED;do {state = processor.process(wrapper, status);if (state == SocketState.UPGRADING) {UpgradeToken upgradeToken = processor.getUpgradeToken();ByteBuffer leftOverInput = processor.getLeftoverInput();if (upgradeToken == null) {UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());wrapper.unRead(leftOverInput);connections.put(socket, processor);} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));}return SocketState.CLOSED;}} else {HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();release(processor);processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));}wrapper.unRead(leftOverInput);wrapper.setUpgraded(true);connections.put(socket, processor);if (upgradeToken.getInstanceManager() == null) {httpUpgradeHandler.init((WebConnection) processor);} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.init((WebConnection) processor);} finally {upgradeToken.getContextBind().unbind(false, oldCL);}}if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {state = SocketState.LONG;}}}}} while ( state == SocketState.UPGRADING);if (state == SocketState.LONG) {longPoll(wrapper, processor);if (processor.isAsync()) {getProtocol().addWaitingProcessor(processor);}} else if (state == SocketState.OPEN) {connections.remove(socket);release(processor);wrapper.registerReadInterest();} else if (state == SocketState.SENDFILE) {// Sendfile in progress. If it fails, the socket will be// closed. If it works, the socket either be added to the// poller (or equivalent) to await more data or processed// if there are any pipe-lined requests remaining.} else if (state == SocketState.UPGRADED) {// Don't add sockets back to the poller if this was a// non-blocking write otherwise the poller may trigger// multiple read events which may lead to thread starvation// in the connector. The write() method will add this socket// to the poller if necessary.if (status != SocketEvent.OPEN_WRITE) {longPoll(wrapper, processor);}} else if (state == SocketState.SUSPENDED) {// Don't add sockets back to the poller.// The resumeProcessing() method will add this socket// to the poller.} else {connections.remove(socket);if (processor.isUpgrade()) {UpgradeToken upgradeToken = processor.getUpgradeToken();HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();InstanceManager instanceManager = upgradeToken.getInstanceManager();if (instanceManager == null) {httpUpgradeHandler.destroy();} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.destroy();} finally {try {instanceManager.destroyInstance(httpUpgradeHandler);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);}upgradeToken.getContextBind().unbind(false, oldCL);}}} else {release(processor);}}return state;} catch(java.net.SocketException e) {getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);} catch (java.io.IOException e) {getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);} catch (ProtocolException e) {getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);}catch (OutOfMemoryError oome) {getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);} finally {ContainerThreadMarker.clear();}connections.remove(socket);release(processor);return SocketState.CLOSED;
}
该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。
该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。
该实例核心方法为 process() 方法,代码比较多,这里总结关键点。
该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。
该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。
如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。
如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。
对于 SocketState.LONG 的返回值调用如下:
protected void longPoll(SocketWrapperBase<?> socket, Processor processor) { if (!processor.isAsync()) { // This is currently only used with HTTP // Either: // - this is an upgraded connection // - the request line/headers have not been completely // readsocket.registerReadInterest();} } //SocketWrapper public void registerReadInterest() {getPoller().add(this, SelectionKey.OP_READ); }
longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。
对于 SocketState.OPEN 的返回值调用:
connections.remove(socket); release(processor); wrapper.registerReadInterest();
SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。
在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。
目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。
Tomcat NIO(9)-IO线程-Overall流程和关键类相关推荐
- io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类
在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 to ...
- 【JavaSE8 高级编程 IO/NIO】IO入门系列①之抽象基类节点流转换流 2019_8_16
IO输入输出 IO 实现体系概述 [文档级] ①IO基石 四抽象基类 [IS,OS / R,W]抽象基类简述 子类及其实现接口 字节(FIS,OIS)字符(BR,ISR)读 字节(FOS,OOS,PS ...
- Netty开发的基本流程及关键类说明
- 详解 Tomcat 的连接数与线程池
原文出处:编程迷思 前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文 ...
- 详解tomcat的连接数与线程池
前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...
- tomcat线程释放时间_详解tomcat的连接数与线程池
前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...
- NETGEAR拒绝连接请求_详解 Tomcat 的连接数与线程池
点击上方蓝色字体,选择"标星公众号" 优质文章,第一时间送达 关注公众号后台回复pay或mall获取实战项目资料视频 点击此链接:多套SpringCloud/SpringBoot实 ...
- tomcat的连接数与线程池
在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). Connector的主要功能,是接收连接请求,创建Reques ...
- java基础巩固-宇宙第一AiYWM:为了维持生计,四大基础之OS_Part_2整起~IO们那些事【包括五种IO模型:(BIO、NIO、IO多路复用、信号驱动、AIO);零拷贝、事件处理及并发等模型】
PART0.前情提要: 通常用户进程的一个完整的IO分为两个阶段(IO有内存IO.网络IO和磁盘IO三种,通常我们说的IO指的是后两者!):[操作系统和驱动程序运行在内核空间,应用程序运行在用户空间, ...
最新文章
- 【入门级】自学Python第一步,记住这7大编码规范
- android notification点击无效,Notification.addAction在Android O中无效
- ad file type not recognised_Java实用工具类:File工具类方法学习,可创建目录及文件...
- 腾讯视频怎么上传自己的视频?
- 【今日CS 视觉论文速览】Fri, 1 Feb 2019
- java请求响应中转_J2EE中的请求中转、重定向和包含关系
- 后台传一个状态值,如果在vue设置成正常停用?
- python documents in chinese_python xlwt 设置 格式
- php最小二乘法,Python中实现最小二乘法思路及实现代码
- 拉线传感器的数显仪表在测量中起到多大的作用
- javascript实现数字补全不足补零
- 数组:605.种花问题
- css元素发光效果图,纯CSS3实现圆圈动态发光特效动画的示例代码
- android2.2智能手机,Android 2.2入门智能手机 海尔N6E评测
- B2C电子商务基础系统架构解析
- 笔记本/win10 缩放布局改为100% 分辨率1920x1080,任务栏右下角原有的所有图,标信息显示不出来
- 打死我也不说(深度优先搜索)
- 17、java.lang.UnsatisfiedLinkError: No implementation 处理方法
- 第三十一章 即时编译
- 数据结构实验1-日期:9月8日
热门文章
- 怎么主动开发客户,为什么你需要主动开发海外客户
- linux用户环境下进入uboot,嵌入式linux开发uboot移植(一)——uboot项目简介
- iOS8-Swift开发教程-李珊-专题视频课程
- web期末作业网页设计——我的家乡(网页源码)
- 监控白名单WMIC执行payload行为
- 电影院场次管理java_电影放映时间选择_09-JAVASE项目实战-电影管理系统_Java视频-51CTO学院...
- 银行使用计算机实现通存通兑是什么,银行柜台上办业务用的那种计算机是什么计算机?...
- uniapp微信小程序下载文件,保存文件功能总结
- 入局先进空中交通,Overair推出全电动垂直起降飞机
- C++中野指针产生的途径?避免野指针产生的方法。