Redis Plugin

初步配置

  • git clone 后在项目根目录下新建一个模块presto-redis

  • 模块的初步配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>presto-root</artifactId><groupId>io.hetu.core</groupId><version>1.7.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hetu-redis</artifactId><packaging>hetu-plugin</packaging><!--    <properties>-->
<!--        <maven.compiler.source>8</maven.compiler.source>-->
<!--        <maven.compiler.target>8</maven.compiler.target>-->
<!--    </properties>--><properties>
<!--        <air.main.basedir>${project.parent.basedir}</air.main.basedir>--></properties><dependencies><dependency><groupId>io.hetu.core</groupId><artifactId>presto-spi</artifactId><version>1.7.0-SNAPSHOT</version><scope>provided</scope></dependency><dependency><groupId>io.airlift</groupId><artifactId>bootstrap</artifactId>
<!--            <version></version>--></dependency><dependency><groupId>io.airlift</groupId><artifactId>json</artifactId>
<!--            <version></version>--></dependency></dependencies></project>
  • 注册Plugin

    在模块hetu-server中的hetu-server/src/main/provisio/hetu.xml文件中添加如下

    <artifactSet><artifact id="${project.groupId}:hetu-redis:zip:${project.version}"><unpack /></artifact>
    </artifactSet>
    
  • 将模块presto-example-http/src/main/java/io/prestosql/plugin/example下的所有样本文件复制到hetu-redis模块中,然后对Examplexxx.java重命名为Redisxxx.java,建议利用idea的智能重构

  • 由于Redis是典型的非JDBC数据源,和适配最早的kafka相似,故我主要参考kafka

    主要参照如下结构图依次开发

功能概述

  • redis和kafka都是典型的非jdbc,故要将一行数据映射成表的一行,故要定义表结构,以tpch(常见的基础测试程序)的cutomer表为例

    {"tableName": "customer","schemaName": "default","key": {"dataFormat": "raw","name":"id","fields": [{"name": "redis_key","type": "VARCHAR(64)","hidden": "false"}]},"value": {"dataFormat": "json","fields": [{"name": "custkey","mapping": "custkey","type": "BIGINT","hidden":"false"},{"name": "name","mapping": "name","type": "VARCHAR(25)","hidden":"false"},{"name": "address","mapping": "address","type": "VARCHAR(40)"},{"name": "nationkey","mapping": "nationkey","type": "BIGINT"},{"name": "phone","mapping": "phone","type": "VARCHAR(15)"},{"name": "acctbal","mapping": "acctbal","type": "DOUBLE"},{"name": "mktsegment","mapping": "mktsegment","type": "VARCHAR(10)"},{"name": "comment","mapping": "comment","type": "VARCHAR(117)"}]}
    }

    将该表结构写入到redis.table-description-dir(默认值etc/redis)目录下

  • 相应的在redis中拥有类似的数据

    127.0.0.1:6379> get customer:174
    "{\"custkey\":82,\"name\":\"Customer#000000082\",\"address\":\"zhG3EZbap4c992Gj3bK,3Ne,Xn\",\"nationkey\":18,\"phone\":\"28-159-442-5305\",\"acctbal\":9468.34,\"mktsegment\":\"AUTOMOBILE\",\"comment\":\"s wake. bravely regular accounts are furiously. regula\"}"
    

    key值:schemaName:tableName:id id由插入的顺序先后决定,schemaName若为default,可如上,省略不写

    value值:根据表定义json文件转换具体的字段

  • 启动hetu-server bin/launcher run

  • 启动hetu-cli java -jar hetu-cli-1.7.0-SNAPSHOT-executable.jar

1. description层

  • 由上述功能概述可知程序启动首先就要知道表结构,故首先读取redis.table-description-dir(默认值etc/redis)目录下的json文件,将json文件映射成java对象。

  • 不难得该json表定义文件可划分为三层,分别将其定义为

    在构造函数加上@JsonCreator注解

  • 除此之外还需要一个类用来读取表json文件,即RedisTableDescriptionSupplier

    public class RedisTableDescriptionSupplierimplements Supplier<Map<SchemaTableName,RedisTableDescription>> {}
    

    主要实现public Map<SchemaTableName, RedisTableDescription> get() 方法

    首先从json表定义文件中读取数据,然后将表定义与redis.properties文件做对比,json表定义文件必须在properties中的"redis.table-names"有对应定义,否则视为表定义无效,如customer表定义文件在redis.table-names中必须有default.customer,同时redis.table-names无对应的json,那么就会视为dummy table即虚表只支持内部列(完全由connector管理,数据格式都默认为row)

