Blink创建数据维表

概述

在维表DDL语法中增加1行PERIOD FOR SYSTEM_TIME的声明,定义维表的变化周期,即可使用标准的CREATE TABLE语法定义实时计算维表。

示例

CREATE TABLE white_list (id varchar,name varchar,age int,PRIMARY KEY (id),PERIOD FOR SYSTEM_TIME  --定义维表的变化周期。实时计算3.x及以上版本,维表DDL中可以不声明该句,在维表JOIN时,声明FOR SYSTEM_TIME AS OF PROCTIME()即可。
) with (type = 'RDS',...
);
  • 维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件。
  • 目前仅支持源表INNER JOINLEFT JOIN维表。
  • 维表的唯一键(UK)必须为数据库表中的唯一键。如果维表声明的唯一键不是数据库表的唯一键会产生以下影响:
    • 维表的读取速度变慢。
    • 在维表JOIN时,会从第一条数据进行JOIN,在加入Job的过程中,相同KEY的多条记录在数据库中按顺序发生变化,可能导致JOIN结果错误。

INDEX语法

维表定义要求声明PRIMARY KEY,这种情况下只能实现一对一连接。为支持一对多连接的需求,引入了INDEX语法。非Cache All的维表JOIN通过INDEX LOOKUP的方式实现一对多连接的需求。

CREATE TABLE Persons (ID bigint,LastName varchar,FirstName varchar,Nick varchar,Age int,[UNIQUE] INDEX(LastName,FirstName,Nick), --定义INDEX,不需要指定具体的类型,例如,fulltext或clustered等。PERIOD FOR SYSTEM_TIME
) with (type='RDS',...
);

UNIQUE INDEX表示一对一连接,而INDEX表示一对多连接。

注意事项

  • 实时计算2.2.7及以后版本支持UNIQUE CONSTRAINTUNIQUE KEY),实时计算2.2.7以下版本可以使用PRIMARY KEY的定义。
  • 在生成执行计划时,引擎优先采用UNIQUE INDEX。即如果DDL中使用INDEX,但JOIN等值连接条件中同时包含UNIQUENON-UNIQUE INDEX时,优先使用UNIQUE INDEX查找右表数据。
  • 支持一对多连接的维表类型,例如RDS和MaxCompute。
  • 您可以增加maxJoinRows参数,表示在一对多连接时,左表一条记录连接右表的最大记录数(默认值为1024)。在一对多连接的记录数过多时,可能会极大的影响流任务的性能,因此您需要增大Cache的内存(cacheSize限制的是左表key的个数)。
  • 表格存储Tablestore和Hologres维表不支持使用INDEX进行一对多JOIN。

维表、源表和结果表的区别

类别 源表 结果表 维表
是否能驱动计算
是否能读取数据 是,直接读取。 是,仅通过源表和维表JOIN读取。
是否能写入数据
是否支持Cache

创建交互式分析Hologres维表

什么是交互式分析Hologres

交互式分析Hologres是实时交互分析产品,兼容PostgreSQL协议,与大数据生态紧密连接,支持高并发、低延时实时分析与处理PB级数据,可以轻松使用现有BI(Business Intelligence)工具对数据进行多维分析和业务探索。

使用限制

创建Hologres维表时建议选择行存模式,列存模式对于点查场景性能开销较大。

选择行存模式创建维表时必须设置主键,并且将主键设置为clustering key才可以工作。示例语句如下:

begin;
create table test(a int primary key, b text, c text, d float8, e int8);
call set_table_property('test', 'orientation', 'row');
call set_table_property('test', 'clustering_key', 'a');
commit;

Hologres维表的主键必须是Blink Join On的字段,Blink Join On的字段也必须是维表完整的主键字段,两者必须完全匹配。

Hologres Blink Connector的维表功能不支持一对多的输出。

不支持读取Hologres分区表的数据。

语法示例

CREATE TABLE hologres_dim_table(id INT,len INT,content VARCHAR,PRIMARY KEY (id),PERIOD FOR SYSTEM_TIME  --定义维表的变化周期。
) WITH (type='hologres',endpoint='...',dbname='...',tablename='...',username='...',password='...'
);

WITH参数

参数 描述 是否必选 备注
type 数据库类型。 固定值为hologres。
endpoint Hologres端点。 无。
tablename 表名称。 无。
dbname 数据库名称。 无。
username 用户名称。 无。
password 密码。 无。

备注 如果Schema不为Public时,则tableName需要填写为schema.tableName

CACHE参数

