Spring对数据库连接池的支持

常见的数据库连接池有c3p0,dbcp以及druid,这里使用的是dbcp

前文中使用DataSourceUtils获取和释放connection,代码如下:

//org.springframework.jdbc.datasource.DataSourceUtilspublic abstract class DataSourceUtils {//获取连接public static Connection getConnection(DataSource dataSource) {return doGetConnection(dataSource);}public static Connection doGetConnection(DataSource dataSource) {//spring 事务相关--不做过多分析,后续讲解ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {conHolder.requested();if (!conHolder.hasConnection()) {conHolder.setConnection(dataSource.getConnection());}return conHolder.getConnection();}//1.Connection con = dataSource.getConnection();//spring 事务相关--不做过多分析,后续讲解if (TransactionSynchronizationManager.isSynchronizationActive()) {ConnectionHolder holderToUse = conHolder;if (holderToUse == null) {holderToUse = new ConnectionHolder(con);} else {holderToUse.setConnection(con);}holderToUse.requested();TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource));holderToUse.setSynchronizedWithTransaction(true);if (holderToUse != conHolder) {TransactionSynchronizationManager.bindResource(dataSource, holderToUse);}}return con;}public static void releaseConnection(Connection con, DataSource dataSource) {doReleaseConnection(con, dataSource);}public static void doReleaseConnection(Connection con, DataSource dataSource){if (con == null) {return;}//spring 事务相关 .... 略....doCloseConnection(con, dataSource);}
}

针对上述关于事务的部分:
在spring事务部分会做详细介绍,这里大致说明一下流程:

  • 在spring事务方法invoke时,或尝试获取事务信息,在org.springframework.jdbc.datasource.DataSourceTransactionManager#getTransaction(TransactionDefinition)
  • 在该方法第一步,会通过doGetTransaction()来从TransactionSynchronizationManager.getResource(this.dataSource)获取conn信息(ThreadLocal,如果没有则返回null),并绑定到事务对象txObject中
  • 如果当前事务对象中没有conn,则会新创建一个,然后通过TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());绑定到当前线程中; 这段逻辑在doBegin()体现;

dbcp.BasicDataSource

根据配置,<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">,

我们得知使用的dataSource具体类型为BasicDataSource.

//org.apache.commons.dbcp.BasicDataSourcepublic class BasicDataSource implements DataSource {protected volatile GenericObjectPool connectionPool = null;protected volatile DataSource dataSource = null;//配置一系列参数......protected String driverClassName = null;protected String username = null;protected volatile String password = null;protected String url = null;//验证sql,如: select 1 from dual;protected volatile String validationQuery = null;public Connection getConnection() throws SQLException {//a. createDataSource()//b. 根据创建的DataSourceImpl.getConnection(); //这里使用的是PoolingDataSourcereturn createDataSource().getConnection();}protected synchronized DataSource createDataSource(){// 如果dataSource已经创建 直接返回.if (dataSource != null) {return (dataSource);}//1.创建返回原始物理连接的工厂ConnectionFactory driverConnectionFactory = createConnectionFactory();//2.为我们的连接创建一个池createConnectionPool();//statementPoolFactory 略...GenericKeyedObjectPoolFactory statementPoolFactory = null;//3.创建池化的连接工厂createPoolableConnectionFactory(driverConnectionFactory, statementPoolFactory, abandonedConfig);//4.创建dataSource实例,createDataSourceInstance();//5. 连接池 初始化 initialSize 个connectionfor (int i = 0 ; i < initialSize ; i++) {connectionPool.addObject();}return dataSource;}
}

获取连接池分为两步:

  • 创建连接工厂-createConnectionFactory()
  • 获取连接池-getConnection()

createDataSource()

总体的步骤如下图:

  1. 创建"物理"连接的工厂— createConnectionFactory()
  2. 创建连接池 — createConnectionPool()
  3. 创建“池化”的工厂
  4. 创建"池化"DataSource
  5. 初始化connection

1. createConnectionFactory()

protected ConnectionFactory createConnectionFactory() throws SQLException {// 创建JDBC driver实例(略....)Driver driver = DriverManager.getDriver(url);// Can't test without a validationQueryif (validationQuery == null) {setTestOnBorrow(false);setTestOnReturn(false);setTestWhileIdle(false);}String user = username;connectionProperties.put("user", user);String pwd = password;connectionProperties.put("password", pwd);//创建 DriverConnectionFactoryConnectionFactory driverConnectionFactory = new DriverConnectionFactory(driver, url, connectionProperties);return driverConnectionFactory;
}