2. 必要实现类

1. Plugin

  • 该文件相当于整个插件模块的main方法,主要是供系统调用获取RedisConnector的工厂方法,可直接在该类中传入一个Supplier<Map<SchemaTableName, RedisTableDescription>> (Supplier仅有get()方法,用于值可有可无的情况)

    @ConnectorConfig(connectorLabel = "Redis: Alllow the use of Redis KV as tables in openLooKeng")
    public class RedisPluginimplements Plugin
    {private Optional<Supplier<Map<SchemaTableName, RedisTableDescription>>> tableDescriptionSupplier=Optional.empty();public  synchronized void setTableDescriptionSupplier(Supplier<Map<SchemaTableName, RedisTableDescription>> tableDescriptionSupplier) {this.tableDescriptionSupplier=Optional.ofNullable(tableDescriptionSupplier);}@Overridepublic synchronized Iterable<ConnectorFactory> getConnectorFactories(){return ImmutableList.of(new RedisConnectorFactory(tableDescriptionSupplier));}
    }
    

2. ConnectorFactory

  • public class RedisConnectorFactory implements ConnectorFactory

  • 实现函数

    • String getName();

      返回插件名字即 redis

    • ConnectorHandleResolver getHandleResolver();

      返回一个RedisHandleResolver() 对象

    • Connector create(String catalogName, Map<String, String> config, ConnectorContext context);

      Bootstrap app = new Bootstrap(new JsonModule(),new RedisModule(context.getTypeManager(),tableDescriptionSupplier,context.getNodeManager())
      );Injector injector = app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(requiredConfig).initialize();
      return injector.getInstance(RedisConnector.class);
      

      主要用来定义Bootstrap,传入实现了Module接口的对象,module类主要实现configure方法,该方法中定义需要反射注入的类。

      即bootstrap主要用于启动反射注入。

3. Module

  • public class RedisModule implements Module module管理plugin大部分对象的反射创建。

  • 主要实现configure方法,主要定义需要反射注入那些类,相应的类的构造函数唯一且有@Injector注解,该类构造函数传入的对象需要在该处定义

    public void configure(Binder binder)
    {configBinder(binder).bindConfig(RedisConfig.class);if (tableDescriptionSupplier.isPresent()){binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, RedisTableDescription>>>() {}).toInstance(tableDescriptionSupplier.get());}else {binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName,RedisTableDescription>>>() {}).to((Class<? extends Supplier<Map<SchemaTableName, RedisTableDescription>>>) RedisTableDescriptionSupplier.class).in(Scopes.SINGLETON);} //如果在plugin没有传入Map<SchemaTableName, RedisTableDescription> 那么就会启用RedisTableDescriptionSupplier读取json表文件来获取Mapbinder.bind(NodeManager.class).toInstance(nodeManager);binder.bind(TypeManager.class).toInstance(typeManager);binder.bind(RedisConnector.class).in(Scopes.SINGLETON);binder.bind(RedisMetadata.class).in(Scopes.SINGLETON);binder.bind(RedisSplitManager.class).in(Scopes.SINGLETON);binder.bind(RedisRecordSetProvider.class).in(Scopes.SINGLETON);binder.bind(RedisJedisManager.class).in(Scopes.SINGLETON);jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);jsonCodecBinder(binder).bindMapJsonCodec(String.class, listJsonCodec(RedisTableHandle.class));jsonCodecBinder(binder).bindJsonCodec(RedisTableDescription.class);binder.install(new RedisDecoderModule());//相当于是将RedisDecoderModule中configure内容copy到这里
    }
    

