1.视界

2.概述

请参考:95-134-100-源码-维表-实现维表需要的Aysnc IO

Flink 1.9 中维表功能来源于新加入的Blink中的功能,如果你要使用该功能,那就需要自己引入 Blink 的 Planner,而不是引用社区的 Planner。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单,只要自定义实现 LookupableTableSource 接口,同时实现里面的方法就可以进行,下面来分析一下 LookupableTableSource的代码:

public interface LookupableTableSource<T> extends TableSource<T> {     TableFunction<T> getLookupFunction(String[] lookupKeys);     AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);     boolean isAsyncEnabled();}

可以看到 LookupableTableSource 这个接口中有三个方法

isAsyncEnabled 方法主要表示该表是否支持异步访问外部数据源获取数据,当返回 true 时,那么在注册到 TableEnvironment 后,使用时会返回异步函数进行调用,当返回 false 时,则使同步访问函数。

getLookupFunction 方法返回一个同步访问外部数据系统的函数,什么意思呢,就是你通过 Key 去查询外部数据库,需要等到返回数据后才继续处理数据,这会对系统处理的吞吐率有影响。

getAsyncLookupFunction 方法则是返回一个异步的函数,异步访问外部数据系统,获取数据,这能极大的提升系统吞吐率。

2.2 同步访问函数getLookupFunction

getLookupFunction 会返回同步方法,这里你需要自定义 TableFuntion 进行实现,TableFunction 本质是 UDTF,输入一条数据可能返回多条数据,也可能返回一条数据。用户自定义 TableFunction 格式如下:

public class MyLookupFunction extends TableFunction<Row> {    Jedis jedis;    @Override    public void open(FunctionContext context) throws Exception {        jedis = new Jedis("172.16.44.28");    }    public void eval(Object... paramas) {        String key = "userInfo:userId:" + paramas[0].toString() + ":userName";        String value = jedis.get(key);        collect(Row.of(key, value));    }}

open 方法在进行初始化算子实例的进行调用,异步外部数据源的client要在类中定义为 transient,然后在 open 方法中进行初始化,这样每个任务实例都会有一个外部数据源的 client。防止同一个 client 多个任务实例调用,出现线程不安全情况。

eval 则是 TableFunction 最重要的方法,它用于关联外部数据。当程序有一个输入元素时,就会调用eval一次,用户可以将产生的数据使用 collect() 进行发送下游。paramas 的值为用户输入元素的值,比如在 Join 的时候,使用 A.id = B.id and A.name = b.name, B 是维表,A 是用户数据表,paramas 则代表 A.id,A.name 的值。

2.3 异步访问函数

getAsyncLookupFunction 会返回异步访问外部数据源的函数,如果你想使用异步函数,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用异步函数访问外部数据系统,一般是外部系统有异步访问客户端,如果没有的话,可以自己使用线程池异步访问外部系统。至于为什么使用异步访问函数,无非就是为了提高程序的吞吐量,不需要每条记录访问返回数据后,才去处理下一条记录。异步函数格式如下:

public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {    private transient RedisAsyncCommands<String, String> async;    @Override    public void open(FunctionContext context) throws Exception {        RedisClient redisClient = RedisClient.create("redis://172.16.44.28:6379/0");        StatefulRedisConnection<String, String> connection = redisClient.connect();        async = connection.async();    }    public void eval(CompletableFuture<Collection<Row>> future, Object... params) {        redisFuture.thenAccept(new Consumer<String>() {            @Override            public void accept(String value) {                future.complete(Collections.singletonList(Row.of(key, value)));            }        });    }}

维表异步访问函数总体和同步函数实现类似,这里说一下注意点:

  1. 外部数据源异步客户端(如RedisAsyncCommands)初始化。如果是线程安全的(多个客户端一起使用),你可以不加 transient 关键字,初始化一次。否则,你需要加上 transient,不对其进行初始化,而在 open 方法中,为每个 Task 实例初始化一个。

  2. eval 方法中多了一个 CompletableFuture,当异步访问完成时,需要调用其方法进行处理.

为了减少每条数据都去访问外部数据系统,提高数据的吞吐量,一般我们会在同步函数和异步函数中加入缓存,如果以前某个关键字访问过外部数据系统,我们将其值放入到缓存中,在缓存没有失效之前,如果该关键字再次进行处理时,直接先访问缓存,有就直接返回,没有再去访问外部数据系统,然后在进行缓存,进一步提升我们实时程序处理的吞吐量。

