95-134-110-源码-维表-Flink 1.9.0 维表实现
1.视界
2.概述
请参考:95-134-100-源码-维表-实现维表需要的Aysnc IO
Flink 1.9 中维表功能来源于新加入的Blink中的功能,如果你要使用该功能,那就需要自己引入 Blink 的 Planner,而不是引用社区的 Planner。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单,只要自定义实现 LookupableTableSource 接口,同时实现里面的方法就可以进行,下面来分析一下 LookupableTableSource的代码:
public interface LookupableTableSource<T> extends TableSource<T> { TableFunction<T> getLookupFunction(String[] lookupKeys); AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys); boolean isAsyncEnabled();}
可以看到 LookupableTableSource 这个接口中有三个方法
isAsyncEnabled 方法主要表示该表是否支持异步访问外部数据源获取数据,当返回 true 时,那么在注册到 TableEnvironment 后,使用时会返回异步函数进行调用,当返回 false 时,则使同步访问函数。
getLookupFunction 方法返回一个同步访问外部数据系统的函数,什么意思呢,就是你通过 Key 去查询外部数据库,需要等到返回数据后才继续处理数据,这会对系统处理的吞吐率有影响。
getAsyncLookupFunction 方法则是返回一个异步的函数,异步访问外部数据系统,获取数据,这能极大的提升系统吞吐率。
2.2 同步访问函数getLookupFunction
getLookupFunction 会返回同步方法,这里你需要自定义 TableFuntion 进行实现,TableFunction 本质是 UDTF,输入一条数据可能返回多条数据,也可能返回一条数据。用户自定义 TableFunction 格式如下:
public class MyLookupFunction extends TableFunction<Row> { Jedis jedis; @Override public void open(FunctionContext context) throws Exception { jedis = new Jedis("172.16.44.28"); } public void eval(Object... paramas) { String key = "userInfo:userId:" + paramas[0].toString() + ":userName"; String value = jedis.get(key); collect(Row.of(key, value)); }}
open 方法在进行初始化算子实例的进行调用,异步外部数据源的client要在类中定义为 transient,然后在 open 方法中进行初始化,这样每个任务实例都会有一个外部数据源的 client。防止同一个 client 多个任务实例调用,出现线程不安全情况。
eval 则是 TableFunction 最重要的方法,它用于关联外部数据。当程序有一个输入元素时,就会调用eval一次,用户可以将产生的数据使用 collect() 进行发送下游。paramas 的值为用户输入元素的值,比如在 Join 的时候,使用 A.id = B.id and A.name = b.name, B 是维表,A 是用户数据表,paramas 则代表 A.id,A.name 的值。
2.3 异步访问函数
getAsyncLookupFunction 会返回异步访问外部数据源的函数,如果你想使用异步函数,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用异步函数访问外部数据系统,一般是外部系统有异步访问客户端,如果没有的话,可以自己使用线程池异步访问外部系统。至于为什么使用异步访问函数,无非就是为了提高程序的吞吐量,不需要每条记录访问返回数据后,才去处理下一条记录。异步函数格式如下:
public class MyAsyncLookupFunction extends AsyncTableFunction<Row> { private transient RedisAsyncCommands<String, String> async; @Override public void open(FunctionContext context) throws Exception { RedisClient redisClient = RedisClient.create("redis://172.16.44.28:6379/0"); StatefulRedisConnection<String, String> connection = redisClient.connect(); async = connection.async(); } public void eval(CompletableFuture<Collection<Row>> future, Object... params) { redisFuture.thenAccept(new Consumer<String>() { @Override public void accept(String value) { future.complete(Collections.singletonList(Row.of(key, value))); } }); }}
维表异步访问函数总体和同步函数实现类似,这里说一下注意点:
外部数据源异步客户端(如RedisAsyncCommands)初始化。如果是线程安全的(多个客户端一起使用),你可以不加 transient 关键字,初始化一次。否则,你需要加上 transient,不对其进行初始化,而在 open 方法中,为每个 Task 实例初始化一个。
eval 方法中多了一个 CompletableFuture,当异步访问完成时,需要调用其方法进行处理.
为了减少每条数据都去访问外部数据系统,提高数据的吞吐量,一般我们会在同步函数和异步函数中加入缓存,如果以前某个关键字访问过外部数据系统,我们将其值放入到缓存中,在缓存没有失效之前,如果该关键字再次进行处理时,直接先访问缓存,有就直接返回,没有再去访问外部数据系统,然后在进行缓存,进一步提升我们实时程序处理的吞吐量。
一般缓存类型有以下几种类型:
数据全部缓存,定时更新。
LRU Cache,设置一个超时时间。
用户自定义缓存。
以下是demo
从HDFS中读取数据源,join Redis维表,最终结果落到kafka。服务器上8个并行度大概可以达到60w+/s。
全部依赖项:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.9.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.11</scala.binary.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--1.9 SQL--> <!-- Either... --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.30</version> </dependency>--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.5.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.5.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.5.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-avro</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.5</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-metrics-dropwizard</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>0.11.0.0</version> </dependency> <!--redis异步连接--> <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.24.Final</version> </dependency> </dependencies>
import io.lettuce.core.RedisClient;import io.lettuce.core.RedisFuture;import io.lettuce.core.api.StatefulRedisConnection;import io.lettuce.core.api.async.RedisAsyncCommands;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.table.functions.AsyncTableFunction;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.types.Row; import java.util.Collection;import java.util.Collections;import java.util.concurrent.CompletableFuture;import java.util.function.Consumer; public class MyAsyncLookupFunction extends AsyncTableFunction<Row> { private final String[] fieldNames; private final TypeInformation[] fieldTypes; private transient RedisAsyncCommands<String, String> async; public MyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) { this.fieldNames = fieldNames; this.fieldTypes = fieldTypes; } @Override public void open(FunctionContext context) { //配置redis异步连接 RedisClient redisClient = RedisClient.create("redis://172.16.44.28:6379/0"); StatefulRedisConnection<String, String> connection = redisClient.connect(); async = connection.async(); } //每一条流数据都会调用此方法进行join public void eval(CompletableFuture<Collection<Row>> future, Object... paramas) { //表名、主键名、主键值、列名 String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"}; String key = String.join(":", info); RedisFuture<String> redisFuture = async.get(key); redisFuture.thenAccept(new Consumer<String>() { @Override public void accept(String value) { future.complete(Collections.singletonList(Row.of(key, value))); //todo// BinaryRow row = new BinaryRow(2); } }); } @Override public TypeInformation<Row> getResultType() { return new RowTypeInfo(fieldTypes, fieldNames); } public static final class Builder { private String[] fieldNames; private TypeInformation[] fieldTypes; private Builder() { } public static Builder getBuilder() { return new Builder(); } public Builder withFieldNames(String[] fieldNames) { this.fieldNames = fieldNames; return this; } public Builder withFieldTypes(TypeInformation[] fieldTypes) { this.fieldTypes = fieldTypes; return this; } public MyAsyncLookupFunction build() { return new MyAsyncLookupFunction(fieldNames, fieldTypes); } }}
import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.functions.AsyncTableFunction;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.table.sources.LookupableTableSource;import org.apache.flink.table.sources.StreamTableSource;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.utils.TypeConversions;import org.apache.flink.types.Row; public class RedisAsyncLookupTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> { private final String[] fieldNames; private final TypeInformation[] fieldTypes; public RedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) { this.fieldNames = fieldNames; this.fieldTypes = fieldTypes; } //同步方法 @Override public TableFunction<Row> getLookupFunction(String[] strings) { return null; } //异步方法 @Override public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) { return MyAsyncLookupFunction.Builder.getBuilder() .withFieldNames(fieldNames) .withFieldTypes(fieldTypes) .build(); } //开启异步 @Override public boolean isAsyncEnabled() { return true; } @Override public DataType getProducedDataType() { return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames)); } @Override public TableSchema getTableSchema() { return TableSchema.builder() .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes)) .build(); } @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) { throw new UnsupportedOperationException("do not support getDataStream"); } public static final class Builder { private String[] fieldNames; private TypeInformation[] fieldTypes; private Builder() { } public static Builder newBuilder() { return new Builder(); } public Builder withFieldNames(String[] fieldNames) { this.fieldNames = fieldNames; return this; } public Builder withFieldTypes(TypeInformation[] fieldTypes) { this.fieldTypes = fieldTypes; return this; } public RedisAsyncLookupTableSource build() { return new RedisAsyncLookupTableSource(fieldNames, fieldTypes); } }}
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.types.Row;import org.junit.Test; import java.util.Properties; public class LookUpAsyncTest { @Test public void test() throws Exception { LookUpAsyncTest.main(new String[]{}); } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); final ParameterTool params = ParameterTool.fromArgs(args); String fileName; if (params.get("f") != null) fileName = params.get("f"); else fileName = "/flink/userClick_Random_100W"; DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8"); TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG}; String[] fields = new String[]{"id", "user_click", "time"}; RowTypeInfo typeInformation = new RowTypeInfo(types, fields); DataStream<Row> stream = source.map(new MapFunction<String, Row>() { private static final long serialVersionUID = 2349572544179673349L; @Override public Row map(String s) { String[] split = s.split(","); Row row = new Row(split.length); for (int i = 0; i < split.length; i++) { Object value = split[i]; if (types[i].equals(Types.STRING)) { value = split[i]; } if (types[i].equals(Types.LONG)) { value = Long.valueOf(split[i]); } row.setField(i, value); } return row; } }).returns(typeInformation); tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime"); RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder() .withFieldNames(new String[]{"id", "name"}) .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING}) .build(); tableEnv.registerTableSource("info", tableSource); String sql = "select t1.id,t1.user_click,t2.name" + " from user_click_name as t1" + " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" + " on t1.id = t2.id"; Table table = tableEnv.sqlQuery(sql); DataStream<Row> result = tableEnv.toAppendStream(table, Row.class); // result.print().setParallelism(1); DataStream<String> printStream = result.map(new MapFunction<Row, String>() { @Override public String map(Row value) throws Exception { return value.toString(); } }); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "172.16.12.148:9094"); FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(// broker list "user_click_name", // target topic new SimpleStringSchema(), properties); printStream.addSink(kafkaProducer); tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName()); }}
参考:https://zhuanlan.zhihu.com/p/79800113
参考:https://blog.csdn.net/u012554509/article/details/100533749
95-134-110-源码-维表-Flink 1.9.0 维表实现相关推荐
- 【Flink源码篇】Flink 1.15.0源码编译
目录 1. 下载源码并解压 2. Flink项目配置 3. 源码编译 4. 编译问题记录 5. IDEA调试Flink程序 1. 下载源码并解压 从github下载Flink的源码:https://g ...
- 数据结构源码笔记(C语言):线性表的单链表示
/* LinkList.c*/ /*线性表的单链表示:函数实现*/#include <stdio.h> #include <stdlib.h> //#include " ...
- 数据结构源码笔记(C语言):线性表的顺序表示
/* SeqList.c*/ /*线性表的顺序表示:函数实现*/#include <stdio.h> #include <stdlib.h> //#include " ...
- 直播视频app源码,Android 点击生成二维码
直播视频app源码,Android 点击生成二维码实现的相关代码 activity.xml代码如下: <?xml version="1.0" encoding="u ...
- 插件一:JAVA微信砍价活动源码分享[商品帮砍到0元,免费领取奖品]
插件一:微信砍价活动源码分享 [商品帮砍到0元,免费领取奖品] 活动描述: 砍价活动即公众号向粉丝推广的0价赠商品(或优惠价购商品)活动,用户通过分享好友帮其砍价,可将价格从原价一路砍到底价,并抢得名 ...
- 【Android 10 源码】healthd 模块 HAL 2.0 分析
Android 9 引入了从 health@1.0 HAL 升级的主要版本 android.hardware.health HAL 2.0.这一新 HAL 具有以下优势: 框架代码和供应商代码之间的区 ...
- [android源码下载索引贴】微信+二维码那都不是事......
微信: Android 简单仿微信朋友圈布局2014/09/02 http://www.eoeandroid.com/thread-542738-1-1.html [代码片段] 高仿摇一摇效果 201 ...
- 云客Drupal源码分析之数据库Schema及创建数据表
本主题是<云客Drupal源码分析之数据库系统及其使用>的补充,便于查询,所以独立成一个主题 讲解数据库系统如何操作Schema(创建修改数据库.数据表.字段:判断它们的存在性等等),以及 ...
- hbase源码系列(八)从Snapshot恢复表
在看这一章之前,建议大家先去看一下snapshot的使用.这一章是上一章snapshot的续集,上一章了讲了怎么做snapshot的原理,这一章就怎么从snapshot恢复表. restoreSnap ...
最新文章
- 10_史上最全的Markdown使用教程(没有之一)(20190115)
- [YTU]_2626( B 统计程序设计基础课程学生的平均成绩)
- jquery中$运算符的后代选择器
- 时序预测:从两篇高影响力的论文谈起
- 【OpenCV】直方图应用:直方图均衡化,直方图匹配,对比直方图
- hbase 页面访问_HBase
- 第五人格服务器维修中怎么进,第五人格进不去怎么办 游戏进不去解决方法详解[多图]...
- docker ubuntu16.04镜像下安装cowrie蜜罐记录
- 前5月全国快递业务量累计完成396.5亿件 同比增50.1%
- 管理博文Hive大数据-Mysql的安装和启动---大数据之Hive工作笔记0007
- Django-----模板嵌套
- BNU29140 Taikotaiko(概率)
- 解决比较Oracle中CLOB字段问题
- 2022年4月30号Mysql语句增删改查(CRUB)重在实操。
- linux瘦身软件下载,Linux系统瘦身裁剪 ——测试版
- 【Latex】Latex小白入门(4)——Latex中特殊符号的输入
- 7z 头部错误 数据错误_7z解压软件(7-zip)解压错误怎么办?
- 流利阅读day1 Dysmorphia
- 史上最强模型 GPT-4 上线:一张手绘草图能生一个网站、60 秒搞定一个游戏开发!
- 完全格式化硬盘(删除EFI分区)
热门文章
- 马云再谈教育:未来的学校学的不仅仅是知识 更重要的是学习的能力
- 五菱汽车:并不知悉导致股价及成交量上升的任何原因
- 疫情后全国热门博物馆榜单出炉 第一名竟不是故宫
- Redmi发布98寸电视:屏占比98.8%、价格仅为友商1/5
- 5G商用将满一年,6G研发开始了...
- 悔创阿里杰克马,毫无成就孙正义!孙正义对话马云:马云才是自己的贵人!
- 刘海变挖孔!小米高管:明年弹出式全面屏几乎没有了
- 中国制造特斯拉亮相 中文车尾标亮了!网友:好抠吗?
- 骁龙855加持!OPPO Reno正面照揭晓:边框窄得吓人
- 面试热问——你在前一份工作(实习)学到什么?