参数 描述 是否必选 示例值
cache 缓存策略。 目前交互式分析Hologres维表支持两种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。
cacheSize 缓存大小。 缓存策略选择LRU时,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存失效时间,单位为毫秒。 缓存策略选择LRU时,可以设置缓存失效时间,默认不过期。
partitionedJoin 是否根据JoinKey进行分区。 参数的取值:false(默认值):不根据JoinKey进行分区。true:根据JoinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。
async 是否异步读取数据。 默认值:false

缓存注意事项

  • 源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
  • 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

类型映射

Hologres字段类型 实时计算Flink版字段类型
INT INT
INT[] ARRAY<INT>
BIGINT BIGINT
BIGINT[] ARRAY<BIGINT>
REAL FLOAT
REAL[] ARRAY<FLOAT>
DOUBLE PRECISION DOUBLE
DOUBLE PRECISION[] ARRAY<DOUBLE>
BOOLEAN BOOLEAN
BOOLEAN[] ARRAY<BOOLEAN>
TEXT VARCHAR
TEXT[] ARRAY<VARCHAR>
NUMERIC DECIMAL
DATE DATE
TIMESTAMP WITH TIMEZONE TIMESTAMP

代码示例

create table randomSource (a int, b VARCHAR, c VARCHAR) with (type = 'random');create table test (a int, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b), PERIOD FOR SYSTEM_TIME
) with (type = 'hologres',...
);create table print_sink (a int, b VARCHAR
) with (type = 'print', `ignoreWrite` = 'false'
);insert into print_sink
select randomSource.a, test.b from randomSource
LEFT JOIN test FOR SYSTEM_TIME AS OF PROCTIME()
on randomSource.a = test.a and randomSource.b = test.b;

创建表格存储Tablestore维表

什么是表格存储Tablestore

表格存储Tablestore是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。其功能和特性类似开源的HBase

示例

CREATE TABLE ots_dim_table (id int,len int,content VARCHAR,PRIMARY KEY (id),PERIOD FOR SYSTEM_TIME--维表标识。
) WITH (type='ots',endPoint='<yourEndpoint>',instanceName='<yourInstanceName>',tableName='<yourTableName>',accessId='<yourAccessId>',accessKey='<yourAccessKey>'
);
  • 在声明维表时,必须要指名主键。
  • 在维表JOIN时,ON条件必须包含所有主键的等值条件。
  • Tablestore的主键即表的Rowkey。

WITH参数

参数 说明 备注
type 维表类型 固定值为ots
endPoint 表格存储的实例访问地址 VPC网络环境需要选择实例的VPC Endpoint。
instanceName 表格存储的实例名称
tableName 表格存储的数据表名
accessId 表格存储读取的AccessKey ID
accessKey 表格存储读取的密钥AccessKey Secret

CACHE参数

参数 说明 备注
cache 缓存策略 表格存储维表支持两种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。
cacheSize 缓存大小 当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存超时时间,单位为毫秒。 当选择LRU缓存策略后,可以设置缓存失效的超时时间。

代码示例

CREATE TABLE datahub_input1 (
id            BIGINT,
name        VARCHAR,
age           BIGINT
) WITH (
type='datahub'
);CREATE TABLE phoneNumber(
name VARCHAR,
phoneNumber bigint,
primary key(name),
PERIOD FOR SYSTEM_TIME--维表标识。
)with(
type='ots'
);CREATE TABLE result_infor(
id bigint,
phoneNumber bigint,
name VARCHAR
)with(
type='rds'
);INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;

创建云数据库RDS MySQL版维表

语法示例

CREATE TABLE rds_dim_table(id INT,len INT,content VARCHAR,PRIMARY KEY (id),PERIOD FOR SYSTEM_TIME --定义维表的变化周期。
) with (type='rds',url='<yourDatabaseURL>',tableName='<yourDatabaseTableName>',userName='<yourDatabaseUserName>',password='<yourDatabasePassword>'
);

声明维表时,必须要指名主键。维表JOIN时,ON条件必须包含所有主键的等值条件。RDS或DRDS的主键可以定义为表的主键或唯一索引列。

WITH参数

