文章目录

  • DataStream
  • Transformation
  • StreamOperator
  • Function
  • Transformation Chain
    • fromElements
      • fromCollection
      • addSource
      • 创建 DataStreamSource
    • flatMap
    • keyBy
    • sum
    • print

DataStream

A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.

DataStream 是面向用户的对数据流的 API 封装,底层其实是一个 Tranformation chain,通过 map、keyBy 等方法,DataStream 往内部的 Tranformation chain 继续添加 Tranformation,并通过该 Tranformation 转换成另一个 DataStream。

Transformation

A Transformation represents the operation that creates a DataStream. Every DataStream has an underlying Transformation that is the origin of said DataStream.

多个 Transformation 串联起来封装了创建该 DataStream 的全部操作信息。Transformation 一般是对 StreamOperator 的封装。

需要注意的是,Transformation 只是一个逻辑操作,不是严格对应的物理操作,比如 keyBy 生成的 Transformation 代表的是一个分区信息,没有真正需要执行的物理操作,因此也没有包含 StreamOperator。

StreamOperator

Basic interface for stream operators. Implementers would implement one of OneInputStreamOperator or TwoInputStreamOperator to create operators that process elements.
The class AbstractStreamOperator offers default implementation for the lifecycle and properties methods.
Methods of StreamOperator are guaranteed not to be called concurrently. Also, if using the timer service, timer callbacks are also guaranteed not to be called concurrently with methods on StreamOperator.

所有 operators 的基类接口,默认实现为 AbstractStreamOperator,AbstractUdfStreamOperator 是 AbstractStreamOperator 的一个特殊的子类,所有包含 Function 的 operators 都应该拓展该类。另外,每个具体的 operator 应实现 OneInputStreamOperator 或 TwoInputStreamOperator 接口

另外 StreamOperator 的所有方法都必须保证不被并发调用,即使使用了定时任务,也必须保证定时任务不会并发调用这些方法。

Function

The base interface for all user-defined functions.
This interface is empty in order to allow extending interfaces to be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.

所有 UDF(User-Defined Function) 的基类接口,这是一个空接口,设计成空接口能够方便子接口定义成一个函数式接口(只有一个抽象方法的接口),这样用户在使用时可以直接运用 lambda 表达式,整体代码会更简单清晰。

Transformation Chain

在 Transformation 中提到过,多个 Transformation 串联起来封装了整个 DataStream 的操作信息,

public class WordCount {public static final String[] WORDS = new String[]{"To be, or not to b"};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.fromElements(WORDS) .name("word-count");DataStream<Tuple2<String, Integer>> counts = text.flatMap(new WordCount.Tokenizer()).keyBy(v -> v.f0).sum(1);counts.print();env.execute("Word Count")}public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {public Tokenizer() {}@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] tokens = value.toLowerCase().split("\\W+");for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2(token, 1));}}}}
}

我们以这段代码为例讲解 Transformation 的创建过程以及它和 DataStream 的关系。

fromElements

  1. 获取元素的类型信息;
  2. 调用 fromCollection(Arrays.asList(data), typeInfo)。

fromCollection

  1. 创建 SourceFunction,这是一个继承了 Function 的接口,实际类型为 FromElementsFunction,在创建过程中会对元素进行序列化为字节数组,存到 elementsSerialized 中;
  2. 调用 addSource 创建 DataStreamSource(这是一个 DataStream)。

addSource

  1. 获取之前解析得到的类型信息;
  2. 创建 StreamSource,StreamSource 继承自 AbstractUdfStreamOperator,包装了 fromCollection 中创建的 SourceFunction;
  3. 创建 DataStreamSource 并将其返回。

创建 DataStreamSource

  1. 创建 LegacySourceTransformation,将 addSource 中的 StreamSource 进行了封装;
  2. 将 LegacySourceTransformation 赋值给 DataStream 的 transformation 成员变量。

最终出来的结果结构如下图

同理,下面绘出了 flatMap、keyBy、sum、print 方法分别得到的 DataStream 结构。

flatMap

flatMap 最底层包装的 FlatMapFunction 传入的是用户的实现类。

keyBy

比较特别的一点是,keyBy 对应的 PartitionTransformation 中不包含 operator,而是存了一个分区器(StreamPartitioner),而最底层的 KeySelector 则是用户的实现类。

sum

与 keyBy 相似,sum 对应的 ReduceTransformation 中不包含 StreamOperator,而是直接记录了对应的 Function,sum 对应的 Function 为 SumAggregator。

print

