10分钟教你写一个数据库
今天教大家借助一款框架快速实现一个数据库,这个框架就是Calcite
,下面会带大家通过两个例子快速教会大家怎么实现,一个是可以通过 SQL 语句的方式可以直接查询文件内容,第二个是模拟 Mysql 查询功能,以及最后告诉大家怎么实现 SQL 查询 Kafka 数据。
Calcite
Calcite
是一个用于优化异构数据源的查询处理的可插拔基础框架(他是一个框架),可以将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且我们可以选择性的使用它的部分功能。
Calcite能干什么
使用 SQL 访问内存中某个数据
使用 SQL 访问某个文件的数据
跨数据源的数据访问、聚合、排序等(例如 Mysql 和 Redis 数据源中的数据进行join)
当我们需要自建一个数据库的时候,数据可以为任何格式的,比如text、word、xml、mysql、es、csv、第三方接口数据等等,我们只有数据,我们想让这些数据支持 SQL 形式动态增删改查。
另外,像Hive、Drill、Flink、Phoenix 和 Storm 等项目中,数据处理系统都是使用 Calcite 来做 SQL 解析和查询优化,当然,还有部分用来构建自己的 JDBC driver。
名词解释
Token
就是将标准 SQL(可以理解为Mysql)关键词以及关键词之间的字符串截取出来,每一个token
,会被封装为一个SqlNode
,SqlNode
会衍生很多子类,比如Select
会被封装为SqlSelect
,当前 SqlNode
也能反解析为 SQL 文本。
RelDataTypeField
某个字段的名称和类型信息
RelDataType
多个 RelDataTypeField 组成了 RelDataType,可以理解为数据行
Table
一个完整的表的信息
Schema
所有元数据的组合,可以理解为一组 Table 或者库的概念
开始使用
1. 引入包
<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><!-- 目前最新版本 2022-09-10日更新--><version>1.32.0</version>
</dependency>
2. 创建model.json文件和表结构csv
model.json 里面主要描述或者说告诉 Calcite
如何创建 Schema
,也就是告诉框架怎么创建出库。
{
"version": "1.0",//忽略
"defaultSchema": "CSV",//设置默认的schema
"schemas": [//可定义多个schema{"name": "CSV",//相当于namespace和上面的defaultSchema的值对应"type": "custom",//写死"factory": "csv.CsvSchemaFactory",//factory的类名必须是你自己实现的factory的包的全路径"operand": { //这里可以传递自定义参数,最终会以map的形式传递给factory的operand参数"directory": "csv"//directory代表calcite会在resources下面的csv目录下面读取所有的csv文件,factory创建的Schema会吧这些文件全部构建成Table,可以理解为读取数据文件的根目录,当然key的名称也不一定非得用directory,你可以随意指定}}]
}
接下来还需要定义一个 csv
文件,用来定义表结构。
NAME:string,MONEY:string
aixiaoxian,10000万
xiaobai,10000万
adong,10000万
maomao,10000万
xixi,10000万
zizi,10000万
wuwu,10000万
kuku,10000万
整个项目的结构大概就是这样:
3. 实现Schema的工厂类
在上述文件中指定的包路径下去编写 CsvSchemaFactory
类,实现 SchemaFactory
接口,并且实现里面唯一的方法 create
方法,创建Schema
(库)。
public class CsvSchemaFactory implements SchemaFactory {/*** parentSchema 父节点,一般为root* name 为model.json中定义的名字* operand 为model.json中定于的数据,这里可以传递自定义参数** @param parentSchema Parent schema* @param name Name of this schema* @param operand The "operand" JSON property* @return*/@Overridepublic Schema create(SchemaPlus parentSchema, String name,Map<String, Object> operand) {final String directory = (String) operand.get("directory");File directoryFile = new File(directory);return new CsvSchema(directoryFile, "scannable");}
}
4. 自定义Schma类
有了 SchemaFactory
,接下来需要自定义 Schema
类。
自定义的 Schema
需要实现 Schema
接口,但是直接实现要实现的方法太多,我们去实现官方的 AbstractSchema
类,这样就只需要实现一个方法就行(如果有其他定制化需求可以实现原生接口)。
核心的逻辑就是createTableMap
方法,用于创建出 Table
表。
他会扫描指定的Resource
下面的所有 csv
文件,将每个文件映射成Table
对象,最终以map
形式返回,Schema
接口的其他几个方法会用到这个对象。
//实现这一个方法就行了@Overrideprotected Map<String, Table> getTableMap() {if (tableMap == null) {tableMap = createTableMap();}return tableMap;}private Map<String, Table> createTableMap() {// Look for files in the directory ending in ".csv"final Source baseSource = Sources.of(directoryFile);//会自动过滤掉非指定文件后缀的文件,我这里写的csvFile[] files = directoryFile.listFiles((dir, name) -> {final String nameSansGz = trim(name, ".gz");return nameSansGz.endsWith(".csv");});if (files == null) {System.out.println("directory " + directoryFile + " not found");files = new File[0];}// Build a map from table name to table; each file becomes a table.final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();for (File file : files) {Source source = Sources.of(file);final Source sourceSansCsv = source.trimOrNull(".csv");if (sourceSansCsv != null) {final Table table = createTable(source);builder.put(sourceSansCsv.relative(baseSource).path(), table);}}return builder.build();}
5. 自定义 Table
Schema
有了,并且数据文件 csv
也映射成 Table
了,一个 csv
文件对应一个 Table
。
接下来我们去自定义 Table
,自定义 Table
的核心是我们要定义字段的类型和名称,以及如何读取 csv
文件。
先获取数据类型和名称,即单表结构,从
csv
文件头中获取(当前文件头需要我们自己定义,包括规则我们也可以定制化)。
/*** Base class for table that reads CSV files.*/
public abstract class CsvTable extends AbstractTable {protected final Source source;protected final @Nullable RelProtoDataType protoRowType;private @Nullable RelDataType rowType;private @Nullable List<RelDataType> fieldTypes;/*** Creates a CsvTable.*/CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {this.source = source;this.protoRowType = protoRowType;}/*** 创建一个CsvTable,继承AbstractTable,需要实现里面的getRowType方法,此方法就是获取当前的表结构。Table的类型有很多种,比如还有视图类型,AbstractTable类中帮我们默认实现了Table接口的一些方法,比如getJdbcTableType 方法,默认为Table类型,如果有其他定制化需求可直接实现Table接口。和AbstractSchema很像*/@Overridepublic RelDataType getRowType(RelDataTypeFactory typeFactory) {if (protoRowType != null) {return protoRowType.apply(typeFactory);}if (rowType == null) {rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,null);}return rowType;}/*** Returns the field types of this CSV table.*/public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {if (fieldTypes == null) {fieldTypes = new ArrayList<>();CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,fieldTypes);}return fieldTypes;}public static RelDataType deduceRowType(JavaTypeFactory typeFactory,Source source, @Nullable List<RelDataType> fieldTypes) {final List<RelDataType> types = new ArrayList<>();final List<String> names = new ArrayList<>();try (CSVReader reader = openCsv(source)) {String[] strings = reader.readNext();if (strings == null) {strings = new String[]{"EmptyFileHasNoColumns:boolean"};}for (String string : strings) {final String name;final RelDataType fieldType;//就是简单的读取字符串冒号前面是名称,冒号后面是类型final int colon = string.indexOf(':');if (colon >= 0) {name = string.substring(0, colon);String typeString = string.substring(colon + 1);Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);if (decimalMatcher.matches()) {int precision = Integer.parseInt(decimalMatcher.group(1));int scale = Integer.parseInt(decimalMatcher.group(2));fieldType = parseDecimalSqlType(typeFactory, precision, scale);} else {switch (typeString) {case "string":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);break;case "boolean":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);break;case "byte":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);break;case "char":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);break;case "short":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);break;case "int":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);break;case "long":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);break;case "float":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);break;case "double":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);break;case "date":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);break;case "timestamp":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);break;case "time":fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);break;default:LOGGER.warn("Found unknown type: {} in file: {} for column: {}. Will assume the type of "+ "column is string.",typeString, source.path(), name);fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);break;}}} else {// 如果没定义,默认都是String类型,字段名称也是stringname = string;fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);}names.add(name);types.add(fieldType);if (fieldTypes != null) {fieldTypes.add(fieldType);}}} catch (IOException e) {// ignore}if (names.isEmpty()) {names.add("line");types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));}return typeFactory.createStructType(Pair.zip(names, types));}
}
获取文件中的数据,上面把
Table
的表结构字段名称和类型都获取到了以后,就剩最后一步了,获取文件中的数据。我们需要自定义一个类,实现ScannableTable
接口,并且实现里面唯一的方法scan
方法,其实本质上就是读文件,然后把文件的每一行的数据和上述获取的fileType
进行匹配。
@Overridepublic Enumerable<Object[]> scan(DataContext root) {JavaTypeFactory typeFactory = root.getTypeFactory();final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);return new AbstractEnumerable<@Nullable Object[]>() {@Overridepublic Enumerator<@Nullable Object[]> enumerator() {//返回我们自定义的读取数据的类return new CsvEnumerator<>(source, cancelFlag, false, null,CsvEnumerator.arrayConverter(fieldTypes, fields, false));}};}public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,@Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {this.cancelFlag = cancelFlag;this.rowConverter = rowConverter;this.filterValues = filterValues == null ? null: ImmutableNullableList.copyOf(filterValues);try {this.reader = openCsv(source);//跳过第一行,因为第一行是定义类型和名称的this.reader.readNext(); // skip header row} catch (IOException e) {throw new RuntimeException(e);}}
//CsvEnumerator必须实现calcit自己的迭代器,里面有current、moveNext方法,current是返回当前游标所在的数据记录,moveNext是将游标指向下一个记录,官网中自己定义了一个类型转换器,是将csv文件中的数据转换成文件头指定的类型,这个需要我们自己来实现@Overridepublic E current() {return castNonNull(current);}@Overridepublic boolean moveNext() {try {outer:for (; ; ) {if (cancelFlag.get()) {return false;}final String[] strings = reader.readNext();if (strings == null) {current = null;reader.close();return false;}if (filterValues != null) {for (int i = 0; i < strings.length; i++) {String filterValue = filterValues.get(i);if (filterValue != null) {if (!filterValue.equals(strings[i])) {continue outer;}}}}current = rowConverter.convertRow(strings);return true;}} catch (IOException e) {throw new RuntimeException(e);}}protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {if (fieldType == null || string == null) {return string;}switch (fieldType.getSqlTypeName()) {case BOOLEAN:if (string.length() == 0) {return null;}return Boolean.parseBoolean(string);case TINYINT:if (string.length() == 0) {return null;}return Byte.parseByte(string);case SMALLINT:if (string.length() == 0) {return null;}return Short.parseShort(string);case INTEGER:if (string.length() == 0) {return null;}return Integer.parseInt(string);case BIGINT:if (string.length() == 0) {return null;}return Long.parseLong(string);case FLOAT:if (string.length() == 0) {return null;}return Float.parseFloat(string);case DOUBLE:if (string.length() == 0) {return null;}return Double.parseDouble(string);case DECIMAL:if (string.length() == 0) {return null;}return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);case DATE:if (string.length() == 0) {return null;}try {Date date = TIME_FORMAT_DATE.parse(string);return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);} catch (ParseException e) {return null;}case TIME:if (string.length() == 0) {return null;}try {Date date = TIME_FORMAT_TIME.parse(string);return (int) date.getTime();} catch (ParseException e) {return null;}case TIMESTAMP:if (string.length() == 0) {return null;}try {Date date = TIME_FORMAT_TIMESTAMP.parse(string);return date.getTime();} catch (ParseException e) {return null;}case VARCHAR:default:return string;}}
6. 最后
至此我们需要准备的东西:库、表名称、字段名称、字段类型都有了,接下来我们去写我们的 SQL 语句查询我们的数据文件。
创建好几个测试的数据文件,例如上面项目结构中我创建 2 个 csv 文件USERINFO.csv
、ASSET.csv
,然后创建测试类。
这样跑起来,就可以通过 SQL 语句的方式直接查询数据了。
public class Test {public static void main(String[] args) throws SQLException {Connection connection = null;Statement statement = null;try {Properties info = new Properties();info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());connection = DriverManager.getConnection("jdbc:calcite:", info);statement = connection.createStatement();print(statement.executeQuery("select * from asset "));print(statement.executeQuery(" select * from userinfo "));print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));print(statement.executeQuery(" select * from userinfo where age >60 "));print(statement.executeQuery(" select * from userinfo where name like 'a%' "));} finally {connection.close();}}private static void print(ResultSet resultSet) throws SQLException {final ResultSetMetaData metaData = resultSet.getMetaData();final int columnCount = metaData.getColumnCount();while (resultSet.next()) {for (int i = 1; ; i++) {System.out.print(resultSet.getString(i));if (i < columnCount) {System.out.print(", ");} else {System.out.println();break;}}}}
}
查询结果:
这里在测试的时候踩到2个坑,大家如果自己实验的时候可以避免下。
Calcite
默认会把你的 SQL 语句中的表名和类名全部转换为大写,因为默认的 csv(其他文件也一样)文件的名称就是表名,除非你自定义规则,所以你的文件名要写成大写。Calcite
有一些默认的关键字不能用作表名,不然会查询失败,比如我刚开始定的user.csv
就一直查不出来,改成USERINFO
就可以了,这点和Mysql
的内置关键字差不多,也可以通过个性化配置去改。
演示Mysql
首先,还是先准备
Calcite
需要的东西:库、表名称、字段名称、字段类型。
如果数据源使用Mysql
的话,这些都不用我们去 JAVA 服务中去定义,直接在 Mysql 客户端创建好,这里直接创建两张表用于测试,就和我们的csv
文件一样。
CREATE TABLE `USERINFO1` (`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,`AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;CREATE TABLE `ASSET` (`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,`MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
上述
csv
案例中的SchemaFactory
以及Schema
这些都不需要创建,因为Calcite
默认提供了 Mysql 的 Adapter适配器。其实,上述两步都不需要做,我们真正要做的是,告诉
Calcite
你的 JDBC 的连接信息就行了,也是在model.json
文件中定义。
{"version": "1.0","defaultSchema": "Demo","schemas": [{"name": "Demo","type": "custom",// 这里是calcite默认的SchemaFactory,里面的流程和我们上述自己定义的相同,下面会简单看看源码。"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory","operand": {// 我用的是mysql8以上版本,所以这里注意包的名称"jdbcDriver": "com.mysql.cj.jdbc.Driver","jdbcUrl": "jdbc:mysql://localhost:3306/irving","jdbcUser": "root","jdbcPassword": "123456"}}]
}
在项目中引入 Mysql 的驱动包
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version>
</dependency>
写好测试类,这样直接就相当于完成了所有的功能了。
public class TestMysql {public static void main(String[] args) throws SQLException {Connection connection = null;Statement statement = null;try {Properties info = new Properties();info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());connection = DriverManager.getConnection("jdbc:calcite:", info);statement = connection.createStatement();statement.executeUpdate(" insert into userinfo1 values ('xxx',12) ");print(statement.executeQuery("select * from asset "));print(statement.executeQuery(" select * from userinfo1 "));print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));print(statement.executeQuery(" select * from userinfo1 where age >60 "));print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));} finally {connection.close();}}private static void print(ResultSet resultSet) throws SQLException {final ResultSetMetaData metaData = resultSet.getMetaData();final int columnCount = metaData.getColumnCount();while (resultSet.next()) {for (int i = 1; ; i++) {System.out.print(resultSet.getString(i));if (i < columnCount) {System.out.print(", ");} else {System.out.println();break;}}}}
}
查询结果:
Mysql实现原理
上述我们在 model.json
文件中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory
类,可以看下这个类的代码。
这个类是把 Factory
和 Schema
写在了一起,其实就是调用schemafactory
类的create
方法创建一个 schema
出来,和我们上面自定义的流程是一样的。
其中JdbcSchema
类也是 Schema
的子类,所以也会实现getTable
方法(这个我们上述也实现了,我们当时是获取表结构和表的字段类型以及名称,是从csv文件头中读文件的),JdbcSchema
的实现是通过连接 Mysql 服务端查询元数据信息,再将这些信息封装成 Calcite
需要的对象格式。
这里同样要注意 csv
方式的2个注意点,大小写和关键字问题。
public static JdbcSchema create(SchemaPlus parentSchema,String name,Map<String, Object> operand) {DataSource dataSource;try {final String dataSourceName = (String) operand.get("dataSource");if (dataSourceName != null) {dataSource =AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);} else {//会走在这里来,这里就是我们在model.json中指定的jdbc的连接信息,最终会创建一个datasourcefinal String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");final String jdbcDriver = (String) operand.get("jdbcDriver");final String jdbcUser = (String) operand.get("jdbcUser");final String jdbcPassword = (String) operand.get("jdbcPassword");dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);}} catch (Exception e) {throw new RuntimeException("Error while reading dataSource", e);}String jdbcCatalog = (String) operand.get("jdbcCatalog");String jdbcSchema = (String) operand.get("jdbcSchema");String sqlDialectFactory = (String) operand.get("sqlDialectFactory");if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {return JdbcSchema.create(parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);} else {SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(SqlDialectFactory.class, sqlDialectFactory);return JdbcSchema.create(parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);}}@Override public @Nullable Table getTable(String name) {return getTableMap(false).get(name);}private synchronized ImmutableMap<String, JdbcTable> getTableMap(boolean force) {if (force || tableMap == null) {tableMap = computeTables();}return tableMap;}private ImmutableMap<String, JdbcTable> computeTables() {Connection connection = null;ResultSet resultSet = null;try {connection = dataSource.getConnection();final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);final String catalog = catalogSchema.left;final String schema = catalogSchema.right;final Iterable<MetaImpl.MetaTable> tableDefs;Foo threadMetadata = THREAD_METADATA.get();if (threadMetadata != null) {tableDefs = threadMetadata.apply(catalog, schema);} else {final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();// 获取元数据final DatabaseMetaData metaData = connection.getMetaData();resultSet = metaData.getTables(catalog, schema, null, null);while (resultSet.next()) {//获取库名,表明等信息final String catalogName = resultSet.getString(1);final String schemaName = resultSet.getString(2);final String tableName = resultSet.getString(3);final String tableTypeName = resultSet.getString(4);tableDefList.add(new MetaImpl.MetaTable(catalogName, schemaName, tableName,tableTypeName));}tableDefs = tableDefList;}final ImmutableMap.Builder<String, JdbcTable> builder =ImmutableMap.builder();for (MetaImpl.MetaTable tableDef : tableDefs) {final String tableTypeName2 =tableDef.tableType == null? null: tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');final TableType tableType =Util.enumVal(TableType.OTHER, tableTypeName2);if (tableType == TableType.OTHER && tableTypeName2 != null) {System.out.println("Unknown table type: " + tableTypeName2);}// 最终封装成JdbcTable对象final JdbcTable table =new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,tableDef.tableName, tableType);builder.put(tableDef.tableName, table);}return builder.build();} catch (SQLException e) {throw new RuntimeException("Exception while reading tables", e);} finally {close(connection, null, resultSet);}}
SQL执行流程
OK,到这里基本上两个简单的案例已经演示好了,最后补充一下整个Calcite
架构和整个 SQL 的执行流程。
整个流程如下:SQL解析(Parser)=> SQL校验(Validator)=> SQL查询优化(optimizer)=> SQL生成 => SQL执行
SQL Parser
所有的 SQL 语句在执行前都需要经历 SQL 解析器解析,解析器的工作内容就是将 SQL 中的 Token 解析成抽象语法树,每个树的节点都是一个 SqlNode,这个过程其实就是 Sql Text => SqlNode 的过程。
我们前面的 Demo 没有自定义 Parser,是因为 Calcite 采用了自己默认的 Parser(SqlParserImpl)。
SqlNode
SqlNode
是整个解析中的核心,比如图中你可以发现,对于每个比如select
、from
、where
关键字之后的内容其实都是一个SqlNode
。
parserConfig
方法主要是设置 SqlParserFactory 的参数,比如我们上面所说的我本地测试的时候踩的大小写的坑,就可以在这里设置。
直接调用setCaseSensitive=false
即不会将 SQL 语句中的表名列名转为大写,下面是默认的,其他的参数可以按需配置。
SQL Validator
SQL 语句先经过 Parser,然后经过语法验证器,注意 Parser 并不会验证语法的正确性。
其实 Parser 只会验证 SQL 关键词的位置是否正确,我们上述2个 Parser 的例子中都没有创建
schema
和 table
这些,但是如果这样写,那就会报错,这个错误就是 parser
检测后抛出来的(ParseLocationErrorTest)。
真正的校验在 validator
中,会去验证查询的表名是否存在,查询的字段是否存在,类型是否匹配,这个过程比较复杂,默认的 validator
是SqlValidatorImpl
。
查询优化
比如关系代数,比如什么投影、笛卡尔积这些,Calcite
提供了很多内部的优化器,也可以实现自己的优化器。
适配器
Calcite
是不包含存储层的,所以提供一种适配器的机制来访问外部的数据存储或者存储引擎。
最后,进阶
官网里面写了未来会支持Kafka
适配器到公共Api
中,到时候使用起来就和上述集成Mysql
一样方便,但是现在还没有支持,我这里给大家提供个自己实现的方式,这样就可以通过 SQL 的方式直接查询 Kafka 中的 Topic 数据等信息。
这里我们内部集成实现了KSQL
的能力,查询结果是OK的。
还是像上述步骤一样,我们需要准备库、表名称、字段名称、字段类型、数据源(多出来的地方)。
自定义
Sql
解析,之前我们都没有自定义解析,这里需要自定义解析,是因为我需要动态解析sql
的where
条件里面的partation
。
配置解析器,就是之前案例中提到的配置大小写之类的
创建解析器,使用的默认
SqlParseImpl
开始解析,生成
AST
,我们可以基于生成的SqlNode
做一些业务相关的校验和参数解析
适配器获取数据源
public class KafkaConsumerAdapter {public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);List<TopicPartition> topics = new ArrayList<>();for (Integer partition : kafkaSql.getPartition()) {TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);topics.add(tp);}consumer.assign(topics);for (TopicPartition tp : topics) {Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));long position = 500;if (offsets.get(tp).longValue() > position) {consumer.seek(tp, offsets.get(tp).longValue() - 500);} else {consumer.seek(tp, 0);}}List<KafkaResult> results = new ArrayList<>();boolean flag = true;while (flag) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {//转成我定义的对象集合KafkaResult result = new KafkaResult();result.setPartition(record.partition());result.setOffset(record.offset());result.setMsg(record.value());result.setKey(record.key());results.add(result);}if (!records.isEmpty()) {flag = false;}}consumer.close();return results;}}
执行查询,就可以得到我们想要的效果了。
public class TestKafka {public static void main(String[] args) throws Exception {KafkaService kafkaService = new KafkaService();//把解析到的参数放在我自己定义的kafkaSqlInfo对象中KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");//适配器获取数据源,主要是从上述的sqlInfo对象中去poll数据List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);//执行查询query(sqlInfo.getTableName(), results, sqlInfo.getSql());sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000 ");results = KafkaConsumerAdapter.executor(sqlInfo);query(sqlInfo.getTableName(), results, sqlInfo.getSql());sqlInfo = kafkaService.parseSql("select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");results = KafkaConsumerAdapter.executor(sqlInfo);query(sqlInfo.getTableName(), results, sqlInfo.getSql());}private static void query(String tableName, List<KafkaResult> results,String sql) throws Exception {//创建model.json,设置我的SchemaFactory,设置库名String model = createTempJson();//设置我的表结构,表名称和表字段名以及类型KafkaTableSchema.generateSchema(tableName, results);Properties info = new Properties();info.setProperty("lex", Lex.JAVA.toString());Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);Statement st = connection.createStatement();//执行ResultSet result = st.executeQuery(sql);ResultSetMetaData rsmd = result.getMetaData();List<Map<String, Object>> ret = new ArrayList<>();while (result.next()) {Map<String, Object> map = new LinkedHashMap<>();for (int i = 1; i <= rsmd.getColumnCount(); i++) {map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));}ret.add(map);}result.close();st.close();connection.close();}private static void print(ResultSet resultSet) throws SQLException {final ResultSetMetaData metaData = resultSet.getMetaData();final int columnCount = metaData.getColumnCount();while (resultSet.next()) {for (int i = 1; ; i++) {System.out.print(resultSet.getString(i));if (i < columnCount) {System.out.print(", ");} else {System.out.println();break;}}}}private static String createTempJson() throws IOException {JSONObject object = new JSONObject();object.put("version", "1.0");object.put("defaultSchema", "QAKAFKA");JSONArray array = new JSONArray();JSONObject tmp = new JSONObject();tmp.put("name", "QAKAFKA");tmp.put("type", "custom");tmp.put("factory", "kafka.KafkaSchemaFactory");array.add(tmp);object.put("schemas", array);return object.toJSONString();}}
生成临时的
model.json
,之前是基于文件,现在基于text
字符串,mode=inline
模式设置我的表结构、表名称、字段名、字段类型等,并放置在内存中,同时将适配器查询出来的数据也放进去
table
里面获取连接,执行查询,完美!
10分钟教你写一个数据库相关推荐
- 10分钟教你搭建一个好玩的Python全文搜索引擎
有一个群友在群里问个如何快速搭建一个搜索引擎,在搜索之后我看到了这个 代码所在 Git:https://github.com/asciimoo/searx 官方很贴心,很方便的是已经提供了docker ...
- 10分钟教你搭建一个可公网访问的私人网盘,和付费网盘彻底say goodbye~
今天偶然间看一个私人网盘的搭建,给大家一起分享一下.我这边是在windows环境下运行的. 检查配置 首先我们需要检查一下配置,需要本地安装有JDK,没有的话自行下载安装.如下所示则表示已经安装. 运 ...
- 10分钟教你用python打造贪吃蛇超详细教程
更多精彩尽在微信公众号[程序猿声] 10分钟教你用python打造贪吃蛇超详细教程 在家闲着没妹子约, 刚好最近又学了一下python,听说pygame挺好玩的.今天就在家研究一下, 弄了个贪吃蛇出来 ...
- 手把手教你写一个生成对抗网络
成对抗网络代码全解析, 详细代码解析(TensorFlow, numpy, matplotlib, scipy) 那么,什么是 GANs? 用 Ian Goodfellow 自己的话来说: " ...
- 手把手教你写一个spring IOC容器
本文分享自华为云社区<手把手教你写一个spring IOC容器>,原文作者:技术火炬手. spring框架的基础核心和起点毫无疑问就是IOC,IOC作为spring容器提供的核心技术,成功 ...
- 花三分钟给女票写一个爬虫,做一回模范男友!
烦恼波及 女票是做运营的,一直苦恼于起推文标题.领导还会频繁突袭:"XX,给这次活动拟一个标题." tmd,自己分内的活都干不完,却要为活动策划起标题流汗,不是自己的活,还得笑着干 ...
- 如何用计算机自动回复微信,10分钟教你用Python实现微信自动回复功能
01 前言&&效果展示 相信大家都有忙碌的时候,不可能一直守在微信上及时回复消息.但微信又不能像QQ一样设置自动回复.无妨,今天,我们就来用Python实现微信的自动回复功能吧,并且把 ...
- 10分钟学计算机,电脑运行越来越慢?程序员大牛10分钟教你学会电脑瘦身
原标题:电脑运行越来越慢?程序员大牛10分钟教你学会电脑瘦身 你的电脑是不是越来越慢?这里让程序员大佬用10分钟时间教你学会给电脑软件瘦身,1分钟了解计算机硬件升级.分分钟让你成为别人眼中的计算机大牛 ...
- 10分钟教你用python如何正确把妹
前言 今天没妹子约,刚好研究一下.如何用神奇的python打造一个把妹神器吧.看完这个,你们就能走向人生巅峰,迎娶白富美啦. 我知道你们想看看效果 当然啦,这只是测试版的效果,真正的版本可比这个厉害多 ...
最新文章
- 为什么要在JavaScript中使用静态类型? (使用Flow进行静态打字的4部分入门)
- linux 安装 nslookup
- 每日一皮:学PHP的不容易...
- 如何导出SAP的数据表字段和字段描述
- 历届试题 错误票据(multiset)
- 【风农翻译】开始画像素画 #8
- 剑指offer 最小的k个数
- intellij idea 好用的快捷键(mac版)
- 【Java线程】深入理解Volatile关键字和使用
- 【干货】神策标签生产引擎架构
- 将矩阵转为一行_理解矩阵乘法
- c语言股票最大收益_长期持有指数基金是最好的选择?指数基金的历史年化收益率是多少?...
- 若依ruoyi框架整合magic-api增删改查Demo
- 三十九级台阶java_蓝桥杯 -- 第三十九级台阶
- 关于Layout Constraint的动态update方式
- Cathy学习Java——GUI与正则表达式
- 计算机中函数vlookup怎么用,教您使用excel函数vlookup
- NFT会接力Defi,成为下一个热点么?
- 详解ASEMI整流桥MB6S在恒流LED驱动电源中的应用
- 使用vue-cli创建项目
热门文章
- matlab:转置运算符
- 机器学习基石2(noise和error)
- Python serial库实现一个串口插入监测工具
- 【老生谈算法】matlab实现巴特沃斯IIR滤波器程序设计源码
- 【调剂】国家蛋白质科学中心·北京贺福初院士课题组招收硕士研究生
- 北斗RTK高精度定位在AI领域的应用
- 继电器的工作原理及应用
- linux redhat 7配置yum源,redhat7 配置国内清华大学yum源
- 计算机组成原理(白中英版)绪论【思维导图】【简化知识点】
- [ERROR] InnoDB: Cannot open datafile for read-only: './dxh_sys/vendorUser.ibd' OS error: 71