Openlookeng Redis Connector 移植
Redis Plugin
初步配置
git clone 后在项目根目录下新建一个模块
presto-redis
模块的初步配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>presto-root</artifactId><groupId>io.hetu.core</groupId><version>1.7.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hetu-redis</artifactId><packaging>hetu-plugin</packaging><!-- <properties>-->
<!-- <maven.compiler.source>8</maven.compiler.source>-->
<!-- <maven.compiler.target>8</maven.compiler.target>-->
<!-- </properties>--><properties>
<!-- <air.main.basedir>${project.parent.basedir}</air.main.basedir>--></properties><dependencies><dependency><groupId>io.hetu.core</groupId><artifactId>presto-spi</artifactId><version>1.7.0-SNAPSHOT</version><scope>provided</scope></dependency><dependency><groupId>io.airlift</groupId><artifactId>bootstrap</artifactId>
<!-- <version></version>--></dependency><dependency><groupId>io.airlift</groupId><artifactId>json</artifactId>
<!-- <version></version>--></dependency></dependencies></project>
注册Plugin
在模块
hetu-server
中的hetu-server/src/main/provisio/hetu.xml
文件中添加如下<artifactSet><artifact id="${project.groupId}:hetu-redis:zip:${project.version}"><unpack /></artifact> </artifactSet>
将模块
presto-example-http/src/main/java/io/prestosql/plugin/example
下的所有样本文件复制到hetu-redis
模块中,然后对Examplexxx.java
重命名为Redisxxx.java
,建议利用idea的智能重构由于Redis是典型的非JDBC数据源,和适配最早的kafka相似,故我主要参考kafka
主要参照如下结构图依次开发
功能概述
redis和kafka都是典型的非jdbc,故要将一行数据映射成表的一行,故要定义表结构,以tpch(常见的基础测试程序)的cutomer表为例
{"tableName": "customer","schemaName": "default","key": {"dataFormat": "raw","name":"id","fields": [{"name": "redis_key","type": "VARCHAR(64)","hidden": "false"}]},"value": {"dataFormat": "json","fields": [{"name": "custkey","mapping": "custkey","type": "BIGINT","hidden":"false"},{"name": "name","mapping": "name","type": "VARCHAR(25)","hidden":"false"},{"name": "address","mapping": "address","type": "VARCHAR(40)"},{"name": "nationkey","mapping": "nationkey","type": "BIGINT"},{"name": "phone","mapping": "phone","type": "VARCHAR(15)"},{"name": "acctbal","mapping": "acctbal","type": "DOUBLE"},{"name": "mktsegment","mapping": "mktsegment","type": "VARCHAR(10)"},{"name": "comment","mapping": "comment","type": "VARCHAR(117)"}]} }
将该表结构写入到
redis.table-description-dir
(默认值etc/redis
)目录下相应的在redis中拥有类似的数据
127.0.0.1:6379> get customer:174 "{\"custkey\":82,\"name\":\"Customer#000000082\",\"address\":\"zhG3EZbap4c992Gj3bK,3Ne,Xn\",\"nationkey\":18,\"phone\":\"28-159-442-5305\",\"acctbal\":9468.34,\"mktsegment\":\"AUTOMOBILE\",\"comment\":\"s wake. bravely regular accounts are furiously. regula\"}"
key值:
schemaName:tableName:id
id由插入的顺序先后决定,schemaName若为default,可如上,省略不写value值:根据表定义json文件转换具体的字段
启动hetu-server
bin/launcher run
启动hetu-cli
java -jar hetu-cli-1.7.0-SNAPSHOT-executable.jar
1. description层
由上述功能概述可知程序启动首先就要知道表结构,故首先读取
redis.table-description-dir
(默认值etc/redis
)目录下的json文件,将json文件映射成java对象。不难得该json表定义文件可划分为三层,分别将其定义为
在构造函数加上
@JsonCreator
注解除此之外还需要一个类用来读取表json文件,即
RedisTableDescriptionSupplier
public class RedisTableDescriptionSupplierimplements Supplier<Map<SchemaTableName,RedisTableDescription>> {}
主要实现
public Map<SchemaTableName, RedisTableDescription> get()
方法首先从json表定义文件中读取数据,然后将表定义与redis.properties文件做对比,json表定义文件必须在properties中的"redis.table-names"有对应定义,否则视为表定义无效,如customer表定义文件在redis.table-names中必须有default.customer,同时redis.table-names无对应的json,那么就会视为dummy table即虚表只支持内部列(完全由connector管理,数据格式都默认为row)
2. 必要实现类
1. Plugin
该文件相当于整个插件模块的main方法,主要是供系统调用获取RedisConnector的工厂方法,可直接在该类中传入一个
Supplier<Map<SchemaTableName, RedisTableDescription>>
(Supplier仅有get()方法,用于值可有可无的情况)@ConnectorConfig(connectorLabel = "Redis: Alllow the use of Redis KV as tables in openLooKeng") public class RedisPluginimplements Plugin {private Optional<Supplier<Map<SchemaTableName, RedisTableDescription>>> tableDescriptionSupplier=Optional.empty();public synchronized void setTableDescriptionSupplier(Supplier<Map<SchemaTableName, RedisTableDescription>> tableDescriptionSupplier) {this.tableDescriptionSupplier=Optional.ofNullable(tableDescriptionSupplier);}@Overridepublic synchronized Iterable<ConnectorFactory> getConnectorFactories(){return ImmutableList.of(new RedisConnectorFactory(tableDescriptionSupplier));} }
2. ConnectorFactory
public class RedisConnectorFactory implements ConnectorFactory
实现函数
String getName();
返回插件名字即
redis
ConnectorHandleResolver getHandleResolver();
返回一个
RedisHandleResolver()
对象Connector create(String catalogName, Map<String, String> config, ConnectorContext context);
Bootstrap app = new Bootstrap(new JsonModule(),new RedisModule(context.getTypeManager(),tableDescriptionSupplier,context.getNodeManager()) );Injector injector = app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(requiredConfig).initialize(); return injector.getInstance(RedisConnector.class);
主要用来定义Bootstrap,传入实现了Module接口的对象,module类主要实现configure方法,该方法中定义需要反射注入的类。
即bootstrap主要用于启动反射注入。
3. Module
public class RedisModule implements Module
module管理plugin大部分对象的反射创建。主要实现configure方法,主要定义需要反射注入那些类,相应的类的构造函数唯一且有
@Injector
注解,该类构造函数传入的对象需要在该处定义public void configure(Binder binder) {configBinder(binder).bindConfig(RedisConfig.class);if (tableDescriptionSupplier.isPresent()){binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, RedisTableDescription>>>() {}).toInstance(tableDescriptionSupplier.get());}else {binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName,RedisTableDescription>>>() {}).to((Class<? extends Supplier<Map<SchemaTableName, RedisTableDescription>>>) RedisTableDescriptionSupplier.class).in(Scopes.SINGLETON);} //如果在plugin没有传入Map<SchemaTableName, RedisTableDescription> 那么就会启用RedisTableDescriptionSupplier读取json表文件来获取Mapbinder.bind(NodeManager.class).toInstance(nodeManager);binder.bind(TypeManager.class).toInstance(typeManager);binder.bind(RedisConnector.class).in(Scopes.SINGLETON);binder.bind(RedisMetadata.class).in(Scopes.SINGLETON);binder.bind(RedisSplitManager.class).in(Scopes.SINGLETON);binder.bind(RedisRecordSetProvider.class).in(Scopes.SINGLETON);binder.bind(RedisJedisManager.class).in(Scopes.SINGLETON);jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);jsonCodecBinder(binder).bindMapJsonCodec(String.class, listJsonCodec(RedisTableHandle.class));jsonCodecBinder(binder).bindJsonCodec(RedisTableDescription.class);binder.install(new RedisDecoderModule());//相当于是将RedisDecoderModule中configure内容copy到这里 }
4. Config
- 当我们使用一个插件时我们需要定义一个properties配置文件,故这里需要将preperties映射成一个对象
属性
字段名 properties中key名 功能 redisPassword redis.password redis密码 redisConnectTimeout redis.connect-timeout 连接超时,默认 2000ms
defaultSchema(default) redis.default-schema 默认schema名,如果未明确指定schema,使用此 tableNames redis.table-names 表名,例 default.customer,tpch.national tableDescriptionDir redis.table-description-dir 表定义文件夹相对路径,默认是 etc/redis/
hideInternalColumns redis.hide-internal-columns 是否在元数据中隐藏内部列,默认是true keyPrefixSchemaTable redis.key-prefix-schema-table key是否符合 schema:table:*
格式,如果false则将当前数据库所有key当成表的key,默认是falsenodes redis.nodes redis的Ip:port 结合 redisScanCount redis.scan-count 主要通过scan的方式获取key,这里是指每轮scan获取的key数量。默认是100 redisDataBaseIndex redis.database-index redis的数据库索引 redisKeyDelimiter redis.key-delimiter schema和table的分隔符,默认是 :
tableDescriptionFlushInterval redis.table-description-interval description在内存中的持久时间(ms),默认是一直留存在内存中。 例
@Config("redis.table-description-dir") public RedisConfig setTableDescriptionDir(File tableDescriptionDir) {this.tableDescriptionDir = tableDescriptionDir;return this; }
5. Connector
public class RedisConnector implements Connector
主要是作为一个组织者,用于管理metadata、splitManage、recordSetProvider
6. metadata
public class RedisMetadata implements ConnectorMetadata
metadata主要是管理元数据,在使用
describe
等sql语句时会调用相应的方法。
7. jedisManager
- 主要作用是创建销毁一个并发安全的jedis连接池,主要通过
ConcurrentHashMap
实现。需要传入redisconfig相应的配置。
3. handle 层
其中handleResolver用于返回三个类的class用于反射注入,handleResolver在connectorFactory创建对象并返回
TableHandle主要作用于metadata对象中,由
getTableHandle
方法根据Map<SchemaTableName, RedisTableDescription>
创建ColumnHandle主要由
RedisTableFieldDescription
和RedisInternalFieldDescription
创建,主要作用于record层,即对于数据的读取映射处理。
4. decoder层
主要是反射注入key和value格式的解码器,根据需要自定义解码器
public class RedisDecoderModule implements Module {@Overridepublic void configure(Binder binder) {MapBinder<String, RowDecoderFactory> decoderFactoriesByName = MapBinder.newMapBinder(binder, String.class, RowDecoderFactory.class);decoderFactoriesByName.addBinding(DummyRowDecoder.NAME).to(DummyRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(CsvRowDecoder.NAME).to(CsvRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(JsonRowDecoder.NAME).to(JsonRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(RawRowDecoder.NAME).to(RawRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(AvroRowDecoder.NAME).to(AvroRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(ZsetRedisRowDecoder.NAME).to(ZsetRedisRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(HashRedisRowDecoder.NAME).to(HashRedisRowDecoderFactory.class).in(SINGLETON);binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON);} }
5. split层
- 该层没有大的改动,当且仅当key的格式是zset时,即顺序排序后,才具备将数据切片的条件,默认是将整个表当成一个单分区。
6. recoder层
RedisRecordSetProvider
主要实现getRecordSet,而该方法主要作用在于根据key和value的数据格式用decoderFactory
创建相应的解码器,并将其传入recordsetRedisRecordSet
几乎没有什么作用,只是根据columnHandles返回 列的类型和将RecordSetProvider的参数传入给RecordCursor。RedisRecordCursor
起到整个Plugin的核心作用,即从redis中读取相应的数据,主要是围绕FieldValueProvider[] currentRowValues
,首先在构造函数中,如果key的格式不是zset,那么就需要用scan来扫描获取相应的大量key,用scan需要配置ScanParams,重点是match,如果配置
redis.key-prefix-schema-table
为false,那么match为空,将数据库所有key当成表的key,默认match的是schema:table:*
,如果表的schema为default,那么在redis中对应的key值可以省略schema,例如customer:id
,而不用default:customer:id
。最后初步获取keysIterator,数量和redis.scan-count
值相同advanceNextPosition
:主要由上层调用,每次调用,判断当前keysIterator是否消耗完,如果消耗完,那么再次scan一遍,如果没有消耗完,那么就消耗keysIterator一次,将key用于redis中数据读取,将读取的数据用相应的解码器解码,将解码后的数据,按照ColumnHandle
的顺序写入到currentRowValues
中。即上层每调用一次advanceNextPosition
方法就获取一行数据,然后将数据转换写入到currentRowValues
,如果方法返回false意味着数据读取完毕。剩下的
getObject
、getLong
等方法都是对currentRowValues
的读取类型转换
总结
执行流程:
- 首先Plugin实现类起到了一个main入口作用,得到一个ConnectorFactory工厂,工厂每次create时,首先向Bootstrap注册module实现类(module中主要是注册了整个plugin需要反射注入的类),启动Bootstrap,意味着反射创建整个plugin需要的java对象,bootstrap起到了类似spring容器的作用。create返回一个connector类,该connector本身作用不大,主要是整合
metadata
、splitManage
、recordSetProvider
- bootstarp启动后,从properties文件读取映射成一个config对象,读取表定义json文件,映射成
Map<SchemaTableName, RedisTableDescription>
- metadata主要是对
Map<SchemaTableName, RedisTableDescription>
的操作,由getTableHandle
方法将map转换成tablehandle对象 - splitmanager,主要是根据metadata生成的tablehandle对象,将数据做切片。在redis该场景下,只有当key的格式是zset,即顺序排序后,才具备切片的条件,一般默认将整个table当成单个分片。
- ConnectorRecordSetProvider,需要splitmanager得到的split和metadata对象中转换生成的tablehandle和ColumnHandle的list
- recordsetprovider首先getRecordSet方法根据split获取key和value格式的解码器,传给recordset
- recordset实现getColumnTypes()得到 List<Type> ,然后简单的传值给recordcursor
- recordcursor中的advanceNextPosition方法,每被调用一次读取一行数据,将这行数据用相应的解码器解码转换缓存为
FieldValueProvider[]
数组,其余方法皆是对FieldValueProvider[]
读取类型转换。
- 首先Plugin实现类起到了一个main入口作用,得到一个ConnectorFactory工厂,工厂每次create时,首先向Bootstrap注册module实现类(module中主要是注册了整个plugin需要反射注入的类),启动Bootstrap,意味着反射创建整个plugin需要的java对象,bootstrap起到了类似spring容器的作用。create返回一个connector类,该connector本身作用不大,主要是整合
由于openlookeng定位是OLAP,侧重于从宏观层面对大量数据的分析,故没必要实现写操作,而且对where 执行逻辑是将表的所有数据读取出来之后,然后用where过滤,有一定的局限性。
调试技巧,首先git clone openlookeng连同redis plugin编译连同并部署,
log.properties
中配置io.prestosql=DEBUG
方便调试,然后建议用bin/launcher run
启动方便在控制台查看调试信息。如果出错调试代码 只需要修改代码重新打包redis plugin大概100kB左右,覆盖plugin/redis/
中对应的jar包即可,再次run即可完成代码更新。注意:
<air.check.fail-checkstyle>false</air.check.fail-checkstyle> <air.check.skip-checkstyle>true</air.check.skip-checkstyle>
在pom.xml中properties添加如上,可跳过checkstyle检查
可能会出现如下报错
[WARNING] Rule 0: org.apache.maven.plugins.enforcer.RequireUpperBoundDeps failed with message: Rule 0: org.apache.maven.plugins.enforcer.RequireUpperBoundDeps failed with message:Failed while enforcing RequireUpperBoundDeps. The error(s) are [ Require upper bound dependencies error for org.slf4j:slf4j-api:1.7.29 paths to dependency are: +-io.hetu.core:hetu-redis:1.7.0-SNAPSHOT+-io.hetu.core:presto-main:1.7.0-SNAPSHOT+-org.slf4j:slf4j-api:1.7.29 (managed) <-- org.slf4j:slf4j-api:1.7.25 and
解决方法,在pom.xml中的某些模块导入时注意添加如下
<exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion> </exclusions>
Openlookeng Redis Connector 移植相关推荐
- Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connecto ...
- flink redis connector(支持flink sql)
flink redis connector(支持flink sql) 1. 背景 工作原因,需要基于flink sql做redis sink,但bahir 分支的flink connector支持只是 ...
- Redis Connector / UE4 DTRedis 插件说明
使用蓝图连接Redis,可以设置,获取,删除关键值.订阅 / 发布 自定义消息. 1 CreateRedis 连接到 Redis 服务器 Host:服务器地址 Port:服务器端口 User:用户名, ...
- openlookeng 扩展connector对接达梦和金仓
文章目录 1 概述: 2 准备环境(我这里用的是1.5.0): 2.1 idea环境OK,下载一套所需要版本的openlookeng源码:gitee: https://gitee.com/openlo ...
- OpenLooKeng / Presto Connector原理
OpenLooKeng Connector 扩展 一.什么是Connector 我们知道在整个Presto工程中所有的功能都是以插件的形式实现,而Connector则是一种负责Presto与数据源进行 ...
- 数据湖探索DLI新功能:基于openLooKeng的交互式分析
摘要:基于华为开源openLooKeng引擎的交互式分析功能,将重磅发布便于用户构建轻量级流.批.交互式全场景数据湖. 在这个"信息爆炸"的时代,大数据已经成为这个时代的关键词之一 ...
- flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)
感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...
- 即兴小探华为开源行业领先大数据虚拟化引擎openLooKeng
文章目录 概述 定义 背景 特点 架构 关键技术 应用场景 安装 单台部署 集群部署 命令行接口 连接器 MySQL连接器 ClickHouse连接器 概述 定义 openLooKeng 官网地址 h ...
- Redis搭建及使用
Redis篇 一.Redis介绍 1.1Redis解决项目问题 为了降低对数据库的访问压力,当多个用户请求相同的数据时,我们可以第一时间从数据库查询到的数据进行缓存(存储到内存中),以减少数据库的访问 ...
最新文章
- ReactNative环境配置的坑
- SQLite Version3.3.6源代码文件结构
- docker 推送到本地仓库_Docker_学习笔记系列之仓库
- JDBC-连接数据库代码
- Linux内核引导简析
- 有关phpmailer的详细介绍及使用方法
- c++ java通信 protocol buffer,google protocol buffer (C++,Java序列化应用实例)
- jdk和jre是什么?都有什么用?
- 代理模式-Java实现-静态代理、动态代理
- 那年我整理的JavaEE面试题
- java 64内存不足_window7 64bit解决tomcat内存不足问题
- java经典源码 阅读_公开!阿里甩出“源码阅读指南”,原来源码才是最经典的学习范例...
- 从MySQL复制功能中得到一举三得实惠
- JDBC连接Oracle数据库时出现的ORA-12505错误及解决办法.
- MySQL数据库字段级权限设计
- 设计师的“通天塔”—浅谈设计沟通
- 云主机是什么,怎么才能购买性价比高的云主机
- Android游戏开发+实战开发教程视频
- AVS2的GB帧与s帧
- Windows 七种截图方式 快捷键 系统自带 工具软件
热门文章
- Oracle spatial 空间修正函数(SDO_UTIL.RECTIFY_GEOMETRY)
- Experiment 3. Stack and Queue
- 输入圆形半径,求圆形的周长和圆形的面积,并将结果输出。
- 2019牛客暑期多校训练营(第一场)E-ABBA(dp)
- Hyperion Research:2021年量子计算市场收入已达4.9亿美元
- 3.《程序猿扯淡系列》约会的艺术--教你如何逆袭
- 复杂交联环境下的测试任务快速开发工具系统情况
- web请求流程与HTTP方法刨析
- Windows10 蓝屏 DRIVER_IRQL_NOT_LESS_OR_EQUAL (vfilter.sys)的可能解决方法
- 虹口区企业技术中心认定条件及奖励政策解读