print 返回的是一个 DataStreamSink,它不是一个 DataStream,因为当我们添加 sink 的时候说明这个 DataStream 的计算也已经结束了,但 DataStreamSink 和 DataStream 相似,它里面会保存一个 LegacySinkTransformation,LegacySinkTransformation 中有 StreamSink(StreamOperator),StreamSink 中包含 PrintSinkFunction(Function)。

这些 Transformation 会以 chain 的形式连接起来如下:


如 print 对应的 DataStreamSink 只需保存 LegacySinkTransformation 的引用,就可以拿到整个 Transformation chain 的信息,这条 Transformation chain 就包含了整个操作链路的信息。

Flink 源码笔记01—DataStream 和 Transformation相关推荐

  1. 【Flink】Flink 源码阅读笔记(15)- Flink SQL 整体执行框架

    1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势.尽管 SQL ...

  2. 【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

    1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:[Flink]Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl ...

  3. 【Flink】Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

    1.概述 转载:Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

  4. 【Flink】Flink 源码阅读笔记(16)- Flink SQL 的元数据管理

    1.概述 转载:Flink 源码阅读笔记(16)- Flink SQL 的元数据管理 Flink 源码阅读笔记(17)- Flink SQL 中的时间属

  5. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  6. 【Flink】 Flink 源码之 SQL 执行流程

    1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...

  7. 【Flink】 Flink 源码之 Buffer Timeout优化

    1.概述 转载:Flink 源码之 Buffer Timeout优化 2.Buffer Timeout 概念 Flink每个算子向下游发送数据需要两个条件: 输出buffer空间占满 buffer中数 ...

  8. 【Flink】Flink 源码之时间处理

    1.概述 转载:Flink 源码之时间处理 2.Flink支持的时间类型 EventTime: 每条数据都携带时间戳.Operator处理数据的时候所有依赖时间的操作依据数据携带的时间戳.可以支持乱序 ...

  9. Manim文档及源码笔记-CE文档-示例库3使用Manim绘图

    Manim文档及源码笔记-CE文档-示例库3使用Manim绘图 参考原文: Manim Community Edition Example Gallery 前言 笔记随想: 暂未发现官方中文版,自己实 ...

  10. 狂神Spring Boot 员工管理系统 【源码 + 笔记 + web素材】 超详细整理

    目录 员工管理系统 1.准备工作 1.1.导入资源 1.2.编写pojo层 1.3.编写dao层 2.首页实现 2.1.引入Thymeleaf 2.2.编写MyMvcConfig 2.3.测试首页 3 ...

最新文章

  1. NBJL 2020论文导读14:How Much Position Information Do Convolutional Neural Networks Encode ?
  2. 取消chrome下input和textarea的聚焦边框
  3. java.lang.InstantiationException 不能实例化某个对象
  4. MusicXML 3.0 - DTD 速查
  5. xcode 编译时有相同的类,导致冲突,编译错误
  6. php不会写 能看懂,人人都能看懂的全栈开发教程——PHP
  7. ECharts 点击非图表区域的点击事件不触发问题
  8. linux链接达梦数据库,linux下面 达梦数据库的JDBC链接
  9. Brief C Programs of the Bombs
  10. SQL-22 统计各个部门对应员工涨幅的次数总和,给出部门编码dept_no、部门名称dept_name以及次数sum...
  11. 疫情下的十大堵城:复工后整体拥堵下降37.3%
  12. Android 图片圆角的设置
  13. python 干什么工作具有明显优势-python语言的优势是什么
  14. ELK+filebeat+kafka+zookeeper构建海量日志分析平台
  15. java常见类型的转化以及风险
  16. 论文必备:如何用卡片法写论文?
  17. 【科普】数字货币的基石--区块链
  18. matlab自动变量名,matlab中如何自动给变量命名?
  19. 數字圖像中邊緣檢測算法綜合研究
  20. C#打开Adobe Reader进程打开pdf并传递页码参数跳转到指定页数

热门文章

  1. javaFx(7)文本阅读器
  2. PAT甲之初窥门径(上)
  3. python 输出结果图文混排_Django图文混排
  4. 新评论接口——京东评论接口
  5. 基于PG与PostGIS搭建实时矢量瓦片服务
  6. element-ui表格显示html格式
  7. 32、Java高级特性——日期操作类、Date类、SimpleDateFormat类、Calendar类
  8. python 232串口通信
  9. Java函数式编程与Lambda表达式
  10. HDU 6438 Buy and Resell (优先队列 or 贪心)