4. Config

  • 当我们使用一个插件时我们需要定义一个properties配置文件,故这里需要将preperties映射成一个对象

  • 属性

    字段名 properties中key名 功能
    redisPassword redis.password redis密码
    redisConnectTimeout redis.connect-timeout 连接超时,默认2000ms
    defaultSchema(default) redis.default-schema 默认schema名,如果未明确指定schema,使用此
    tableNames redis.table-names 表名,例 default.customer,tpch.national
    tableDescriptionDir redis.table-description-dir 表定义文件夹相对路径,默认是etc/redis/
    hideInternalColumns redis.hide-internal-columns 是否在元数据中隐藏内部列,默认是true
    keyPrefixSchemaTable redis.key-prefix-schema-table key是否符合schema:table:*格式,如果false则将当前数据库所有key当成表的key,默认是false
    nodes redis.nodes redis的Ip:port 结合
    redisScanCount redis.scan-count 主要通过scan的方式获取key,这里是指每轮scan获取的key数量。默认是100
    redisDataBaseIndex redis.database-index redis的数据库索引
    redisKeyDelimiter redis.key-delimiter schema和table的分隔符,默认是:
    tableDescriptionFlushInterval redis.table-description-interval description在内存中的持久时间(ms),默认是一直留存在内存中。
  • @Config("redis.table-description-dir")
    public RedisConfig setTableDescriptionDir(File tableDescriptionDir) {this.tableDescriptionDir = tableDescriptionDir;return this;
    }
    

5. Connector

  • public class RedisConnector implements Connector

  • 主要是作为一个组织者,用于管理metadata、splitManage、recordSetProvider

6. metadata

  • public class RedisMetadata implements ConnectorMetadata

  • metadata主要是管理元数据,在使用describe等sql语句时会调用相应的方法。

7. jedisManager

  • 主要作用是创建销毁一个并发安全的jedis连接池,主要通过ConcurrentHashMap实现。需要传入redisconfig相应的配置。

3. handle 层

  • 其中handleResolver用于返回三个类的class用于反射注入,handleResolver在connectorFactory创建对象并返回

  • TableHandle主要作用于metadata对象中,由getTableHandle方法根据Map<SchemaTableName, RedisTableDescription>创建

  • ColumnHandle主要由RedisTableFieldDescriptionRedisInternalFieldDescription创建,主要作用于record层,即对于数据的读取映射处理。