参数 说明 是否必填 备注
type 维表类型 固定值为rds
url JDBC(Java DataBase Connectivity)连接地址 URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中databaseName为对应的数据库名称。
tableName 表名
userName 用户名
password 密码
maxRetryTimes 最大尝试连接次数 默认值为10。

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略 云数据库RDS(DRDS)版维表支持三种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。ALL:缓存维表里的所有数据。
cacheSize 缓存大小 当选择LRU缓存策略后,可以设置缓存大小,默认10000行。
cacheTTLMs 缓存超时时间,单位为毫秒 当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList 缓存策略选择ALL 时启用。更新时间黑名单,防止在此时间内做cache更新(例如双11场景)。 默认为空。自定义输入格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。用逗号(,)来分隔多个黑名单,用箭头(->)来分割黑名单的起始结束时间。
maxJoinRows 主表中每一条数据查询维表时,匹配后最多返回的结果数。 默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows=‘n’,以确保实时计算匹配处理效率。
  • 源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
  • 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
  • 在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。
  • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
  • 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。

代码示例

CREATE TABLE datahub_input1 (
id            BIGINT,
name        VARCHAR,
age           BIGINT
) WITH (type='datahub',endPoint='http://dh-cn-hangzhou.aliyun-inc.com',project='<yourProjectName>',topic='<yourTopic>',accessId='<yourAccessID>',accessKey='<yourAccessSecret>',startTime='2017-07-21 00:00:00'
);create table phoneNumber(
name VARCHAR,
phoneNumber BIGINT,
primary key(name),
PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
)WITH(type='rds',url='<yourDatabaseURL>',tableName='<yourDatabaseTableName>',userName='<yourDatabaseUserName>',password='<yourDatabasePassword>'
);CREATE table result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
)WITH(type='rds',url='<yourDatabaseURL>',tableName='<yourDatabaseTableName>',userName='<yourDatabaseUserName>',password='<yourDatabasePassword>'
);INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;

类型映射

RDS字段类型 实时计算Flink版字段类型
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

创建云数据库HBase版维表

注意事项

实时计算HBase维表不支持自建的开源HBase。

HBase维表仅支持一个PK(Primary Key)。

DDL定义

HBase企业标准版

CREATE TABLE hbase (`key` varchar, `name` varchar,PRIMARY KEY (`key`), --HBase中的rowkey字段。PERIOD FOR SYSTEM_TIME --维表标识。
) with (TYPE = 'cloudhbase',zkQuorum = '<yourzkQuorum>',columnFamily = '<yourColumnFamilyName>',tableName = '<yourTableName>'
);

HBase性能增强版

CREATE TABLE hbase (`key` varchar, `name` varchar,PRIMARY KEY (`key`), --HBase中的rowkey字段。PERIOD FOR SYSTEM_TIME --维表标识。
) with (TYPE = 'cloudhbase',endPoint = '<host:port>',--HBase增强版的Java API访问地址。userName  = 'root', --HBase用户名。password = 'root', --HBase密码。columnFamily = '<yourColumnFamilyName>',tableName = '<yourTableName>'
);

Blink-3.5.0以上HBase性能增强版

create table liuxd_user_behavior_test_front (row_key varchar,from_topic varchar,origin_data varchar,record_create_time varchar,primary key (row_key)
) with (type = 'cloudhbase',zkQuorum = '<host:port>', --HBase增强版的Java API访问地址。userName  = 'root', --HBase用户名。password = 'root', --HBase密码。columnFamily = '<yourColumnFamily>',tableName = '<yourTableName>',batchSize = '500'
);

Blink-3.5.0以上支持HBase写入主备切换

create table liuxd_user_behavior_test_front (row_key varchar,from_topic varchar,origin_data varchar,record_create_time varchar,primary key (row_key)
) with (type = 'cloudhbase',zkQuorum = '<host:port>', --HBase高可用访问地址。haClusterID = 'ha-xxx', --HBase高可用实例ID。userName  = 'root', --HBase用户名。password = 'root', --HBase密码。columnFamily = '<yourColumnFamily>',tableName = '<yourTableName>',batchSize = '500'
);

备注

  • 在声明维表时,必须要指名主键。
  • 在维表进行JOIN时,ON的条件必须包含所有主键的等值条件。示例中HBase中的主键是row_key
  • HBase企业标准版和HBase性能增强版DDL的区别为连接参数不同:
    • HBase企业标准版:zkQuorum
    • HBase性能增强版:endPoint
    • Blink 3.5.0以上标准版和增强版:zkQuorum

WITH参数