2. createConnectionPool()

protected void createConnectionPool() {// 创建一个连接池以包含所有的活动连接。GenericObjectPool gop;if ((abandonedConfig != null) && (abandonedConfig.getRemoveAbandoned())) {gop = new AbandonedObjectPool(null,abandonedConfig);} else {gop = new GenericObjectPool();}gop.setMaxActive(maxActive);gop.setMaxIdle(maxIdle);gop.setMinIdle(minIdle);gop.setMaxWait(maxWait);gop.setTestOnBorrow(testOnBorrow);gop.setTestOnReturn(testOnReturn);gop.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);gop.setNumTestsPerEvictionRun(numTestsPerEvictionRun);gop.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);gop.setTestWhileIdle(testWhileIdle);connectionPool = gop;
}

createPoolableConnectionFactory()

protected void createPoolableConnectionFactory(ConnectionFactory driverConnectionFactory,KeyedObjectPoolFactory statementPoolFactory, AbandonedConfig configuration){PoolableConnectionFactory connectionFactory = null;//PoolableConnectionFactory的构造函数中://_pool.setFactory(this); //即:将PoolableConnectionFactory设置到connectionPool的_factory属性中.connectionFactory =new PoolableConnectionFactory(driverConnectionFactory,connectionPool,statementPoolFactory,validationQuery,validationQueryTimeout,connectionInitSqls,defaultReadOnly,defaultAutoCommit,defaultTransactionIsolation,defaultCatalog,configuration);validateConnectionFactory(connectionFactory);
}

4. createDataSourceInstance()

protected void createDataSourceInstance() {PoolingDataSource pds = new PoolingDataSource(connectionPool);pds.setAccessToUnderlyingConnectionAllowed(isAccessToUnderlyingConnectionAllowed());pds.setLogWriter(logWriter);dataSource = pds;}

5. 初始化连接池

根据配置的initialSize属性,来初始n个连接,填充连接池。

for (int i = 0 ; i < initialSize ; i++) {connectionPool.addObject();
}

操作连接池

代码逻辑(粗)

1.初始化连接池

createDateSource中会for-each调用connectionPool.addObject();初始化连接池:

//org.apache.commons.pool.impl.GenericObjectPool
public void addObject() throws Exception {//1. 此时_factory的具体类型是:PoolableConnectionFactory,创建连接Object obj = _factory.makeObject();addObjectToPool(obj, false);
}

2.获取连接 – getConnection()

createDataSource返回的真正类型为PoolingDataSource

//org.apache.commons.dbcp.PoolingDataSourcepublic Connection getConnection() throws SQLException {Connection conn = (Connection)(_pool.borrowObject());if (conn != null) {conn = new PoolGuardConnectionWrapper(conn);} return conn;
}

3.释放连接—releaseConnection()

PoolGuardConnectionWrapperPoolingDataSource的内部类,它由个delegate成员变量,释放连接的是有delegate完成的。