一般缓存类型有以下几种类型:
数据全部缓存,定时更新。
LRU Cache,设置一个超时时间。
用户自定义缓存。

以下是demo

从HDFS中读取数据源,join Redis维表,最终结果落到kafka。服务器上8个并行度大概可以达到60w+/s。

全部依赖项:

<properties>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <flink.version>1.9.0</flink.version>        <java.version>1.8</java.version>        <scala.binary.version>2.11</scala.binary.version>        <maven.compiler.source>${java.version}</maven.compiler.source>        <maven.compiler.target>${java.version}</maven.compiler.target>    </properties>        <dependencies>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-java</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-scala_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>          <!--1.9 SQL-->        <!-- Either... -->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <!--<dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>-->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-common</artifactId>            <version>${flink.version}</version>        </dependency>         <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-clients_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <!-- <dependency>             <groupId>mysql</groupId>             <artifactId>mysql-connector-java</artifactId>             <version>5.1.30</version>         </dependency>-->        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>8.0.16</version>        </dependency>         <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-wikiedits_2.11</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-filesystem_2.11</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-hadoop-compatibility_2.11</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-common</artifactId>            <version>2.5.1</version>        </dependency>        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-hdfs</artifactId>            <version>2.5.1</version>        </dependency>        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-client</artifactId>            <version>2.5.1</version>        </dependency>         <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-rabbitmq_2.11</artifactId>            <version>${flink.version}</version>        </dependency>         <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-avro</artifactId>            <version>${flink.version}</version>        </dependency>         <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-redis_2.11</artifactId>            <version>1.1.5</version>        </dependency>         <dependency>            <groupId>org.apache.httpcomponents</groupId>            <artifactId>httpclient</artifactId>            <version>4.5.2</version>        </dependency>        <dependency>            <groupId>com.fasterxml.jackson.core</groupId>            <artifactId>jackson-core</artifactId>            <version>2.9.5</version>        </dependency>         <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-metrics-dropwizard</artifactId>            <version>${flink.version}</version>        </dependency>          <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka_${scala.binary.version}</artifactId>            <version>0.11.0.0</version>        </dependency>         <!--redis异步连接-->        <dependency>            <groupId>io.lettuce</groupId>            <artifactId>lettuce-core</artifactId>            <version>5.0.5.RELEASE</version>        </dependency>        <dependency>            <groupId>io.netty</groupId>            <artifactId>netty-all</artifactId>            <version>4.1.24.Final</version>        </dependency>    </dependencies>
import io.lettuce.core.RedisClient;import io.lettuce.core.RedisFuture;import io.lettuce.core.api.StatefulRedisConnection;import io.lettuce.core.api.async.RedisAsyncCommands;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.table.functions.AsyncTableFunction;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.types.Row; import java.util.Collection;import java.util.Collections;import java.util.concurrent.CompletableFuture;import java.util.function.Consumer; public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {     private final String[] fieldNames;    private final TypeInformation[] fieldTypes;     private transient RedisAsyncCommands<String, String> async;     public MyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {        this.fieldNames = fieldNames;        this.fieldTypes = fieldTypes;    }     @Override    public void open(FunctionContext context) {        //配置redis异步连接        RedisClient redisClient = RedisClient.create("redis://172.16.44.28:6379/0");        StatefulRedisConnection<String, String> connection = redisClient.connect();        async = connection.async();    }     //每一条流数据都会调用此方法进行join    public void eval(CompletableFuture<Collection<Row>> future, Object... paramas) {        //表名、主键名、主键值、列名        String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"};        String key = String.join(":", info);        RedisFuture<String> redisFuture = async.get(key);         redisFuture.thenAccept(new Consumer<String>() {            @Override            public void accept(String value) {                future.complete(Collections.singletonList(Row.of(key, value)));                //todo//                BinaryRow row = new BinaryRow(2);            }        });    }     @Override    public TypeInformation<Row> getResultType() {        return new RowTypeInfo(fieldTypes, fieldNames);    }     public static final class Builder {        private String[] fieldNames;        private TypeInformation[] fieldTypes;         private Builder() {        }         public static Builder getBuilder() {            return new Builder();        }         public Builder withFieldNames(String[] fieldNames) {            this.fieldNames = fieldNames;            return this;        }         public Builder withFieldTypes(TypeInformation[] fieldTypes) {            this.fieldTypes = fieldTypes;            return this;        }         public MyAsyncLookupFunction build() {            return new MyAsyncLookupFunction(fieldNames, fieldTypes);        }    }}
import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.functions.AsyncTableFunction;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.table.sources.LookupableTableSource;import org.apache.flink.table.sources.StreamTableSource;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.utils.TypeConversions;import org.apache.flink.types.Row; public class RedisAsyncLookupTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {     private final String[] fieldNames;    private final TypeInformation[] fieldTypes;     public RedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) {       this.fieldNames = fieldNames;        this.fieldTypes = fieldTypes;    }     //同步方法    @Override    public TableFunction<Row> getLookupFunction(String[] strings) {        return null;    }     //异步方法    @Override    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {        return MyAsyncLookupFunction.Builder.getBuilder()                .withFieldNames(fieldNames)                .withFieldTypes(fieldTypes)                .build();    }     //开启异步    @Override    public boolean isAsyncEnabled() {        return true;    }     @Override    public DataType getProducedDataType() {        return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));    }     @Override    public TableSchema getTableSchema() {        return TableSchema.builder()                .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))                .build();    }     @Override    public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) {        throw new UnsupportedOperationException("do not support getDataStream");    }     public static final class Builder {        private String[] fieldNames;        private TypeInformation[] fieldTypes;         private Builder() {        }         public static Builder newBuilder() {            return new Builder();        }         public Builder withFieldNames(String[] fieldNames) {            this.fieldNames = fieldNames;            return this;        }         public Builder withFieldTypes(TypeInformation[] fieldTypes) {            this.fieldTypes = fieldTypes;            return this;        }         public RedisAsyncLookupTableSource build() {            return new RedisAsyncLookupTableSource(fieldNames, fieldTypes);        }    }}
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.types.Row;import org.junit.Test; import java.util.Properties; public class LookUpAsyncTest {     @Test    public void test() throws Exception {        LookUpAsyncTest.main(new String[]{});    }     public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //env.setParallelism(1);        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);         final ParameterTool params = ParameterTool.fromArgs(args);        String fileName;        if (params.get("f") != null)            fileName = params.get("f");        else            fileName = "/flink/userClick_Random_100W";         DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");         TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};        String[] fields = new String[]{"id", "user_click", "time"};        RowTypeInfo typeInformation = new RowTypeInfo(types, fields);         DataStream<Row> stream = source.map(new MapFunction<String, Row>() {            private static final long serialVersionUID = 2349572544179673349L;             @Override            public Row map(String s) {                String[] split = s.split(",");                Row row = new Row(split.length);                for (int i = 0; i < split.length; i++) {                    Object value = split[i];                    if (types[i].equals(Types.STRING)) {                        value = split[i];                    }                    if (types[i].equals(Types.LONG)) {                        value = Long.valueOf(split[i]);                    }                    row.setField(i, value);                }                return row;            }        }).returns(typeInformation);         tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");         RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()                .withFieldNames(new String[]{"id", "name"})                .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})                .build();        tableEnv.registerTableSource("info", tableSource);         String sql = "select t1.id,t1.user_click,t2.name" +                " from user_click_name as t1" +                " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +                " on t1.id = t2.id";         Table table = tableEnv.sqlQuery(sql);         DataStream<Row> result = tableEnv.toAppendStream(table, Row.class); //        result.print().setParallelism(1);         DataStream<String> printStream = result.map(new MapFunction<Row, String>() {            @Override            public String map(Row value) throws Exception {                return value.toString();            }        });         Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "172.16.12.148:9094");        FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(// broker list                "user_click_name",                  // target topic                new SimpleStringSchema(),                properties);        printStream.addSink(kafkaProducer);         tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());    }}

参考:https://zhuanlan.zhihu.com/p/79800113

参考:https://blog.csdn.net/u012554509/article/details/100533749

95-134-110-源码-维表-Flink 1.9.0 维表实现相关推荐

  1. 【Flink源码篇】Flink 1.15.0源码编译

    目录 1. 下载源码并解压 2. Flink项目配置 3. 源码编译 4. 编译问题记录 5. IDEA调试Flink程序 1. 下载源码并解压 从github下载Flink的源码:https://g ...

  2. 数据结构源码笔记(C语言):线性表的单链表示

    /* LinkList.c*/ /*线性表的单链表示:函数实现*/#include <stdio.h> #include <stdlib.h> //#include " ...

  3. 数据结构源码笔记(C语言):线性表的顺序表示

    /* SeqList.c*/ /*线性表的顺序表示:函数实现*/#include <stdio.h> #include <stdlib.h> //#include " ...

  4. 直播视频app源码,Android 点击生成二维码

    直播视频app源码,Android 点击生成二维码实现的相关代码 activity.xml代码如下: <?xml version="1.0" encoding="u ...

  5. 插件一:JAVA微信砍价活动源码分享[商品帮砍到0元,免费领取奖品]

    插件一:微信砍价活动源码分享 [商品帮砍到0元,免费领取奖品] 活动描述: 砍价活动即公众号向粉丝推广的0价赠商品(或优惠价购商品)活动,用户通过分享好友帮其砍价,可将价格从原价一路砍到底价,并抢得名 ...

  6. 【Android 10 源码】healthd 模块 HAL 2.0 分析

    Android 9 引入了从 health@1.0 HAL 升级的主要版本 android.hardware.health HAL 2.0.这一新 HAL 具有以下优势: 框架代码和供应商代码之间的区 ...

  7. [android源码下载索引贴】微信+二维码那都不是事......

    微信: Android 简单仿微信朋友圈布局2014/09/02 http://www.eoeandroid.com/thread-542738-1-1.html [代码片段] 高仿摇一摇效果 201 ...

  8. 云客Drupal源码分析之数据库Schema及创建数据表

    本主题是<云客Drupal源码分析之数据库系统及其使用>的补充,便于查询,所以独立成一个主题 讲解数据库系统如何操作Schema(创建修改数据库.数据表.字段:判断它们的存在性等等),以及 ...

  9. hbase源码系列(八)从Snapshot恢复表

    在看这一章之前,建议大家先去看一下snapshot的使用.这一章是上一章snapshot的续集,上一章了讲了怎么做snapshot的原理,这一章就怎么从snapshot恢复表. restoreSnap ...

最新文章

  1. 10_史上最全的Markdown使用教程(没有之一)(20190115)
  2. [YTU]_2626( B 统计程序设计基础课程学生的平均成绩)
  3. jquery中$运算符的后代选择器
  4. 时序预测:从两篇高影响力的论文谈起
  5. 【OpenCV】直方图应用:直方图均衡化,直方图匹配,对比直方图
  6. hbase 页面访问_HBase
  7. 第五人格服务器维修中怎么进,第五人格进不去怎么办 游戏进不去解决方法详解[多图]...
  8. docker ubuntu16.04镜像下安装cowrie蜜罐记录
  9. 前5月全国快递业务量累计完成396.5亿件 同比增50.1%
  10. 管理博文Hive大数据-Mysql的安装和启动---大数据之Hive工作笔记0007
  11. Django-----模板嵌套
  12. BNU29140 Taikotaiko(概率)
  13. 解决比较Oracle中CLOB字段问题
  14. 2022年4月30号Mysql语句增删改查(CRUB)重在实操。
  15. linux瘦身软件下载,Linux系统瘦身裁剪 ——测试版
  16. 【Latex】Latex小白入门(4)——Latex中特殊符号的输入
  17. 7z 头部错误 数据错误_7z解压软件(7-zip)解压错误怎么办?
  18. 流利阅读day1 Dysmorphia
  19. 史上最强模型 GPT-4 上线:一张手绘草图能生一个网站、60 秒搞定一个游戏开发!
  20. 完全格式化硬盘(删除EFI分区)

热门文章

  1. 马云再谈教育:未来的学校学的不仅仅是知识 更重要的是学习的能力
  2. 五菱汽车:并不知悉导致股价及成交量上升的任何原因
  3. 疫情后全国热门博物馆榜单出炉 第一名竟不是故宫
  4. Redmi发布98寸电视:屏占比98.8%、价格仅为友商1/5
  5. 5G商用将满一年,6G研发开始了...
  6. 悔创阿里杰克马,毫无成就孙正义!孙正义对话马云:马云才是自己的贵人!
  7. 刘海变挖孔!小米高管:明年弹出式全面屏几乎没有了
  8. 中国制造特斯拉亮相 中文车尾标亮了!网友:好抠吗?
  9. 骁龙855加持!OPPO Reno正面照揭晓:边框窄得吓人
  10. 面试热问——你在前一份工作(实习)学到什么?