参数 说明 是否必填 备注
type 维表类型 固定值为cloudhbase
zkQuorum HBase集群配置的zk地址,是以逗号(,)分隔的主机列表。 可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。 仅在HBase企业标准版中生效。
zkNodeParent 集群配置在zk上的路径 可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase企业标准版中生效。
endPoint HBase地域名称 可在购买的HBase实例控制台中获取。仅在HBase性能增强版中生效。
userName 用户名 仅在HBase性能增强版中生效。
password 密码 仅在HBase性能增强版中生效。
tableName HBase表名
columnFamily 列族名 仅支持插入同一列族。
maxRetryTimes 最大尝试次数 默认值为10次。
partitionedJoin 是否使用joinKey进行分区。 默认值为False。在设置partitionedJoin为True时,使用joinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。
shuffleEmptyKey 是否将上游EMPTY KEY随机发送到下游节点。 默认值为True。参数取值如下:True:如果上游有多个EMPTY KEY,会将所有EMPTY KEY随机发送到各个JOIN节点。False:如果上游有多个EMPTY KEY,会将所有EMPTY KEY发送至一个JOIN节点。

shuffleEmptyKeypartitionedJoin生效后才能使用。

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略 HBase维表支持以下三种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。ALL:缓存维表里的所有数据。
cacheSize 缓存大小 当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存超时时间,单位为毫秒。 当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList 缓存策略选择ALL时启用。更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 默认为空。自定义输入格式如下。2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00使用逗号(,)分隔多个黑名单,使用箭头(->)分割黑名单的起始和结束时间。
cacheScanLimit 缓存策略选择ALL时启用。读取全量HBase数据,服务端一次RPC返回给客户端的行数。 默认值为100条。

代码示例

create table source (id   TINYINT,name BIGINT
) with (type = 'random'
);create table dim (id   TINYINT,score BIGINTprimary key(id),PERIOD FOR SYSTEM_TIME
)with(type = 'cloudhbase',zkQuorum = '<yourzkQuorum>',columnFamily = '<yourColumnFamilyName>',tableName = '<yourTableName>'
);CREATE table result_infor(id BIGINT,score BIGINT
)with(type='rds'
);INSERT INTO result_infor
SELECTt.id,w.score
FROM source as t
JOIN dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.id = w.id;

创建MaxCompute维表

DDL定义

CREATE TABLE white_list (id varchar,name varchar,age int,PRIMARY KEY (id), PERIOD FOR SYSTEM_TIME --维表的标识。
) WITH (type = 'odps',endPoint = '<YourEndPoint>',project = '<YourProjectName>',tableName = '<YourtableName>',accessId = '<yourAccessKeyId>',accessKey = '<yourAccessKeySecret>',`partition` = 'ds=2018****',cache = 'ALL'
);

