在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。

对于tomcat io线程我们主要介绍:

  • IO 线程 overall 流程

  • IO 线程主要涉及的类以及作用

IO线程overall流程

对于 tomcat io 线程来说,overall 调用序列图如下 :

  • SocketProcessor.run()-->

    ConnectionHandler.process()-->

    AbstractProcessorLight.process()-->

    Http11Processor.service-->

    CoyoteAdapter.service() 一直到 container 调用标准 servlet API。

  • 涉及的关键类有

    SocketProcessor,ConnectionHandler

    AbstractProcessorLight,

    Http11Processor,CoyoteAdapter

SocketProcessor的核心代码逻辑如下

protected void doRun() {NioChannel socket = socketWrapper.getSocket();SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());Poller poller = NioEndpoint.this.poller;if (poller == null) {socketWrapper.close();return;}try {int handshake = -1;try {if (key != null) {if (socket.isHandshakeComplete()) {handshake = 0;} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {handshake = -1;} else {handshake = socket.handshake(key.isReadable(), key.isWritable());event = SocketEvent.OPEN_READ;}}} catch (IOException x) {handshake = -1;if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);} catch (CancelledKeyException ckx) {handshake = -1;}if (handshake == 0) {SocketState state = SocketState.OPEN;if (event == null) {state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);} else {state = getHandler().process(socketWrapper, event);}if (state == SocketState.CLOSED) {poller.cancelledKey(key, socketWrapper);}} else if (handshake == -1 ) {poller.cancelledKey(key, socketWrapper);} else if (handshake == SelectionKey.OP_READ){socketWrapper.registerReadInterest();} else if (handshake == SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}} catch (CancelledKeyException cx) {poller.cancelledKey(key, socketWrapper);} catch (VirtualMachineError vme) {ExceptionUtils.handleThrowable(vme);} catch (Throwable t) {log.error(sm.getString("endpoint.processing.fail"), t);poller.cancelledKey(key, socketWrapper);} finally {socketWrapper = null;event = null;if (running && !paused && processorCache != null) {processorCache.push(this);}}
}
  • 首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。

  • 如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。

  • 如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。

  • 最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。

  • 另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。

ConnectionHandler的核心代码逻辑如下

