• Zebra源码分析-SingleDataSource

    • 1. 简介

      • 1.1 层级结构
      • 1.2 内部结构
    • 2. 使用示例
      • 2.1 直接使用JDBC
      • 2.2 结合MyBatis以及Spring
    • 3.源码分析
      • 3.1 SingleDataSource初始化流程
      • 3.2 SingleConnection获取及销毁
      • 3.3 SingleStatement执行

Zebra源码分析-SingleDataSource

1. 简介


上一篇文章主要对Zebra数据库中间件做了简单的介绍,并且整理了JDBC相关的知识,接下来开始按照数据源的类型对Zebra进行源码级别的分析。本篇文章将会分析SingleDataSource的继承关系,以及初始化和执行SQL的流程和设计模式。

1.1 层级结构

AbstractDataSource是一个抽象类,对DataSource接口进行了包装,而其他的DataSource都是由它的子类,SingleDataSourceGroupDataSource则是通过继承c3p0的适配而来。总而言之,他们都是DataSource接口的实现。

1.2 内部结构

每一个SingleDataSource实际上都是对真实的数据库连接池进行包装,再其外部增加上配置文件的合并,连接和断开的监控。需要注意的是内部有一个拦截器,通过责任链的设计模式将获取链接、关闭连接、执行语句等通过一个个Filter进行处理。

2. 使用示例


2.1 直接使用JDBC

和使用普通的数据源类似,使用时需要先通过各类参数初始化数据源,获取连接后创建statement对象,并通过它执行对应的SQL语句,最后释放连接。