//org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapperprivate class PoolGuardConnectionWrapper extends DelegatingConnection {private Connection delegate;PoolGuardConnectionWrapper(Connection delegate) {super(delegate);this.delegate = delegate;}public void close() throws SQLException {if (delegate != null) {//delete具体类型为: PoolableConnectionthis.delegate.close();this.delegate = null;super.setDelegate(null);}}
}//org.apache.commons.dbcp.PoolableConnection
public synchronized void close() throws SQLException {if (_closed) {// already closedreturn;}if (!isUnderlyingConectionClosed) {//归还给连接池_pool.returnObject(this);} else {//释放连接(close),_pool.invalidateObject(this); }
}

调用逻辑图

源码分析

PoolableConnectionFactory

//org.apache.commons.dbcp.PoolableConnectionFactorypublic class PoolableConnectionFactory implements PoolableObjectFactory {//1.创建连接对象public Object makeObject() throws Exception {Connection conn = _connFactory.createConnection();//1.1initializeConnection(conn);return new PoolableConnection(conn,_pool,_config);}//1.1 执行配置的_connectionInitSqls 集合,初始化protected void initializeConnection(Connection conn) throws SQLException {Collection sqls = _connectionInitSqls;if(null != sqls) {Statement stmt = conn.createStatement();for (Iterator iterator = sqls.iterator(); iterator.hasNext();){stmt.execute(iterator.next().toString());}stmt.close();           }}//2.销毁连接对象(物理关闭)public void destroyObject(Object obj) throws Exception {if(obj instanceof PoolableConnection) {((PoolableConnection)obj).reallyClose();}}//3.校验连接对象有效性public boolean validateObject(Object obj) {if(obj instanceof Connection) {try {//3.1validateConnection((Connection) obj);return true;} catch(Exception e) {return false;}} else {return false;}}//3.1 执行配置的_validationQuery 集合,验证public void validateConnection(Connection conn) throws SQLException {String query = _validationQuery;if(null != query) {Statement stmt = null;ResultSet rset = null;try {stmt = conn.createStatement();rset = stmt.executeQuery(query);//必须返回一行记录,否则报错if(!rset.next()) {throw new SQLException("validationQuery didn't return a row");}} finally {rset.close();stmt.close();}}}//4. 使连接对象钝化public void passivateObject(Object obj) throws Exception {if(obj instanceof Connection) {Connection conn = (Connection)obj;if(!conn.getAutoCommit() && !conn.isReadOnly()) {conn.rollback();}conn.clearWarnings();if(!conn.getAutoCommit()) {conn.setAutoCommit(true);}}if(obj instanceof DelegatingConnection) {// setLastUsed(0); //将lastUsed设置0// _closed = true;((DelegatingConnection)obj).passivate();}}//5. 激活连接对象public void activateObject(Object obj) throws Exception {if(obj instanceof DelegatingConnection) {// _closed = false;// setLastUsed(); //将lastUsed设置为系统时间ms((DelegatingConnection)obj).activate();}if(obj instanceof Connection) {Connection conn = (Connection)obj;if (conn.getAutoCommit() != _defaultAutoCommit) {conn.setAutoCommit(_defaultAutoCommit);}//修改默认的隔离级别 //NONE ,READ_COMMITTED ,READ_UNCOMMITTED ,REPEATABLE_READ ,SERIALIZABLEif ((_defaultTransactionIsolation != UNKNOWN_TRANSACTIONISOLATION) && (conn.getTransactionIsolation() != _defaultTransactionIsolation)) {conn.setTransactionIsolation(_defaultTransactionIsolation);}if ((_defaultReadOnly != null) && (conn.isReadOnly() != _defaultReadOnly.booleanValue())) {conn.setReadOnly(_defaultReadOnly.booleanValue());}if ((_defaultCatalog != null) &&(!_defaultCatalog.equals(conn.getCatalog()))) {conn.setCatalog(_defaultCatalog);}}}
}

GenericObjectPool