private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.process",wrapper.getSocket(), status));}if (wrapper == null) {return SocketState.CLOSED;}S socket = wrapper.getSocket();Processor processor = connections.get(socket);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",processor, socket));}if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {return SocketState.OPEN;}if (processor != null) {getProtocol().removeWaitingProcessor(processor);} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {return SocketState.CLOSED;}ContainerThreadMarker.set();try {if (processor == null) {String negotiatedProtocol = wrapper.getNegotiatedProtocol();if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {UpgradeProtocol upgradeProtocol =getProtocol().getNegotiatedProtocol(negotiatedProtocol);if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());} else if (negotiatedProtocol.equals("http/1.1")) {// Explicitly negotiated the default protocol.// Obtain a processor below.} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",negotiatedProtocol));}return SocketState.CLOSED;}}}if (processor == null) {processor = recycledProcessors.pop();if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.processorPop",processor));}}if (processor == null) {processor = getProtocol().createProcessor();register(processor);}processor.setSslSupport(wrapper.getSslSupport(getProtocol().getClientCertProvider()));connections.put(socket, processor);SocketState state = SocketState.CLOSED;do {state = processor.process(wrapper, status);if (state == SocketState.UPGRADING) {UpgradeToken upgradeToken = processor.getUpgradeToken();ByteBuffer leftOverInput = processor.getLeftoverInput();if (upgradeToken == null) {UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());wrapper.unRead(leftOverInput);connections.put(socket, processor);} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));}return SocketState.CLOSED;}} else {HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();release(processor);processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));}wrapper.unRead(leftOverInput);wrapper.setUpgraded(true);connections.put(socket, processor);if (upgradeToken.getInstanceManager() == null) {httpUpgradeHandler.init((WebConnection) processor);} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.init((WebConnection) processor);} finally {upgradeToken.getContextBind().unbind(false, oldCL);}}if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {state = SocketState.LONG;}}}}} while ( state == SocketState.UPGRADING);if (state == SocketState.LONG) {longPoll(wrapper, processor);if (processor.isAsync()) {getProtocol().addWaitingProcessor(processor);}} else if (state == SocketState.OPEN) {connections.remove(socket);release(processor);wrapper.registerReadInterest();} else if (state == SocketState.SENDFILE) {// Sendfile in progress. If it fails, the socket will be// closed. If it works, the socket either be added to the// poller (or equivalent) to await more data or processed// if there are any pipe-lined requests remaining.} else if (state == SocketState.UPGRADED) {// Don't add sockets back to the poller if this was a// non-blocking write otherwise the poller may trigger// multiple read events which may lead to thread starvation// in the connector. The write() method will add this socket// to the poller if necessary.if (status != SocketEvent.OPEN_WRITE) {longPoll(wrapper, processor);}} else if (state == SocketState.SUSPENDED) {// Don't add sockets back to the poller.// The resumeProcessing() method will add this socket// to the poller.} else {connections.remove(socket);if (processor.isUpgrade()) {UpgradeToken upgradeToken = processor.getUpgradeToken();HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();InstanceManager instanceManager = upgradeToken.getInstanceManager();if (instanceManager == null) {httpUpgradeHandler.destroy();} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.destroy();} finally {try {instanceManager.destroyInstance(httpUpgradeHandler);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);}upgradeToken.getContextBind().unbind(false, oldCL);}}} else {release(processor);}}return state;} catch(java.net.SocketException e) {getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);} catch (java.io.IOException e) {getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);} catch (ProtocolException e) {getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);}catch (OutOfMemoryError oome) {getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);} finally {ContainerThreadMarker.clear();}connections.remove(socket);release(processor);return SocketState.CLOSED;
}
  • 该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。

  • 该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。

  • 该实例核心方法为 process() 方法,代码比较多,这里总结关键点。

  • 该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。

  • 该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。

  • 如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。

  • 如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。

  • 对于 SocketState.LONG 的返回值调用如下:

    protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
    if (!processor.isAsync()) {
    // This is currently only used with HTTP
    // Either:
    //  - this is an upgraded connection
    //  - the request line/headers have not been completely
    //    readsocket.registerReadInterest();}
    }
    //SocketWrapper
    public void registerReadInterest() {getPoller().add(this, SelectionKey.OP_READ);
    }
    
  • longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。

  • 对于 SocketState.OPEN 的返回值调用:

    connections.remove(socket);
    release(processor);
    wrapper.registerReadInterest();
    
  • SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。

  • 在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。

目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。

Tomcat NIO(9)-IO线程-Overall流程和关键类相关推荐

  1. io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类

    在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 to ...

  2. 【JavaSE8 高级编程 IO/NIO】IO入门系列①之抽象基类节点流转换流 2019_8_16

    IO输入输出 IO 实现体系概述 [文档级] ①IO基石 四抽象基类 [IS,OS / R,W]抽象基类简述 子类及其实现接口 字节(FIS,OIS)字符(BR,ISR)读 字节(FOS,OOS,PS ...

  3. Netty开发的基本流程及关键类说明

  4. 详解 Tomcat 的连接数与线程池

    原文出处:编程迷思 前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文 ...

  5. 详解tomcat的连接数与线程池

    前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...

  6. tomcat线程释放时间_详解tomcat的连接数与线程池

    前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...

  7. NETGEAR拒绝连接请求_详解 Tomcat 的连接数与线程池

    点击上方蓝色字体,选择"标星公众号" 优质文章,第一时间送达 关注公众号后台回复pay或mall获取实战项目资料视频 点击此链接:多套SpringCloud/SpringBoot实 ...

  8. tomcat的连接数与线程池

    在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). Connector的主要功能,是接收连接请求,创建Reques ...

  9. java基础巩固-宇宙第一AiYWM:为了维持生计,四大基础之OS_Part_2整起~IO们那些事【包括五种IO模型:(BIO、NIO、IO多路复用、信号驱动、AIO);零拷贝、事件处理及并发等模型】

    PART0.前情提要: 通常用户进程的一个完整的IO分为两个阶段(IO有内存IO.网络IO和磁盘IO三种,通常我们说的IO指的是后两者!):[操作系统和驱动程序运行在内核空间,应用程序运行在用户空间, ...

最新文章

  1. 【入门级】自学Python第一步,记住这7大编码规范
  2. android notification点击无效,Notification.addAction在Android O中无效
  3. ad file type not recognised_Java实用工具类:File工具类方法学习,可创建目录及文件...
  4. 腾讯视频怎么上传自己的视频?
  5. 【今日CS 视觉论文速览】Fri, 1 Feb 2019
  6. java请求响应中转_J2EE中的请求中转、重定向和包含关系
  7. 后台传一个状态值,如果在vue设置成正常停用?
  8. python documents in chinese_python xlwt 设置 格式
  9. php最小二乘法,Python中实现最小二乘法思路及实现代码
  10. 拉线传感器的数显仪表在测量中起到多大的作用
  11. javascript实现数字补全不足补零
  12. 数组:605.种花问题
  13. css元素发光效果图,纯CSS3实现圆圈动态发光特效动画的示例代码
  14. android2.2智能手机,Android 2.2入门智能手机 海尔N6E评测
  15. B2C电子商务基础系统架构解析
  16. 笔记本/win10 缩放布局改为100% 分辨率1920x1080,任务栏右下角原有的所有图,标信息显示不出来
  17. 打死我也不说(深度优先搜索)
  18. 17、java.lang.UnsatisfiedLinkError: No implementation 处理方法
  19. 第三十一章 即时编译
  20. 数据结构实验1-日期:9月8日

热门文章

  1. 怎么主动开发客户,为什么你需要主动开发海外客户
  2. linux用户环境下进入uboot,嵌入式linux开发uboot移植(一)——uboot项目简介
  3. iOS8-Swift开发教程-李珊-专题视频课程
  4. web期末作业网页设计——我的家乡(网页源码)
  5. 监控白名单WMIC执行payload行为
  6. 电影院场次管理java_电影放映时间选择_09-JAVASE项目实战-电影管理系统_Java视频-51CTO学院...
  7. 银行使用计算机实现通存通兑是什么,银行柜台上办业务用的那种计算机是什么计算机?...
  8. uniapp微信小程序下载文件,保存文件功能总结
  9. 入局先进空中交通,Overair推出全电动垂直起降飞机
  10. C++中野指针产生的途径?避免野指针产生的方法。