package com.dianping.zebra.sample.jdbc;import com.dianping.zebra.group.jdbc.GroupDataSource;
import com.dianping.zebra.single.jdbc.SingleDataSource;
import org.junit.Test;import java.sql.*;public class SingleDataSourceSample {@Testpublic void singleTest() {SingleDataSource ds = buildDs();selectValue(ds);}private void selectValue(SingleDataSource ds) {Connection conn = null;Statement stmt = null;ResultSet rs = null;
//从数据源中获取连接,并创建执行语句,执行完成后获得结果,并最终关闭连接try {conn = ds.getConnection();stmt = conn.createStatement();rs = stmt.executeQuery("SELECT * From `user`");while (rs.next()) {System.out.println("mis : " + rs.getString(2));}} catch (SQLException e) {} finally {if (rs != null) {try {rs.close();} catch (SQLException e) {}}if (stmt != null) {try {stmt.close();} catch (SQLException e) {}}if (conn != null) {try {conn.close();} catch (SQLException e) {}}}}
// 设置地址、用户名、密码以及数据库链接驱动等信息,对数据源进行初始化  private SingleDataSource buildDs() {SingleDataSource ds = new SingleDataSource();ds.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/zebra?characterEncoding=UTF8&socketTimeout=60000&useSSL=false");ds.setPoolType("hikaricp");ds.setUser("root");ds.setPassword("123456");ds.setDriverClass("com.mysql.jdbc.Driver");ds.setMinPoolSize(1);ds.setMaxPoolSize(1);ds.setInitialPoolSize(1);ds.setCheckoutTimeout(1000);ds.setPreferredTestQuery("select 1");ds.init();return ds;}
}

2.2 结合MyBatis以及Spring

需要指定dataSource的参数以及初始化和销毁对象指定的方法,将初始化和销毁对象单独抽出形成方法是为了让filter感知到数据源的操作,并上报至各类监控系统中

<bean id="dataSource" class="com.dianping.zebra.single.jdbc.SingleDataSource" init-method="init"destroy-method="close"><property name="jdbcUrl" value="jdbc:mysql://127.0.0.1:3306/zebra?characterEncoding=UTF8&amp;socketTimeout=60000&amp;useSSL=false"/><property name="user" value="root"/><property name="password" value="123456"/><property name="driverClass" value="com.mysql.jdbc.Driver"/><!-- 选填,默认值为"c3p0",还支持"tomcat-jdbc"或者"druid" --><property name="poolType" value="hikaricp"/><!-- c3p0的minPoolSize,该值对应tomcat-jdbc或druid的"minIdle" --><property name="minPoolSize" value="2"/><!-- c3p0的maxPoolSize,该值对应tomcat-jdbc或druid的"maxActive" --><property name="maxPoolSize" value="10"/><!-- c3p0的initialPoolSize,该值对应tomcat-jdbc或druid的"initialSize" --><property name="initialPoolSize" value="2"/><!-- c3p0的checkoutTimeout,该值对应tomcat-jdbc或druid的"maxWait" --><property name="checkoutTimeout" value="2000"/><property name="maxIdleTime" value="1800"/><property name="idleConnectionTestPeriod" value="60"/><property name="acquireRetryAttempts" value="3"/><property name="acquireRetryDelay" value="300"/><property name="maxStatements" value="0"/><property name="maxStatementsPerConnection" value="100"/><property name="numHelperThreads" value="6"/><property name="maxAdministrativeTaskTime" value="5"/><property name="preferredTestQuery" value="SELECT 1"/></bean>

3.源码分析


3.1 SingleDataSource初始化流程

3.1.1 构造函数

首先,通过构造函数构创建SingleDataSource对象,需要注意的是这里创建了DataSourceConfig对象,并设置了一些默认的值,而父类C3p0DataSourceAdapter内部也有DataSourceConfig对象,名称为dsProperties,properties的设置都是对内部的dsProperties进行赋值。

    public SingleDataSource() {this.config = new DataSourceConfig();this.config.setCanRead(true);this.config.setCanWrite(true);this.config.setActive(true);this.punisher = new CountPunisher(this, config.getTimeWindow(), config.getPunishLimit());this.forceClose = true;}

3.1.2 初始化大致流程

之后调用init()对数据源进行初始化,大致的逻辑如下:

  1. 合并数据源的配置信息
  2. 根据配置创建对应的数据库连接池,但是并没有初始化
  3. 根据配置创建fIlter
  4. 对数据源进行初始化
public synchronized void init() {if (!init) {mergeDataSourceConfig();this.withDefalutValue = false;if (this.getClass().isAssignableFrom(SingleDataSource.class)) {if (!this.poolType.equals(Constants.CONNECTION_POOL_TYPE_C3P0)) {this.withDefalutValue = true;}}this.dataSourcePool = DataSourcePoolFactory.buildDataSourcePool(this.config);this.filters = FilterManagerFactory.getFilterManager().loadFilters("cat", configManagerType, serviceConfigs);initDataSourceWithFilters(this.config);init = true;}}

3.1.3 合并数据源的配置信息

将所有的信息全部合并至this.config对象中

private void mergeDataSourceConfig() {if (config.getDriverClass() == null || config.getDriverClass().length() <= 0) {// in case that DBA has not give default value to driverClass.config.setDriverClass(dsProperties.getDriverClass());}this.config.setType(this.poolType);this.config.setProperties(dsProperties.getProperties());}

3.1.3 数据库连接池初始化

通过DataSourcePoolFactory工厂模式中的静态方法来构建数据库连接池,连接池的类型、驱动的类名和url信息都从config中获取:

  1. 加载驱动
  2. 根据配置信息创建数据源返回实例。
public class DataSourcePoolFactory {public static DataSourcePool buildDataSourcePool(DataSourceConfig value) {JdbcDriverClassHelper.loadDriverClass(value.getDriverClass(), value.getJdbcUrl());if (value.getType().equalsIgnoreCase(Constants.CONNECTION_POOL_TYPE_C3P0)) {return new C3p0DataSourcePool();}...}
}

3.1.4 filter初始化

首先,需要FilterManagerFactory的工厂类来创建过滤管理器,这里使用的是单例模式,保证全局只有一个对象,使用lazy的初始化。

public class FilterManagerFactory {private volatile static FilterManager filterManager;public static FilterManager getFilterManager() {if (filterManager == null) {synchronized (FilterManagerFactory.class) {if (filterManager == null) {filterManager = new DefaultFilterManager();filterManager.init();}}}return filterManager;}
}

创建好对象之后,进入初始化的函数,又开始了扫盲:

Properties类继承于Hashtable,通常用于配置,并且在Java在获取环境变量时使用到,可以理解为一个哈希表

因此,这一段逻辑是加载Fiilter的信息,调整名字后加入到并发的哈希表ConcurrentHashMap

//FILTER_KEY_NAME = "zebra.filter."
public void init() {try {Properties filterProperties = loadFilterConfig();for (Map.Entry<Object, Object> entry : filterProperties.entrySet()) {String key = (String) entry.getKey();if (key.startsWith(FILTER_KEY_NAME)) {String name = key.substring(FILTER_KEY_NAME.length());aliasMap.put(name, (String) entry.getValue());}}} catch (Exception e) {}
}

接下来,进入到loadFilterConfig函数中,查看代码是如何加载Filter类型的

需要注意的是,使用ClassLoader.getSystemClassLoader()在调用静态资源时,多线程环境下,可能出现不一致的情况

private Properties loadFilterConfig() throws IOException {Properties filterProperties = new Properties();loadFilterConfig(filterProperties, ClassLoader.getSystemClassLoader());loadFilterConfig(filterProperties, Thread.currentThread().getContextClassLoader());return filterProperties;}

loadFilterConfig的执行逻辑为从指定的路径中读取资源,文件的内容如注释所示,使用Properties从输入流中读取,并转为为对该类的对象,最后加入到传入的filterProperties中,完成用户指定的过滤器的加载,保存在内存的中的map里。

//FILTER_PROPERTY_NAME = "META-INF/zebra-filter.properties";
//文件内容:zebra.filter.wall=com.dianping.zebra.filter.wall.WallFilter
private void loadFilterConfig(Properties filterProperties, ClassLoader classLoader) throws IOException {if (classLoader == null) {return;}for (Enumeration<URL> e = classLoader.getResources(FILTER_PROPERTY_NAME); e.hasMoreElements();) {URL url = e.nextElement();Properties property = new Properties();InputStream is = null;try {is = url.openStream();property.load(is);} finally {if (is != null) {is.close();}}filterProperties.putAll(property);}
}

loadFilters中,通过对字符串的切割,通过loadFilterFromCache作为缓存将过滤器加载,如果缓存命中直接返回,否则将进行初始化。

本例子中,传入的的strConfig为cat,通过类加载器获得对应的类型后,实例化后放入List中,并且将此过滤器加载进Map中,下次获取不需要再创建对象,可以直接从内存中获取。

public List<JdbcFilter> loadFilters(String strConfig, String configType, Map<String, Object> serviceConfigs) {List<JdbcFilter> result = new LinkedList<JdbcFilter>();if (strConfig != null) {for (String name : strConfig.trim().split(",")) {List<JdbcFilter> filters = loadFilterFromCache(name.trim(), configType, serviceConfigs);if (filters != null && filters.size() > 0) {result.addAll(filters);}}}Collections.sort(result, new Comparator<JdbcFilter>() {@Overridepublic int compare(JdbcFilter o1, JdbcFilter o2) {int x = o1.getOrder();int y = o2.getOrder();return (x > y) ? -1 : ((x == y) ? 0 : 1);}});return result;}private List<JdbcFilter> loadFilterFromCache(String filterName, String configType, Map<String, Object> configs) {List<JdbcFilter> result = cachedFilterMap.get(filterName);if (result != null) {return result;}String filterClassNames = aliasMap.get(filterName);if (StringUtils.isEmpty(filterClassNames)) {return null;}result = new ArrayList<JdbcFilter>();for (String filterClassName : filterClassNames.split(",")) {Class<?> filterClass = loadClass(filterClassName);if (filterClass == null) {continue;}DefaultJdbcFilter filter;try {filter = (DefaultJdbcFilter) filterClass.newInstance();filter.setConfigManager(configType, configs);filter.init();result.add(filter);} catch (Exception ignore) {}}cachedFilterMap.put(filterName, result);return result;}

下一步就到了初始化数据源,然而这一步需要通过一层层的Filter拦截和处理,这里使用的是责任链的设计模式,具体的代码执行如下:

private DataSource initDataSourceWithFilters(final DataSourceConfig value) {if (filters != null && filters.size() > 0) {JdbcFilter chain = new DefaultJdbcFilterChain(filters) {@Overridepublic DataSource initSingleDataSource(SingleDataSource source, JdbcFilter chain) {if (index < filters.size()) {return filters.get(index++).initSingleDataSource(source, chain);} else {return source.initDataSourceOrigin(value);}}};return chain.initSingleDataSource(this, chain);} else {return initDataSourceOrigin(value);}}

DefaultJdbcFilterChain中有index指针指示当前的过滤器的位置,在CatFilter中,继续调用chan中的initSingleDataSource方法,深度优先得初始化。在CATFilter中主要是初始化监控,并向上上报链接信息

public DataSource initSingleDataSource(SingleDataSource source, JdbcFilter chain) {DataSource result = chain.initSingleDataSource(source, chain);SingleDataSourceMonitor monitor = new SingleDataSourceMonitor(source);this.monitors.put(source, monitor);try {StatusExtensionRegister.getInstance().register(monitor);} catch (Throwable ignore) {}Cat.logEvent("SingleDataSource.Created", source.getConfig().getJdbcUrl());Cat.logEvent("SingleDataSource.Type", source.getConfig().getType());return result;}

index指针到达最后时,会调用source.initDataSourceOrigin(value),这里才会真实的对数据源进行初始化。

需要注意的是DataSourcePool是一个接口,各个类之间的继承关系和SingleDataSource类似,具体如下图所示:

到这,SingleDataSource的初始化流程就完成了。

3.2 SingleConnection获取及销毁

JDBC操作在初始化完成后需要从DataSource中获取数据库的连接,获取连接和初始化类似,也需要通过过滤器进行一层层拦截和操作,例如,示例中也会在CAT监控上上报信息,最终获取到连接。

@Overridepublic Connection getConnection() throws SQLException {return getConnection(null, null);}@Overridepublic Connection getConnection(final String username, final String password) throws SQLException {if (filters != null && filters.size() > 0) {JdbcFilter chain = new DefaultJdbcFilterChain(filters) {@Overridepublic SingleConnection getSingleConnection(SingleDataSource source, JdbcFilter chain) throws SQLException {if (index < filters.size()) {return filters.get(index++).getSingleConnection(source, chain);} else {return source.getConnectionOrigin(username, password);}}};return chain.getSingleConnection(this, chain);} else {return getConnectionOrigin(username, password);}}

SingleDataSource实际上是对连接池的一个包装,因此,具体的操作会通过dataSourcePool获得实际的链接,并通过SingleConnection对它进行一层包装,并对数据源的状态进行标记。

private SingleConnection getConnectionOrigin(String username, String password) throws SQLException {checkState();checkNull();Connection conn = null;try {conn = this.dataSourcePool.getInnerDataSourcePool().getConnection();} catch (SQLException e) {punisher.countAndPunish(e);throw e;}if (state == DataSourceState.INITIAL) {state = DataSourceState.UP;}return new SingleConnection(this, this.config, conn, this.filters);}

通过以下的代码可以看到SingleDataSource是对Connection的包装,增加了filterdsId等参数,和普通的连接没有什么区别

public SingleConnection(SingleDataSource dataSource, final DataSourceConfig config, Connection conn,List<JdbcFilter> filters) {this.dsId = config.getId();this.dataSource = dataSource;this.conn = conn;this.filters = filters;}

prepareStatement需要对SQL进行处理,这里需要对sql语句进行过滤,例如对SQL设置黑白名单,详细的过滤参考WallFilter

3.3 SingleStatement执行

在这一阶段执行指定的SQL语句,和SingleDataSource类似,具体的操作是由内部的innerStatement执行的,但是在执行之后需要过滤器的拦截和处理操作。

定义了JdbcOperationCallback,在内部使用innerStatement执行实际的查询或更新的操作,并返回ResultSet,并且使用SingleResultSet进行了包装。

public interface JdbcOperationCallback<T> {T doAction(Connection conn) throws SQLException;
}public ResultSet executeQuery(final String sql) throws SQLException {checkClosed();final String processedSql = processSQL(sql, false);return executeWithFilter(new JdbcOperationCallback<ResultSet>() {@Overridepublic ResultSet doAction(Connection conn) throws SQLException {ResultSet rs = innerStatement.executeQuery(processedSql);return new SingleResultSet(rs, filters);}}, processedSql, null, false);}

和前面类似,也是使用责任链的设计模式,使用深度优先的方式访问过滤器,当list中没有过滤器后,执行executeWithFilterOrigin进行实际的执行操作。

protected <T> T executeWithFilter(final JdbcOperationCallback<T> callback, final String sql, Object params, boolean isBatch) throws SQLException {if (filters != null && filters.size() > 0) {JdbcFilter chain = new DefaultJdbcFilterChain(filters) {@Overridepublic <T> T executeSingleStatement(SingleStatement source, SingleConnection conn, String sql, List<String> batchedSql, boolean isBatched, boolean autoCommit, Object params, JdbcFilter chain)throws SQLException {if (index < filters.size()) {return filters.get(index++).executeSingleStatement(source, conn, sql, (List<String>) batchedSql, isBatched, autoCommit, params, chain);} else {return (T) executeWithFilterOrigin(callback, conn);}}};return chain.executeSingleStatement(this, this.singleConnection, sql, null, isBatch,this.singleConnection.getAutoCommit(), params, chain);} else {return executeWithFilterOrigin(callback, this.singleConnection);}}

而其实际的操作也就是调用callback中的唯一的方法doAction完成语句的执行操作

private <T> T executeWithFilterOrigin(JdbcOperationCallback<T> callback, Connection conn) throws SQLException {return callback.doAction(conn);}

Zebra源码分析-SingleDataSource相关推荐

  1. zebra源码分析-导读

    zebra源码分析-导读 客户端架构 JDBC 核心部分介绍 代码流程 数据源 Statement 参考 zebra源码分析-导读 zebra是一个基于JDBC API协议上开发出的高可用.高性能的数 ...

  2. Zebra源码分析-GroupDataSource

    Zebra源码分析-GroupDataSource 1 简介 GroupDataSource是读写分离数据源,由多个SingleDataSource组成,其中有一个主库,用于写操作和一部分指定得读操作 ...

  3. gallery3d的源码分析——入口

    gallery3d的源码分析很多,有些也很透彻.我的源码分析的参考资料也是来源于网络. gallery3d的入口代码在gallery.java文件.首先来分析入口做了哪些事情. uper.onCrea ...

  4. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  5. SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...

  6. SpringBoot-web开发(二): 页面和图标定制(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) 目录 一.首页 1. 源码分析 2. 访问首页测试 二.动态页面 1. 动态资源目录t ...

  7. SpringBoot-web开发(一): 静态资源的导入(源码分析)

    目录 方式一:通过WebJars 1. 什么是webjars? 2. webjars的使用 3. webjars结构 4. 解析源码 5. 测试访问 方式二:放入静态资源目录 1. 源码分析 2. 测 ...

  8. Yolov3Yolov4网络结构与源码分析

    Yolov3&Yolov4网络结构与源码分析 从2018年Yolov3年提出的两年后,在原作者声名放弃更新Yolo算法后,俄罗斯的Alexey大神扛起了Yolov4的大旗. 文章目录 论文汇总 ...

  9. ViewGroup的Touch事件分发(源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,View的touch事件分发相对比较简单,可参考 View的Touch事件分发(一.初步了解) View的Touch事 ...

最新文章

  1. uvc音频传输协议_蓝牙中的三种音频编码:Apt-X、SBC、AAC,请问分别有什么区别?...
  2. [数据结构]顺序单链表插入
  3. idea 无法打开项目_Premiere出现quot;项目看来已经损坏,无法打开”的解决方法...
  4. 利用cookie模拟登陆知乎
  5. 解决Too many open files问题
  6. 游戏教案 电子计算机,计算机模板电子教案.doc
  7. Openssl证书管理
  8. 验毛坯房要注意什么?
  9. java 文本框输入监听事件_JAVA GUI 事件监听事件 详解 和 案例.
  10. 2019-08-12 计划与安排
  11. 路径规划;a*算法 demo_路径规划A*算法
  12. 用python写MapReduce函数——以WordCount为例
  13. 我的Java传承名单(不知为何以前的又没有了,幸亏有备份才可以又贴出来)
  14. 关于谷哥传奇工程师Jeff Dean的笑话
  15. HIVE厂牌艺人_北京音乐节-北京音乐节全攻略 - 马蜂窝
  16. 微信公众号页面开发记录
  17. iceberg-flink 九:累积窗口按照天统计,数据不是从0:00-11:59 统计。
  18. ESlint 自动格式化代码 补缺代码 添加空格
  19. 七夕 | 情人节 | 用Python给你送个钻石戒指
  20. 汽车计算机控制系统及其组成,汽车计算机控制系统的研究.doc

热门文章

  1. wireshark 手机抓包_美团外卖抓包分析
  2. 挂在网盘到本地磁盘(以中移动云盘为例)
  3. Linux Bird
  4. Windows系统下安装CentOS
  5. 为IMX6 交叉编译Mplayer
  6. 火车头内容采集规则之【C#代码】数字序号递增
  7. 安卓pdf阅读器_提笔就写安卓阅读器,文石BOOX NOTE PRO轻体验
  8. 2021批量查备案域名工具
  9. sql查询本年度员工的平均工资
  10. 刚刚出新的Kubernetes 却曝出了“高危”安全漏洞;亚马逊将推免费新闻视频服务,对标苹果 | 极客头条...