//org.apache.commons.pool.impl.GenericObjectPoolpublic class GenericObjectPool extends BaseObjectPool implements ObjectPool {//从连接池中“借”走且尚未归还的连接对象, 即正在激活状态的连接对象private int _numActive = 0;//正在内部处理的对象(创建或销毁)总数, 不包含(active和idle)状态连接private int _numInternalProcessing = 0;//待分配的连接对象列表(可以理解为请求连接对象列表,按照线程到达顺序)private LinkedList _allocationQueue = new LinkedList();//连接池对象,注意连接池中都是可用对象(idle)private CursorableLinkedList _pool = null;//DEFAULT_TEST_ON_RETURN默认值为false, 标识在归还连接对象时是否需要validateObjectprivate volatile boolean _testOnReturn = DEFAULT_TEST_ON_RETURN; //1.初添加连接对象(此时空闲对象)至连接池中public void addObject() throws Exception {Object obj = _factory.makeObject();//此时的连接对象为空闲状态,所以无需递减正在运行的连接数addObjectToPool(obj, false);}//2.归还连接对象public void returnObject(Object obj) throws Exception {//归还活动的连接对象,decrementNumActive=trueaddObjectToPool(obj, true);}//3.添加连接对象到连接池中//无论是初始化还是return,连接对象在执行完此方法之后都将是空闲状态//decrementNumActive: 是否需要递减正在运行的连接数。private void addObjectToPool(Object obj, boolean decrementNumActive) throws Exception {boolean success = true;if(_testOnReturn && !(_factory.validateObject(obj))) {//如果开启_testOnReturn 且 连接对象未验证通过success = false;} else {//钝化连接对象_factory.passivateObject(obj);}//如果success=false,则需要销毁连接对象(物理销毁)boolean shouldDestroy = !success;synchronized (this) {if (isClosed()) {//如果连接池已经关闭,则直接销毁连接对象shouldDestroy = true;} else {if((_maxIdle >= 0) && (_pool.size() >= _maxIdle)) {//如果配置了最大空闲数 && 连接池中都是可用对象(idle状态) > 大于最大空闲数//则直接销毁对象shouldDestroy = true;} else if(success) {//采用两种策略,添加连接对象至pool,// LIFO(Last In, First out)// FIFO(First In, First Out)if (_lifo) {_pool.addFirst(new ObjectTimestampPair(obj));} else {_pool.addLast(new ObjectTimestampPair(obj));}if (decrementNumActive) {_numActive--;}allocate();}}}if(shouldDestroy) {//如果shouldDestroy,则直接销毁对象_factory.destroyObject(obj);// 如果shouldDestroy=true,则说明上述的 _lifo逻辑没有执行// 这里根据条件需要执行: _numActive--;和 allocate();逻辑if (decrementNumActive) {synchronized(this) {_numActive--;allocate();}}}}//4.从allocationQueue队列中获取可分配的Latch。private synchronized void allocate() {if (isClosed()) return;// 将_pool中空闲的示例分配给_allocationQueue中的latch 直至pool消耗完全;for (;;) {if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {Latch latch = (Latch) _allocationQueue.removeFirst();latch.setPair((ObjectTimestampPair) _pool.removeFirst());_numInternalProcessing++;synchronized (latch) {latch.notify();}} else {break;}}// 当_pool消耗完之后,而_allocationQueue仍有剩余(即仍有请求未被满足)// 如果未设置最大活动连接数,或者 正在活跃的连接数+正在处理的连接 < 最大活动连接数,则latch.setMayCreate()表示允许新创建;for(;;) {if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {Latch latch = (Latch) _allocationQueue.removeFirst();//latch.setMayCreate(true);_numInternalProcessing++;synchronized (latch) {latch.notify();}} else {break;}}}//5. 从连接池中"借用"连接对象public Object borrowObject() throws Exception {long starttime = System.currentTimeMillis();Latch latch = new Latch();byte whenExhaustedAction;long maxWait;synchronized (this) {whenExhaustedAction = _whenExhaustedAction;maxWait = _maxWait;// latch添加至queue_allocationQueue.add(latch);// 处理_allocationQueue, 分配_pool空闲idle连接实例allocate();}for(;;) {synchronized (this) {assertOpen();}//如果上述allocate()没有从_pool中分配idle的连接实例if(latch.getPair() == null) {//检查 是否允许创建if(latch.mayCreate()) {// 后续新创建连接} else {// 资源已耗尽....switch(whenExhaustedAction) {case WHEN_EXHAUSTED_GROW://GROW: 新创建连接break;case WHEN_EXHAUSTED_FAIL://FAIL:直接抛出异常throw new NoSuchElementException("Pool exhausted");case WHEN_EXHAUSTED_BLOCK://BLOCK策略:等待超过maxWait秒后抛出异常....default://默认策略:抛出IllegalArgumentException异常}}}boolean newlyCreated = false;if(null == latch.getPair()) {//新建对象连接对象...try {  Object obj = _factory.makeObject();latch.setPair(new ObjectTimestampPair(obj));newlyCreated = true;} finally {if (!newlyCreated) {// object没有被成功创建//能执行到这里,说明在try{}代码中抛出异常.synchronized (this) {_numInternalProcessing--;allocate();}}}}// activate & validate _factory.activateObject(latch.getPair().value);synchronized(this) {_numInternalProcessing--;_numActive++;}return latch.getPair().value;}}//6.废弃连接对象public void invalidateObject(Object obj) throws Exception {try {if (_factory != null) {//销毁连接对象(物理销毁)_factory.destroyObject(obj);}} finally {synchronized (this) {//活动连接数递减_numActive--;allocate();}}}//7.清空连接池public void clear() {List toDestroy = new ArrayList();synchronized(this) {toDestroy.addAll(_pool);_numInternalProcessing = _numInternalProcessing + _pool._size;_pool.clear();}destroy(toDestroy);}//8.for-each调用_factory.destroyObject(obj);销毁连接对象private void destroy(Collection c) {for (Iterator it = c.iterator(); it.hasNext();) {try {_factory.destroyObject(((ObjectTimestampPair)(it.next())).value);} finally {synchronized(this) {//_numInternalProcessing 递减_numInternalProcessing--;allocate();}}}}//9.当前连接池已经关闭(注意不是连接对象关闭)protected final boolean isClosed() {return closed;}
}

