探究flink-stream如何增量的读取iceberg table
从iceberg的官方文档上可以看到如下介绍:
实例程序中设置了startSnapshotId,介绍说可以从指定的快照版本号开始读取增量的数据。那么笔者的问题来了:
flink-stream如何增量的读取iceberg table?
flink本身肯定没有增量读取iceberg的能力,这是提供框架层的方法,在源码iceberg/flink/src/main/java/org/apache/iceberg/flink/source/中找到了StreamingReaderOperator.java类,继承了flink的AbstractStreamOperator,我们常识从这里入手去读源码。
/*** The operator that reads the {@link FlinkInputSplit splits} received from the preceding {@link* StreamingMonitorFunction}. Contrary to the {@link StreamingMonitorFunction} which has a parallelism of 1,* this operator can have multiple parallelism.** <p>As soon as a split descriptor is received, it is put in a queue, and use {@link MailboxExecutor}* read the actual data of the split. This architecture allows the separation of the reading thread from the one split* processing the checkpoint barriers, thus removing any potential back-pressure.*/public class StreamingReaderOperator extends AbstractStreamOperator<RowData>implements OneInputStreamOperator<FlinkInputSplit, RowData> {
先看这个类注释,就明白了大概框架:
并行的读取从StreamingMonitorFunction接收到的FlinkInputSplit对象,然后放到队列,最后使用MailboxExecutor从FlinkInputSplit读取真正的数据。
既然处理的数据来自StreamingMonitorFunction,那我们先看StreamingMonitorFunction是如何实现的:
/*** This is the single (non-parallel) monitoring task which takes a {@link FlinkInputFormat},* it is responsible for:** <ol>* <li>Monitoring snapshots of the Iceberg table.</li>* <li>Creating the {@link FlinkInputSplit splits} corresponding to the incremental files</li>* <li>Assigning them to downstream tasks for further processing.</li>* </ol>** <p>The splits to be read are forwarded to the downstream {@link StreamingReaderOperator}* which can have parallelism greater than one.*/
public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit> implements CheckpointedFunction {
这是一个单线程执行监控任务:
- 监控iceberg的snapshots
- 创建对应增量文件的FlinkInputSplit对象
- 将FlinkInputSplit分配给下游进一步的处理
因为它继承了CheckpointedFunction接口,所以要实现下面两个接口:
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
那么这两个接口不是关注的重点,跳过。剩下就只有run()和cancel(),重点关注这个run():
isRuning初始化为true,所以进来就是一个无限循环,标题中写到这是不能并行执行的任务,所以看到了synchronized和getCheckpointLock()加锁操作,每隔monitorInterval时间就执行一次monitorAndForwardSplits()
首先获取表当前最新的快照snapshotId,如果记录了lastSnapshotId,那就生成lastSnapshotId到snapshotId之间的增量文件的FlinkInputSplit对象,而该类的initializeState()方法已经将lastSnapshotId设置成了startSnapshotId。
呼应了文章开头的实例程序。接下来我们再看FlinkSplitGenerator.createInputSplits(table, newScanContext)这个方法究竟是如何构造出从startSnapshotId到snapshotId之间的增量FlinkInputSplit。
这里使用iceberg官方文档上的java api,具体的api介绍可以看
实现原理就是使用了TableScan扫描startSnapshotId和endSnapshotId之间的文件变化,生成一系列的CombinedScanTask任务,再将CombinedScanTask对象数组包装成FlinkInputSplit对象数组。
好了,如何产生增量数据的FlinkInputSplit我们明白了,那StreamingReaderOperator又是如何从FlinkInputSplit中得到真正增量数据的呢?我们需要再次回到StreamingReaderOperator.processElement()
首先将分配给的FlinkInputSplit放进了splits队列,然后执行了enqueueProcessSplits()方法,在executor中异步的执行了如下操作:
- 从列表头中取出一个FlinkInputSplit对象,调用FlinkInputFormat.open()
- 轮询调用FlinkInputFormat.nextRecord()获取RowData数据对象,并交给了flink的SourceContext,至此数据真正的进入了流
- 一直循环1-2这个过程,直到队列为空
进入FlinkInputFormat.nextRecord()刨根问底:
继续跟进,进入DataIterator.next()
首先,updateCurrentIterator()函数轮询了CombinedScanTask中的Collection files(),针对每个FileScanTask执行了FileScanTaskReader的fileScanTaskReader.open(scanTask, inputFilesDecryptor),通过FileScanTask任务读取了RowData对象,这就涉及底层文件的读取了,包括PARQUET、AVRO、ORC三种文件格式的读取。
至此笔者的疑问已经得到了答案,接下去的细节就不深究了。
学无止境,苦海无涯,撸码要适度,保重!
探究flink-stream如何增量的读取iceberg table相关推荐
- 2021年大数据Flink(三十二):Table与SQL案例准备 API
目录 API 获取环境 创建表 查询表 Table API SQL 写出表 与DataSet/DataStream集成 TableAPI SQL ...
- 使用Pandas的read_html方法读取网页Table表格数据
本文通过一个小实例,说明使用Pandas的read_html方法读取网页Table表格数据 要读取的网页表格数据 http://vip.stock.finance.sina.com.cn/q/go.p ...
- Flink stream load 方式写入doris
1,文档 Name: 'STREAM LOAD' Description: NAME: stream-load: load data to table in streaming ...
- 使用Flink时从Kafka中读取Array[Byte]类型的Schema
使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema: val myConsumer = new FlinkKafkaConsumer08[String](&qu ...
- flink入门_Flink入门:读取Kafka实时数据流,实现WordCount
本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上.通过本文你可以了解如何编写和运行Flink程序. 代码拆解 首先要设置Flink的执行环境: ...
- Flink MySQL CDC 增量同步要求源表有主键
版本:2.2 如果源表没有主键,则运行时报错: 2023-03-13 21:28:25,244 INFO [679] [com.ververica.cdc.connectors.mysql.sourc ...
- flink stream 终于上local 集群 运行起来
先上图 运行界面 运行了三个任务 第一个是word count 第二三个是 数据 producer and consumer ----> 更多代码 可以参考上一篇blog 里面有很详细的介绍 ...
- 2021年大数据Flink(三十九):Table与SQL 总结 Flink-SQL常用算子
目录 总结 Flink-SQL常用算子 SELECT WHERE DISTINCT GROUP BY UNION 和 UNION ALL JOI ...
- 2021年大数据Flink(三十八):Table与SQL 案例五 FlinkSQL整合Hive
目录 案例五 FlinkSQL整合Hive 介绍 集成Hive的基本方式 准备工作 1.添加hadoop_classpath 2.下载jar并上传至flink/lib目录 3.修改hive配置 4.启 ...
最新文章
- vuecli 编译后部署_基于vue-cli 打包时抽离项目相关配置文件详解
- 压力变动力,存储追求高效率
- web前端之JavaScript
- CV_CAST_8U(val);的意义
- 我要做一个合格的网络工程师
- Spring Boot AJAX 示例
- 十七、爬虫实战,多线程抓取大搜网新车的数据
- 认清几种视频接口标准---无私奉献版
- Python、Java、C#、Perl 创始人聚首,编程语言要变天?
- 《社会心理学》第一章读书笔记
- 百度短网址URL生成
- OpenWrt之DNS 测试工具(nslookup、dig)
- 浅谈快速沃尔什变换(FWT)快速莫比乌斯变换(FMT)
- ROS——在Ubuntu18.04下基于ROS Melodic编译python3的cv_bridge
- 上一步,下一步(撤销和恢复)
- 统计字符出现的次数(输出由多到少)
- uniapp兼容ipad平板配置
- ipc4 - A 转换AV号(avtobv)
- 关于ASA5520防火墙搭配WEB服务器的非常规设置
- 新的一年如何给自己制定一个年度计划
热门文章
- 华为网络测试软件计算机命令
- oracle 等待的进程,Oracle 等待事件:ges generic event
- 全球及中国粉煤灰PFA行业行业发展动态与前景趋势预测报告2022-2028年
- 使用freemarker导出word并动态插入多张图片
- 【总结】1165- 前端团队代码规范最佳实践,个人成长必备!
- win7计算机名改成大写,图文解读win7系统excel数字小写金额转换成汉字大写金额的措施...
- 解决Github 每次上传都要输入用户名和密码
- python数据驱动+读取yaml文件+读取excel文件+mySQL
- CentOS7的Tiger VNC设置
- 深度学习正则化(L1 norm/L2 norm)以及dropout理解