4. decoder层

  • 主要是反射注入key和value格式的解码器,根据需要自定义解码器

  • public class RedisDecoderModule implements Module {@Overridepublic void configure(Binder binder) {MapBinder<String, RowDecoderFactory> decoderFactoriesByName = MapBinder.newMapBinder(binder, String.class, RowDecoderFactory.class);decoderFactoriesByName.addBinding(DummyRowDecoder.NAME).to(DummyRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(CsvRowDecoder.NAME).to(CsvRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(JsonRowDecoder.NAME).to(JsonRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(RawRowDecoder.NAME).to(RawRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(AvroRowDecoder.NAME).to(AvroRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(ZsetRedisRowDecoder.NAME).to(ZsetRedisRowDecoderFactory.class).in(SINGLETON);decoderFactoriesByName.addBinding(HashRedisRowDecoder.NAME).to(HashRedisRowDecoderFactory.class).in(SINGLETON);binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON);}
    }
    

5. split层

  • 该层没有大的改动,当且仅当key的格式是zset时,即顺序排序后,才具备将数据切片的条件,默认是将整个表当成一个单分区。

6. recoder层

  • RedisRecordSetProvider主要实现getRecordSet,而该方法主要作用在于根据key和value的数据格式用decoderFactory 创建相应的解码器,并将其传入recordset

  • RedisRecordSet几乎没有什么作用,只是根据columnHandles返回 列的类型和将RecordSetProvider的参数传入给RecordCursor。

  • RedisRecordCursor 起到整个Plugin的核心作用,即从redis中读取相应的数据,主要是围绕FieldValueProvider[] currentRowValues

  • 首先在构造函数中,如果key的格式不是zset,那么就需要用scan来扫描获取相应的大量key,用scan需要配置ScanParams,重点是match,如果配置 redis.key-prefix-schema-table 为false,那么match为空,将数据库所有key当成表的key,默认match的是schema:table:*,如果表的schema为default,那么在redis中对应的key值可以省略schema,例如 customer:id,而不用default:customer:id。最后初步获取keysIterator,数量和redis.scan-count值相同

  • advanceNextPosition :主要由上层调用,每次调用,判断当前keysIterator是否消耗完,如果消耗完,那么再次scan一遍,如果没有消耗完,那么就消耗keysIterator一次,将key用于redis中数据读取,将读取的数据用相应的解码器解码,将解码后的数据,按照ColumnHandle的顺序写入到currentRowValues中。即上层每调用一次advanceNextPosition方法就获取一行数据,然后将数据转换写入到currentRowValues,如果方法返回false意味着数据读取完毕。

  • 剩下的getObjectgetLong 等方法都是对currentRowValues的读取类型转换

总结

  • 执行流程:

    1. 首先Plugin实现类起到了一个main入口作用,得到一个ConnectorFactory工厂,工厂每次create时,首先向Bootstrap注册module实现类(module中主要是注册了整个plugin需要反射注入的类),启动Bootstrap,意味着反射创建整个plugin需要的java对象,bootstrap起到了类似spring容器的作用。create返回一个connector类,该connector本身作用不大,主要是整合metadatasplitManagerecordSetProvider
    2. bootstarp启动后,从properties文件读取映射成一个config对象,读取表定义json文件,映射成Map<SchemaTableName, RedisTableDescription>
    3. metadata主要是对Map<SchemaTableName, RedisTableDescription>的操作,由getTableHandle方法将map转换成tablehandle对象
    4. splitmanager,主要是根据metadata生成的tablehandle对象,将数据做切片。在redis该场景下,只有当key的格式是zset,即顺序排序后,才具备切片的条件,一般默认将整个table当成单个分片。
    5. ConnectorRecordSetProvider,需要splitmanager得到的split和metadata对象中转换生成的tablehandle和ColumnHandle的list
      1. recordsetprovider首先getRecordSet方法根据split获取key和value格式的解码器,传给recordset
      2. recordset实现getColumnTypes()得到 List<Type> ,然后简单的传值给recordcursor
      3. recordcursor中的advanceNextPosition方法,每被调用一次读取一行数据,将这行数据用相应的解码器解码转换缓存为FieldValueProvider[]数组,其余方法皆是对FieldValueProvider[]读取类型转换。
  • 由于openlookeng定位是OLAP,侧重于从宏观层面对大量数据的分析,故没必要实现写操作,而且对where 执行逻辑是将表的所有数据读取出来之后,然后用where过滤,有一定的局限性。

  • 调试技巧,首先git clone openlookeng连同redis plugin编译连同并部署,log.properties中配置io.prestosql=DEBUG方便调试,然后建议用 bin/launcher run启动方便在控制台查看调试信息。如果出错调试代码 只需要修改代码重新打包redis plugin大概100kB左右,覆盖plugin/redis/中对应的jar包即可,再次run即可完成代码更新。

  • 注意:

    <air.check.fail-checkstyle>false</air.check.fail-checkstyle>
    <air.check.skip-checkstyle>true</air.check.skip-checkstyle>
    

    在pom.xml中properties添加如上,可跳过checkstyle检查

    可能会出现如下报错

    [WARNING] Rule 0: org.apache.maven.plugins.enforcer.RequireUpperBoundDeps failed with message:
    Rule 0: org.apache.maven.plugins.enforcer.RequireUpperBoundDeps failed with message:Failed while enforcing RequireUpperBoundDeps. The error(s) are [
    Require upper bound dependencies error for org.slf4j:slf4j-api:1.7.29 paths to dependency are:
    +-io.hetu.core:hetu-redis:1.7.0-SNAPSHOT+-io.hetu.core:presto-main:1.7.0-SNAPSHOT+-org.slf4j:slf4j-api:1.7.29 (managed) <-- org.slf4j:slf4j-api:1.7.25
    and
    

    解决方法,在pom.xml中的某些模块导入时注意添加如下

    <exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
    </exclusions>
    

Openlookeng Redis Connector 移植相关推荐

  1. Flink SQL 自定义 redis connector

    一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connecto ...

  2. flink redis connector(支持flink sql)

    flink redis connector(支持flink sql) 1. 背景 工作原因,需要基于flink sql做redis sink,但bahir 分支的flink connector支持只是 ...

  3. Redis Connector / UE4 DTRedis 插件说明

    使用蓝图连接Redis,可以设置,获取,删除关键值.订阅 / 发布 自定义消息. 1 CreateRedis 连接到 Redis 服务器 Host:服务器地址 Port:服务器端口 User:用户名, ...

  4. openlookeng 扩展connector对接达梦和金仓

    文章目录 1 概述: 2 准备环境(我这里用的是1.5.0): 2.1 idea环境OK,下载一套所需要版本的openlookeng源码:gitee: https://gitee.com/openlo ...

  5. OpenLooKeng / Presto Connector原理

    OpenLooKeng Connector 扩展 一.什么是Connector 我们知道在整个Presto工程中所有的功能都是以插件的形式实现,而Connector则是一种负责Presto与数据源进行 ...

  6. 数据湖探索DLI新功能:基于openLooKeng的交互式分析

    摘要:基于华为开源openLooKeng引擎的交互式分析功能,将重磅发布便于用户构建轻量级流.批.交互式全场景数据湖. 在这个"信息爆炸"的时代,大数据已经成为这个时代的关键词之一 ...

  7. flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    感谢您的关注  +  点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...

  8. 即兴小探华为开源行业领先大数据虚拟化引擎openLooKeng

    文章目录 概述 定义 背景 特点 架构 关键技术 应用场景 安装 单台部署 集群部署 命令行接口 连接器 MySQL连接器 ClickHouse连接器 概述 定义 openLooKeng 官网地址 h ...

  9. Redis搭建及使用

    Redis篇 一.Redis介绍 1.1Redis解决项目问题 为了降低对数据库的访问压力,当多个用户请求相同的数据时,我们可以第一时间从数据库查询到的数据进行缓存(存储到内存中),以减少数据库的访问 ...

最新文章

  1. ReactNative环境配置的坑
  2. SQLite Version3.3.6源代码文件结构
  3. docker 推送到本地仓库_Docker_学习笔记系列之仓库
  4. JDBC-连接数据库代码
  5. Linux内核引导简析
  6. 有关phpmailer的详细介绍及使用方法
  7. c++ java通信 protocol buffer,google protocol buffer (C++,Java序列化应用实例)
  8. jdk和jre是什么?都有什么用?
  9. 代理模式-Java实现-静态代理、动态代理
  10. 那年我整理的JavaEE面试题
  11. java 64内存不足_window7 64bit解决tomcat内存不足问题
  12. java经典源码 阅读_公开!阿里甩出“源码阅读指南”,原来源码才是最经典的学习范例...
  13. 从MySQL复制功能中得到一举三得实惠
  14. JDBC连接Oracle数据库时出现的ORA-12505错误及解决办法.
  15. MySQL数据库字段级权限设计
  16. 设计师的“通天塔”—浅谈设计沟通
  17. 云主机是什么,怎么才能购买性价比高的云主机
  18. Android游戏开发+实战开发教程视频
  19. AVS2的GB帧与s帧
  20. Windows 七种截图方式 快捷键 系统自带 工具软件

热门文章

  1. Oracle spatial 空间修正函数(SDO_UTIL.RECTIFY_GEOMETRY)
  2. Experiment 3. Stack and Queue
  3. 输入圆形半径,求圆形的周长和圆形的面积,并将结果输出。
  4. 2019牛客暑期多校训练营(第一场)E-ABBA(dp)
  5. Hyperion Research:2021年量子计算市场收入已达4.9亿美元
  6. 3.《程序猿扯淡系列》约会的艺术--教你如何逆袭
  7. 复杂交联环境下的测试任务快速开发工具系统情况
  8. web请求流程与HTTP方法刨析
  9. Windows10 蓝屏 DRIVER_IRQL_NOT_LESS_OR_EQUAL (vfilter.sys)的可能解决方法
  10. 虹口区企业技术中心认定条件及奖励政策解读