其中borrowObject的逻辑较为复杂,它的大致逻辑图如下:

spring+dbcp连接池源码分析相关推荐

  1. mybatisplus 集成druid连接池源码分析

    mybatisplus 集成druid连接池源码分析:从spring的源码过渡到druid的相关jar包,里面是druid相关的类,下面我们开始分析: 1.取数据库连接的地方入口:public abs ...

  2. 线程池源码分析-FutureTask

    1 系列目录 线程池接口分析以及FutureTask设计实现 线程池源码分析-ThreadPoolExecutor 该系列打算从一个最简单的Executor执行器开始一步一步扩展到ThreadPool ...

  3. cl.zk0.info/index.php,兄弟连区块链入门到精通教程btcpool矿池源码分析环境搭建

    原标题:兄弟连区块链入门到精通教程btcpool矿池源码分析环境搭建 btcpool矿池-测试环境搭建及使用cgminer测试 本文档基于Ubuntu 16.04 LTS, 64 Bits. 安装Bi ...

  4. spring boot 2.0 源码分析(二)

    在上一章学习了spring boot 2.0启动的大概流程以后,今天我们来深挖一下SpringApplication实例变量的run函数. 先把这段run函数的代码贴出来: /*** Run the ...

  5. spring cloud集成 consul源码分析

    1.简介 1.1 Consul is a tool for service discovery and configuration. Consul is distributed, highly ava ...

  6. 14.QueuedConnection和BlockingQueuedConnection连接方式源码分析

    QT信号槽直连时的时序和信号槽的连接方式已经在前面的文章中分析过了,见https://blog.csdn.net/Master_Cui/article/details/109011425和https: ...

  7. Java线程池 源码分析

    1.个人总结及想法: (1)ThreadPoolExecutor的继承关系? ThreadPoolExecutor继承AbstractExectorService,AbstractExecutorSe ...

  8. java 线程池 源码_java线程池源码分析

    我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了: 这两个方法又什么区别呢? 他们背后的原理是什么呢? 线程池中线程超过了coresize后会怎么操作呢? 为 ...

  9. 吐血整理:Java线程池源码分析(基于JDK1.8建议收藏)

    文章目录 一.引言 二.线程池的参数介绍 1.ThreadPoolExecutor的UML图 三.线程池的使用 1.线程池的工作原理 2.线程池类型 2.1.newCachedThreadPool使用 ...

最新文章

  1. php mysql用户登录_php mysql实现用户登录功能的代码示例
  2. 判断浮点数溢出的方法
  3. d3.js 入门指南 - 仪表盘
  4. 小程序 报错errMsg: “hideLoading:fail:toast can‘t be found“ ?
  5. 飞机上的氧气面罩有什么用_第2部分—另一个面罩检测器……(
  6. 12.1、Libgdx的图像之持续性和非持续性渲染
  7. 150W光速秒充!realme真我GT Neo3正式发布 售价1999元起
  8. C#针对js escape解码
  9. flex布局实现垂直居中
  10. python车辆型号识别_汽车型号和款式的识别 如何通过车架号来识别车的型号款式?查汽车型号...
  11. matlab 如何查数据类型,MATLAB数据类型
  12. Selenium学习笔记
  13. 工作流与BPM的区别
  14. vue组件通讯:父传子、子传父、事件发射详解
  15. echarts饼图 外圈转动动画 pie
  16. centos7 apache 虚拟目录 You don't have permission to access / on this server 解决方法
  17. Ubuntu 系统开机卡住,解决
  18. 部署3PAR VSP5.1.0.0监控3PAR存储状态实时邮件告警
  19. 《英语阅读教学与思维发展》读书笔记(二)
  20. 20年了,广州生活垃圾分类华丽变身——记广州市垃圾分类发展历程

热门文章

  1. jQueryEasyU- Messager基本使用
  2. 联合证券|金融部门开年推出新方案 改善优质房企资产负债状况
  3. Linux的安装与卸载软件
  4. 开篇:机械手设计挑战——仿人机器人设计领域上的高峰
  5. linux安卓教程视频教程,陈超 FFMPEG跨平台iOSAndroidLinux高级开发实战视频教程
  6. 关于晶振的那些事……
  7. 张涵诚:海关大数据平台与应用的场景思考
  8. SV_LAB学习02篇 LAB2
  9. 让网页版面自动适应屏幕分辨率的技巧
  10. GPIO模拟红外发射