备注

  • 声明维表时,必须要指名主键,MaxCompute维表主键必须具有唯一性,否则会被去重。
  • 在维表进行JOIN时,ON条件必须包含所有主键的等值条件。
  • partition是关键字,需要使用反引号(`)注释,例如partition
  • 如果是分区表,目前不支持将分区列写入到DDL定义中。

WITH参数

参数 说明 是否必填 备注
type 维表类型。 固定值为odps
endPoint MaxCompute服务地址。 请参见Endpoint。
tunnelEndpoint MaxCompute Tunnel服务的连接地址。 请参见Endpoint。
project MaxCompute项目名称。 无。
tableName 表名。 无。
accessId AccessKey ID。 无。
accessKey AccessKey Secret。 无。
partition 分区名。 详见备注
maxRowCount 可加载的最大表格数量。 默认值为100000。如果您的数据超过100000,需要设置maxRowCount参数。建议设定值比实际值大。

备注

固定分区

  • 只存在一个分区MaxCompute表

    例如,如果只存在1个分区列ds,则``partition= 'ds=20180905'表示读ds=20180905分区的数据。

  • 存在多个分区的MaxCompute表

    例如,如果存在2个分区列dshh,则``partition='ds=20180905,hh=*'表示读ds=20180905分区的数据。

    分区过滤时需要声明所有分区的值。例如,上述示例中,只声明``partition= 'ds=20180905',则不会读取任何分区。

非固定分区

  • Blink 2.2.0及以上版本支持``partition= ‘max_pt()’ `功能, 即每次加载所有分区列表中字典序最大的分区。
  • Blink 3.2.2及以上版本支持``partition= 'max_pt_with_done()'功能,即每次加载所有分区列表中字典序最大且伴随有.done的分区。

CACHE参数

参数 说明 备注
cache 缓存策略 详见备注。
cacheSize 缓存大小 可以设置缓存大小,MaxCompute默认缓存值为100000行。
cacheTTLMs 缓存超时时间 单位为毫秒,当cache选择为ALL策略,则为缓存加载的间隔时间,默认为不重新加载。
cacheReloadTimeBlackList 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 默认为空,格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:用逗号,来分隔多个黑名单。用箭头->来分割黑名单的起始结束时间。
partitionedJoin 当开启PartitionedJoin优化时,每个并发内存里只缓存维表的部分数据,即该并发上需要的缓存数据。 可选,默认值为false,表示每个并发内存里缓存全量维表数据。

备注

目前MaxCompute维表仅支持ALL策略,必须显式声明。

ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置缓存更新时间间隔(cacheTTLMs)和更新时间黑名单(cacheReloadTimeBlackList)参数。

  • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
  • 在使用超大MaxCompute维表时,如果频繁GC(Allocation Failure)导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议:
    • Blink 3.6.0及以后版本,设置参数partitionedJoin = ‘true’ ,即打开PartitionedJoin优化。
    • 改为支持LRU 缓存策略的KV型维表,例如云数据库Hbase版维表。

代码示例

CREATE TABLE datahub_input1 (id    BIGINT,name  VARCHAR,age   BIGINT
) with (type='datahub'
);CREATE TABLE odps_dim (name VARCHAR,phoneNumber BIGINT, PRIMARY KEY (name), PERIOD FOR SYSTEM_TIME --维表的标识。
) with (type = 'odps',endPoint = '<yourEndpointName>', project = '<yourProjectName>',tableName = '<yourTableName>',accessId = '<yourAccessId>',accessKey = '<yourAccessPassword>',`partition` = 'ds=20180905', --动态分区、固定分区请参见WITH参数说明。cache = 'ALL'
);CREATE table result_infor(id BIGINT,phoneNumber BIGINT,name VARCHAR
)with(type='print'
);INSERT INTO result_infor
SELECTt.id,w.phoneNumber,t.name
FROM datahub_input1 as t
JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;

类型映射

MaxCompute BLINK
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY
STRING VARCHAR

实时计算Flink版MaxCompute维表仅支持上述MaxCompute字段类型。

创建云数据库Redis维表

注意

  • 实时计算Flink版Redis维表仅支持引用Redis数据存储中STRING类型的数据。
  • 实时计算Flink版Redis维表支持自建Redis服务。

语法示例

CREATE TABLE white_list (id VARCHAR,name VARCHAR,PRIMARY KEY (id),  --Redis中的Row Key字段。PERIOD FOR SYSTEM_TIME --维表标识。
) WITH (type = 'redis',host = '<yourHostName>',port = '<yourPort>',password = '<yourPassword>',dbNum = '<yourDatabaseNumber>'
);

注意

  • Redis维表必须声明且只能声明一个主键。
  • 维表JOIN时,ON条件必须包含所有主键的等值条件。
  • Redis维表仅支持声明两个字段,且字段类型必须为VARCHAR。

WITH参数

参数 说明 是否必填 备注
type 维表类型 固定值为redis
host Redis连接地址
port Redis连接端口 默认值为6379。
dbNum 选择操作的数据库 默认值为0。
password Redis密码 默认值为空,不进行权限验证。
hashName Hash模式下的Hash Key名称 默认值为空,实时计算Flink版从Redis中读取STRING类型的数据。

注意事项

通常,Redis维表中的数据类型为STRING类型,即key-value对。如果设置hashName参数,则Redis维表中的数据类型为HASHMAP类型,即key-{field-value}对,其中:

  • keyhashName参数值。
  • field为您在CREATE TABLE中指明的key参数值。
  • valuekey对应的赋值,和STRING类型key-valuevalue语义相同。

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略 参考注意事项。
cacheSize 缓存大小 选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存超时时长 默认缓存不超时,单位为毫秒。可选LRU缓存策略,即设置缓存失效的超时时长。
cacheEmpty 是否缓存空结果 默认值为true。

注意事项

云数据库Redis维表支持以下两种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

类型映射

Redis字段类型 实时计算Flink版字段类型
STRING VARCHAR

代码示例

CREATE TABLE event (id VARCHAR, data VARCHAR) with (type = 'random'
);CREATE TABLE white_list (id VARCHAR,name VARCHAR,PRIMARY KEY (id),  --Redis中的Row Key字段。PERIOD FOR SYSTEM_TIME --维表的标识。
) WITH (type = 'redis',host = '<yourRedisHost>',password = '<yourRedisPassword>'
);SELECT e.*, w.*
FROM event AS e
JOIN white_list FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON e.id = w.id;

创建Elasticsearch维表

DDL定义

 CREATE TABLE es_stream_sink(field1 LONG, field2 VARBINARY, field3 VARCHAR,PRIMARY KEY(field1),PERIOD FOR SYSTEM_TIME
) WITH (type ='elasticsearch',endPoint = '<yourEndPoint>',accessId = '<yourUsername>',accessKey = '<yourPassword>',index = '<yourIndex>',typeName = '<yourTypeName>'
);

ES维表支持根据ES的PRIMARY KEY进行PRIMARY KEYUPDATE,且PRIMARY KEY只能为1个字段。

WITH参数

参数 说明 默认值 是否必选
type 维表类型 elasticsearch
endPoint Server地址,例如:http://127.0.0.1:9211。
accessId 创建ES时的登录名。
accessKey 创建ES时的登录密码 。
index 索引名称,类似于数据库Database的名称。
typeName Type名称,类似于数据库的Table名称。
maxRetryTimes 异常重试次数 30
timeout 读取超时时长,单位为毫秒。 600000
discovery 是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。 false
compression 是否使用GZIP压缩Request Bodies。 true
multiThread 是否开启JestClient多线程。 true

CACHE参数

参数 说明 备注
cache 缓存策略 参考注意事项。
cacheSize 缓存大小 选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存更新时间间隔 参考注意事项。

注意事项

缓存策略支持以下三种:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

默认缓存不超时,单位为毫秒。不同缓存更新时间策略下的功能如下:

  • LRU:设置缓存失效的超时时长。
  • ALL:设置缓存加载的间隔时长,默认不重新加载。

创建Phoenix5维表

语法示例

create table US_POPULATION_DIM (`STATE` varchar,CITY varchar,POPULATION BIGINT,PRIMARY KEY (`STATE`, CITY),PERIOD FOR SYSTEM_TIME
) WITH (type = 'PHOENIX5',serverUrl = '<YourServerUrl>',tableName = '<YourTableName>'
);

WITH参数

参数 说明 是否必填 备注
type 维表类型 固定值为PHOENIX5
serverUrl Phoenix5的Query Server地址。如果Phoenix5是在集群中创建的,则serverUrl是负载均衡服务的URL地址;如果Phoenix5是在单机中创建的,则serverUrl是单机的URL地址。 serverUrl格式为http://host:port,其中:host为Phoenix5服务的域名。port为Phoenix5服务的端口号,固定值为8765。
tableName Phoenix5表名 Phoenix5表名格式为SchemaName.TableName,其中:SchemaName为模式名,可以为空,即不写模式名,仅写表名,表示使用数据库的默认模式。TableName为表名。

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略 详见注意事项。
cacheSize 缓存大小 选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存超时时间,单位为毫秒。 当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 可选,默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’。用逗号(,)来分隔多个黑名单,用箭头(->)来分割黑名单的起始或结束时间。

注意事项

目前Phoenix5维表支持以下三种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

代码示例

CREATE TABLE datahub_input1 (id  BIGINT,name  VARCHAR,age   BIGINT
) WITH (type='datahub'
);create table phoneNumber(name VARCHAR,phoneNumber BIGINT,primary key(name),PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
)with(type='PHOENIX5'
);CREATE table result_infor(id BIGINT,phoneNumber BIGINT,name VARCHAR
)with(type='rds'
);INSERT INTO result_infor
SELECTt.id,w.phoneNumber,t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定该声明。
ON t.name = w.name;

创建云原生数据仓库AnalyticDB MySQL版3.0维表

语法示例

CREATE TABLE dim_ads(`name` VARCHAR,id VARCHAR,PRIMARY KEY (`name`),PERIOD FOR SYSTEM_TIME
)with(type='ADB30',url='jdbc:mysql://<内网地址>/<databaseName>',tableName='xxx',userName='xxx',password='xxx'
);

注意事项

  • 在声明一个维表时,必须指明主键。
  • 在维表进行JOIN时,ON条件必须包含所有主键的等值条件。
  • 云原生数据仓库AnalyticDB MySQL版的主键可以定义为表的主键或唯一索引列。

WITH参数

参数 说明 是否必选 备注
type 维表类型。 固定值为ADB30。
url 云原生数据仓库AnalyticDB MySQL版数据库地址。 云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
tableName 表名。 无。
userName 用户名。 无。
password 密码。 无。
maxRetryTimes 写入重试次数。 默认值为3。

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略 详见注意事项。
cacheSize 缓存大小 当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔,重新加载一次维表中的最新数据,保证源表能JOIN到维表的最新数据。 单位为毫秒。默认不设置此参数,表示不重新加载维表中的新数据。
cacheReloadTimeBlackList 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 可选,默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’ 。其中分割符使用情况如下:用逗号(,)来分隔多个黑名单。用箭头(->)来分割黑名单的起始结束时间。
partitionedJoin 是否开启partitionedJoin。在开启partitionedJoin优化时,主表会在关联维表前,先按照Join KEY进行Shuffle。 默认情况下为false,表示不开启partitionedJoin。
maxJoinRows 主表中每一条数据查询维表时,匹配后最多返回的结果数。 默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows=‘n’,以确保实时计算匹配处理效率。

注意事项

目前云原生数据仓库AnalyticDB MySQL版3.0支持以下三种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    对于数据量比较大的维表,选择CACHE ALL时,可能会出现OOM或者Full GC耗时很久的情况,针对这个问题,可以选择以下两种解决方式:

    • 对于支持Cache All策略的维表,开启PartitionedJoin优化。3.6.0版本之前,每个并发默认加载维表全量数据。3.6.0版本之后,CACHE ALL策略支持PartitionedJoin优化。开启PartitionJoin优化后,每个并发只缓存自己并发所需要的数据。
    • 使用HBase或者RDS等Key-Value类型的维表。

使用partitionedJoin优化前,需要您手动设置partitionedJoin = ‘true’

进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受maxJoinRows参数限制。

开启partitionedJoin的优点:

  • 在缓存策略为LRU时,可以提高缓存命中率。
  • 在缓存策略为ALL时,节省内存资源,因为每个并发只缓存自己并发所需要的数据。

代码示例

CREATE TABLE datahub_input1 (id      BIGINT,name    VARCHAR,age     BIGINT
) WITH (type='datahub'
);create table phoneNumber (name VARCHAR,phoneNumber BIGINT,primary key(name),PERIOD FOR SYSTEM_TIME--维表标识。
) with (type='ADB30'
);CREATE table result_infor (id BIGINT,phoneNumber BIGINT,name VARCHAR
) with (type='rds'
);INSERT INTO result_infor
SELECTt.id,w.phoneNumber,t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定该声明。
ON t.name = w.name;

创建Oracle维表

DDL定义

CREATE TABLE oracle_dim(employee_id BIGINT,phone_number BIGINT,dollar DOUBLE,PRIMARY KEY (employee_id)
) WITH (type = 'oracle_dim',url = '<yourUrl>',userName = '<yourUserName>',password = '<yourPassword>',tableName = '<yourTableName>',cache = 'ALL'
);

WITH 参数

参数 描述 是否必选 示例值
type 维表类型 固定值为oracle_dim。
url JDBC的Oracle地址 jdbc:oracle:thin:@ip:port:sid
userName 数据库的用户名
password 数据库的密码
tableName 表名称
maxRetryTimes 读取维表异常重试的最大次数 默认值为10。

CACHE参数

参数 描述 是否必选 示例值
cache 缓存策略 详见注意事项。
cacheSize 缓存大小,即缓存多少行数据。 当缓存策略选择LRU时,可以设置缓存大小,默认值为10000行。
cacheTTLMs 缓存超时时间,单位为毫秒。 缓存策略选择LRU时,可以设置缓存失效时间,默认不过期。当缓存策略选择ALL时,缓存失效时间为缓存重新加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’ 。分割符如下:用逗号(,)来分隔多个黑名单。用箭头(->)来分割黑名单的起始结束时间。
maxJoinRows 一对多连接时,左表一条记录连接右表的最大记录数。 默认值为1024。一对多连接的记录数过多时,需要调cache的内存。因为cacheSize限制的是左表key个数,单条左表记录对应的右表记录较多时,可能会极大地影响流任务的性能。

注意事项

目前Oracle维表支持以下三种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则关键健不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

类型映射

Oracle字段类型 实时计算Flink字段类型
CHAR、VARCHAR、VARCHAR2 VARCHAR
FLOAT DOUBLE
NUMBER BIGINT
DECIMAL DECIMAL

代码示例

CREATE TABLE oracle_source (employee_id BIGINT,employee_name VARCHAR,employee_age INT
) WITH (type ='random'
);CREATE TABLE oracle_dim (employee_id BIGINT,phone_number BIGINT,dollar DOUBLE,PRIMARY KEY (employee_id)
) WITH (type = 'oracle_dim',url = '<yourUrl>',userName = '<yourUserName>',password = '<yourPassword>',tableName = '<yourTableName>',cache = 'ALL'
);CREATE TEMPORARY TABLE oracle_sink (employee_id BIGINT,phone_number BIGINT,employee_name VARCHAR
) WITH (type = 'oracle',url = '<yourUrl>',userName = '<yourUserName>',password = '<yourPassword>',tableName = '<yourTableName>'
);INSERT INTO oracle_sink
SELECT t.employee_id, w.phone_number, t.employee_name
FROM oracle_source as t JOIN oracle_dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.employee_id = w.employee_id;

Blink SQL之创建数据维表相关推荐

  1. Blink SQL之创建数据结果表

    Blink创建数据结果表 概述 Blink使用CREATE TABLE作为输出结果数据的格式定义,同时定义数据如何写入到目的数据结果表. 结果表有两种类型: Append类型:输出存储是日志系统.消息 ...

  2. flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    感谢您的关注  +  点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...

  3. 使用SQL语句创建数据表(SQL Server)

    数据库 表的创建(SQL Server) 文章目录 数据库 表的创建(SQL Server) 使用SQL语句创建数据表 使用SQL语句创建数据表 CREATE TABLE的语法格式如下 databas ...

  4. 【Excel 2013 数据透视表 学习】一、创建数据透视表

    1 数据透视表 是Excel中数据处理分析工具. 用途: 1. 快速分类汇总.比较大量数据. 2. 快速变化统计分析维度查看统计结果. 数据透视表不仅综合了数据排序.筛选.组合及分类汇总等数据分析方法 ...

  5. SQL Server中数据透视表的Python脚本

    This article talks about Python scripts for creating pivot tables in multiple ways. 本文讨论了以多种方式创建数据透视 ...

  6. 【转载】使用Pandas创建数据透视表

    使用Pandas创建数据透视表 本文转载自:蓝鲸的网站分析笔记 原文链接:使用Pandas创建数据透视表 目录 pandas.pivot_table() 创建简单的数据透视表 增加一个行维度(inde ...

  7. 动态新增表字段_制作动态的数据透视表(一):定义名称法创建数据透视表

    --施瓦辛格:没有跌倒过的人不会成功. 我们的日常工作中,会经常遇到一种情况:创建好数据透视表后,有其他被遗漏的数据内容需要重新插入到数据源后,有其他被遗漏的数据内容需要重新插入到数据源中:或者是需要 ...

  8. 职称计算机excel2015年,【2015年职称计算机Excel重点精讲:创建数据透视表】- 环球网校...

    [摘要]2015年职称计算机Excel重点精讲:创建数据透视表 利用Excel提供的数据透视表向导,可以很方便地建立数据透视表.以图8-1所示的年终津 贴分配表为例,要求统计各部门的津贴总和以及各部门 ...

  9. 如何创建数据透视表的方法

    内容提要:本文介绍excel中如何创建数据透视表的方法,并分为数据区域和列表区域来分别介绍,并给出数据透视表教程链接.对Excel感兴趣的朋友可加Excel学习交流群:284029260(www.it ...

最新文章

  1. 疫情可视化,基于知识图谱的AI“战疫”平台如何做?
  2. 文字超长自动省略,以...代替,CSS实现
  3. 关于jenkins的安装及自动部署
  4. python django部署docker_如何Docker化Python Django应用程序
  5. android rfid 数据解析_手持机是什么?RFID手持机是什么?
  6. Linux中的软件源详解,Ubuntu Linux 软件源详解
  7. cesium js 路径_vue2.0项目集成Cesium的实现方法
  8. wincc7的常用c语言,wincc几个常用c语言编程-20210324073153.docx-原创力文档
  9. 问题六十八:着色模型(shading model)(1)——反射模型(reflection model)(2.2)——高光反射(specular reflection)
  10. 产品原型图设计Axure教程-CSDN就业班-专题视频课程
  11. zabbix开启SNMPTrap功能
  12. python工厂模式和单例模式_python之单例模式和工厂模式
  13. Unity游戏开发经验点滴
  14. 说出来你可能不信,我用excel就能做一张高端的统计报表
  15. 橡皮擦的英语_英语单词这样写,老师想扣卷面分都难!(建议收藏学习)
  16. bingo小游戏介绍以及概率问题
  17. YIT-CTF—Web
  18. 常见图片格式及其区别小结
  19. JavaScript 判断是否是数字 isFinite() Number.isFinite()
  20. JavaWeb学习(第一天)-1-HTML部分

热门文章

  1. 【常用自定义函数001】VBA高容错性地打开文件
  2. idea中查找的快捷键
  3. 二十、Gtk4-GtkMenuButton, accelerators, font, pango and gsettings
  4. 互联网+重点路桥建设智慧工地综合监管平台
  5. 各种梯度下降法原理与Dropout
  6. R语言做Scheirer–Ray–Hare检验
  7. nginx重写url
  8. svchost.exe是什么进程,svchost.exe是病毒吗?
  9. 较简单的c语言航班管理系统
  10. c语言if语句写在参数里,c语言ifthen语句