从iceberg的官方文档上可以看到如下介绍:

实例程序中设置了startSnapshotId,介绍说可以从指定的快照版本号开始读取增量的数据。那么笔者的问题来了:

flink-stream如何增量的读取iceberg table?

flink本身肯定没有增量读取iceberg的能力,这是提供框架层的方法,在源码iceberg/flink/src/main/java/org/apache/iceberg/flink/source/中找到了StreamingReaderOperator.java类,继承了flink的AbstractStreamOperator,我们常识从这里入手去读源码。

/*** The operator that reads the {@link FlinkInputSplit splits} received from the preceding {@link* StreamingMonitorFunction}. Contrary to the {@link StreamingMonitorFunction} which has a parallelism of 1,* this operator can have multiple parallelism.** <p>As soon as a split descriptor is received, it is put in a queue, and use {@link MailboxExecutor}* read the actual data of the split. This architecture allows the separation of the reading thread from the one split* processing the checkpoint barriers, thus removing any potential back-pressure.*/public class StreamingReaderOperator extends AbstractStreamOperator<RowData>implements OneInputStreamOperator<FlinkInputSplit, RowData> {

先看这个类注释,就明白了大概框架:
并行的读取从StreamingMonitorFunction接收到的FlinkInputSplit对象,然后放到队列,最后使用MailboxExecutor从FlinkInputSplit读取真正的数据。
既然处理的数据来自StreamingMonitorFunction,那我们先看StreamingMonitorFunction是如何实现的:

/*** This is the single (non-parallel) monitoring task which takes a {@link FlinkInputFormat},* it is responsible for:** <ol>*     <li>Monitoring snapshots of the Iceberg table.</li>*     <li>Creating the {@link FlinkInputSplit splits} corresponding to the incremental files</li>*     <li>Assigning them to downstream tasks for further processing.</li>* </ol>** <p>The splits to be read are forwarded to the downstream {@link StreamingReaderOperator}* which can have parallelism greater than one.*/
public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit> implements CheckpointedFunction {

这是一个单线程执行监控任务:

  1. 监控iceberg的snapshots
  2. 创建对应增量文件的FlinkInputSplit对象
  3. 将FlinkInputSplit分配给下游进一步的处理
    因为它继承了CheckpointedFunction接口,所以要实现下面两个接口:
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;

那么这两个接口不是关注的重点,跳过。剩下就只有run()和cancel(),重点关注这个run():

isRuning初始化为true,所以进来就是一个无限循环,标题中写到这是不能并行执行的任务,所以看到了synchronized和getCheckpointLock()加锁操作,每隔monitorInterval时间就执行一次monitorAndForwardSplits()
首先获取表当前最新的快照snapshotId,如果记录了lastSnapshotId,那就生成lastSnapshotId到snapshotId之间的增量文件的FlinkInputSplit对象,而该类的initializeState()方法已经将lastSnapshotId设置成了startSnapshotId。

呼应了文章开头的实例程序。接下来我们再看FlinkSplitGenerator.createInputSplits(table, newScanContext)这个方法究竟是如何构造出从startSnapshotId到snapshotId之间的增量FlinkInputSplit。

这里使用iceberg官方文档上的java api,具体的api介绍可以看

实现原理就是使用了TableScan扫描startSnapshotId和endSnapshotId之间的文件变化,生成一系列的CombinedScanTask任务,再将CombinedScanTask对象数组包装成FlinkInputSplit对象数组。

好了,如何产生增量数据的FlinkInputSplit我们明白了,那StreamingReaderOperator又是如何从FlinkInputSplit中得到真正增量数据的呢?我们需要再次回到StreamingReaderOperator.processElement()

首先将分配给的FlinkInputSplit放进了splits队列,然后执行了enqueueProcessSplits()方法,在executor中异步的执行了如下操作:

  1. 从列表头中取出一个FlinkInputSplit对象,调用FlinkInputFormat.open()
  2. 轮询调用FlinkInputFormat.nextRecord()获取RowData数据对象,并交给了flink的SourceContext,至此数据真正的进入了流
  3. 一直循环1-2这个过程,直到队列为空

进入FlinkInputFormat.nextRecord()刨根问底:

继续跟进,进入DataIterator.next()

首先,updateCurrentIterator()函数轮询了CombinedScanTask中的Collection files(),针对每个FileScanTask执行了FileScanTaskReader的fileScanTaskReader.open(scanTask, inputFilesDecryptor),通过FileScanTask任务读取了RowData对象,这就涉及底层文件的读取了,包括PARQUET、AVRO、ORC三种文件格式的读取。

至此笔者的疑问已经得到了答案,接下去的细节就不深究了。

学无止境,苦海无涯,撸码要适度,保重!

探究flink-stream如何增量的读取iceberg table相关推荐

  1. 2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API

    目录 API 获取环境 创建表 查询表 Table API SQL ​​​​​​​写出表 ​​​​​​​与DataSet/DataStream集成 ​​​​​​​TableAPI ​​​​​​​SQL ...

  2. 使用Pandas的read_html方法读取网页Table表格数据

    本文通过一个小实例,说明使用Pandas的read_html方法读取网页Table表格数据 要读取的网页表格数据 http://vip.stock.finance.sina.com.cn/q/go.p ...

  3. Flink stream load 方式写入doris

    1,文档 Name: 'STREAM LOAD' Description:     NAME:         stream-load: load data to table in streaming ...

  4. 使用Flink时从Kafka中读取Array[Byte]类型的Schema

    使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema: val myConsumer = new FlinkKafkaConsumer08[String](&qu ...

  5. flink入门_Flink入门:读取Kafka实时数据流,实现WordCount

    本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上.通过本文你可以了解如何编写和运行Flink程序. 代码拆解 首先要设置Flink的执行环境: ...

  6. Flink MySQL CDC 增量同步要求源表有主键

    版本:2.2 如果源表没有主键,则运行时报错: 2023-03-13 21:28:25,244 INFO [679] [com.ververica.cdc.connectors.mysql.sourc ...

  7. flink stream 终于上local 集群 运行起来

    先上图 运行界面 运行了三个任务 第一个是word count 第二三个是 数据 producer and  consumer ----> 更多代码 可以参考上一篇blog 里面有很详细的介绍 ...

  8. 2021年大数据Flink(三十九):​​​​​​​Table与SQL ​​​​​​总结 Flink-SQL常用算子

    目录 总结 Flink-SQL常用算子 SELECT WHERE ​​​​​​​DISTINCT ​​​​​​​GROUP BY ​​​​​​​UNION 和 UNION ALL ​​​​​​​JOI ...

  9. 2021年大数据Flink(三十八):​​​​​​​Table与SQL ​​​​​​案例五 FlinkSQL整合Hive

    目录 案例五 FlinkSQL整合Hive 介绍 集成Hive的基本方式 准备工作 1.添加hadoop_classpath 2.下载jar并上传至flink/lib目录 3.修改hive配置 4.启 ...

最新文章

  1. vuecli 编译后部署_基于vue-cli 打包时抽离项目相关配置文件详解
  2. 压力变动力,存储追求高效率
  3. web前端之JavaScript
  4. CV_CAST_8U(val);的意义
  5. 我要做一个合格的网络工程师
  6. Spring Boot AJAX 示例
  7. 十七、爬虫实战,多线程抓取大搜网新车的数据
  8. 认清几种视频接口标准---无私奉献版
  9. Python、Java、C#、Perl 创始人聚首,编程语言要变天?
  10. 《社会心理学》第一章读书笔记
  11. 百度短网址URL生成
  12. OpenWrt之DNS 测试工具(nslookup、dig)
  13. 浅谈快速沃尔什变换(FWT)快速莫比乌斯变换(FMT)
  14. ROS——在Ubuntu18.04下基于ROS Melodic编译python3的cv_bridge
  15. 上一步,下一步(撤销和恢复)
  16. 统计字符出现的次数(输出由多到少)
  17. uniapp兼容ipad平板配置
  18. ipc4 - A 转换AV号(avtobv)
  19. 关于ASA5520防火墙搭配WEB服务器的非常规设置
  20. 新的一年如何给自己制定一个年度计划

热门文章

  1. 华为网络测试软件计算机命令
  2. oracle 等待的进程,Oracle 等待事件:ges generic event
  3. 全球及中国粉煤灰PFA行业行业发展动态与前景趋势预测报告2022-2028年
  4. 使用freemarker导出word并动态插入多张图片
  5. 【总结】1165- 前端团队代码规范最佳实践,个人成长必备!
  6. win7计算机名改成大写,图文解读win7系统excel数字小写金额转换成汉字大写金额的措施...
  7. 解决Github 每次上传都要输入用户名和密码
  8. python数据驱动+读取yaml文件+读取excel文件+mySQL
  9. CentOS7的Tiger VNC设置
  10. 深度学习正则化(L1 norm/L2 norm)以及dropout理解