Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)
Flink实时数据处理实践经验
文章目录
- Flink实时数据处理实践经验
- 1. 数据输入与预处理
- 2. 实时数据处理
- 3. 实时数仓架构
- 4. 优化方案
Java、大数据开发学习要点(持续更新中…)
首先需要知道的一些知识:
- Flink中所说的UDF在官方文档中定义为用户自己通过继承类或实现接口的自定义类、已有方法中传入匿名内部类、在已有函数中用Lambda表达式。
- Flink实现结果实时展示(一些简单的分组聚合逻辑)可以直接在流中分组聚合写入Redis/MySQL,而需要在线实时多维分析需要将数据存入ClickHouse交给CK来处理。
1. 数据输入与预处理
- 首先,Flink的每项实时任务需要通过FlinkKafkaComsumer获取对接Kafka的流式处理环境,所以可以对获取DataStream封装一个工具类。这个工具类根据传入的配置文件和具体的反序列化方式类型
Class<? extends DeserializationSchema<T>>
(如SimpleStringSchema.class),在内部为每个实时任务创建唯一执行环境和返回对接Kafka的DataStream。 - 其次,在获取到对应的DataStream后,需要对每条JSON数据解析成Java Bean对象。而解析的方法可以通过封装一个UDF(实现ProcessFunction,内部fastjson解析)。
2. 实时数据处理
在实时数仓实践中比较有意思的一些部分的总结:
- 流量域相关:
- 地理维度集成
FLink对每条数使用map进行查询是同步的做法,效率低下。这里可以使用Flink中的AsyncDataStream进行异步I/O查询。一般会在优先请求本地缓存的地理位置信息库,如果没有查询结果请求地图服务商API,在获取本次查询结果后再完善本地地理位置信息库。
- 关于新老用户标签:
离线数仓中由于数仓总是保存了历史的数据,所以在新的一天增量数据到来时,可以通过两表join的操作判断新数据是否为新用户来为每条记录添加新老用户标签。
实时数仓中新老用户的判断也可以去查询历史数据,但这样做会比较慢,不符合实时的场景。解决方式可以很容易想到在确保每个用户每次访问都会进入同一个组中1,状态保存HashSet来标记bean新老用户字段,但问题也很明显HashSet随用户线性增长,占用内存过高使得内存压力大且状态写入statebackend速度也会下降。所以会使用布隆过滤器替代HashSet方案,布隆过滤器的特点在于擅长判断不存在(只要有一个对应位为0就能判断其不存在),用位存储相比于HashSet节省存储空间。1.实现同一个用户访问进入同一个组中的方法:
第一种方法:可以通过对粗粒度的维度进行keyby比如手机型号等来确保同一个用户的数据进入同一个分组中,但存在的问题是可能由于手机型号不均衡导致数据倾斜,和每个分组都持有一个布隆过滤器(KeyedState)还是占用较多内存;
第二种方法:对最细粒度的维度进行keyby即设备id,这样只有同一用户的数据会进入到同一分组中,但同一个Slot中会存在大量分组,所以在同一个Slot中共享一个布隆过滤器(OperatorState)来实现。
- 关于Flink实现多维分析:
一般在Flink使用的实时场景中,不会对数据进行大量的分组聚合等操作,对于像多维复杂统计的需求一般是Flink在对数据ETL后存入CK后利用CK实现快速查询。
- 数据唯一ID生成(Kafka的topic+partition+offset)
可以通过FlinkKafkaComsumer类中传入KafkaDeserializtionSchema的实现类,重写deserialize()中可以获取topic、partition、offset和原始value信息等。- Flink整合CK实现At-Least-Once方案
使用是ClickHouse的ReplacingMergeTree,可以将同一个分区中,数据唯一ID相同的数据进行覆盖(merge),可以保留最新的数据(根据插入CK时的系统时间),可以使用这个特点实现Flink + Clickhouse的At-Least-Once。
存在的问题:写入到CK中的数据不会立即merge,需要手动optimize或后台自动合并。
解决方案:查询时在表名的后面加上final关键字,就只查最新的数据数据,但是查询效率变低。- Flink通过JDBC Sink将数据导入CK(详情Flink官网有示例,数据按批次和时间阈值执行导入)。
- 关于数据的实时统计结果测流输出的方案:
前言:在实时统计的场景中,比如对某个实物的在线人数实时统计,如果使用较多状态存储(尤其布隆过滤器),需要设置状态TTL对非活跃的状态进行管理。对有在线时长要求的计算通过状态和定时器实现对应的需求1。
现在有一个需求是,将数据流的一些聚合结果写入Redis用于实时展示,将明细写入ClickHouse用于之后的多维分析。采用的方式是将需要展示的聚合结果在定时器触发时(onTimer方法中)测流输出(打上标签)。测流输出Sink入Redis的主要代码如下:
其中,addSink()中的第二个参数需要重写里面的方法来设计写入Redis的k-v格式(Hset形式,key设计为id+时间,value设计为其他字段的拼接)。
主流输出JDBC Sink入ClickHouse的主要代码如下:
1.定时器定时触发的设定
定时器在设定时间相同的情况下会覆盖前一个定时器,利用这个特性可以用进入时间-进入时间%设定时长+设定时长
来达到定时器根据设定时间的恒定触发效果(期间数据到达不会对定时器触发产生影响)。
- 关于数据流关联维表的方案:
Flink关联外部维表通常的解决方案:
a) 每来一条数据查一次数据库(慢、吞吐量低)
b) 可以使用异步IO(相对快,消耗资源多)
c) 广播State(最快、适用于少量数据、数据可以变化的)
上面这个方案需要JDBC Source定时查询数据库,并不是最好的方案。自定义JDBC单并行Source(避免需要对MySQL表进行划分)实现MySQL维表增量读取:
读取完成后,得到的广播变量与主流进行connnect并实现BroadcastProcessFunction完成数据整合和输出。
- 关于划分窗口计算TopN:
实现方式:
- 先将数据进行keyby,划分滑动窗口(数据平滑),在窗口内进行增量聚合1(效率高,全局聚合效率低,而且占用大量资源)。但聚合函数只能获得到聚合条件和聚合结果信息,没法获取窗口的信息(窗口的起始时间,结束时间),所以再定义一个WindowFunction,窗口触发后可以在WindowFunction获取到窗口聚合后的数据,并且可以得到窗口的起始时间和结束时间(这两个定义的方法都是传入AggregateFunction中的)。
- 接下来要根据分类ID、事件ID、窗口起始时间、结束时间等条件keyby后进行排序。使用ProcessFunction的定时器实现判断一个窗口数据是否已经到齐,每来一条数据,不直接输出,而是将数据存储到ValueState<List>(为了容错),再注册一个比当前窗口的结束时间还要大一毫秒的定时器(仍然利用了注册时间相同的定时器会覆盖的特性)。如果下一个窗口的数据到达了,那么WaterMark已经大于了注册的定时器的时间,上一个窗口的数据已经攒齐,在onTimer方法中就可以排序并输出上个窗口的TopN数据。
1.增量聚合和全量聚合
增量聚合: 窗口不维护原始数据,只维护中间结果,每次基于中间结果和增量数据进行聚合。>如:ReduceFunction、AggregateFunction
。
全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。如:ProcessWindowFunction
。
- 业务域相关:
- 关于业务数据双流Join:
双流join最主要的问题是,两个流可能会有一个流存在迟到的现象:
- 左表数据迟到:比较严重,在left join后甚至不会有数据出现。
- 右表数据迟到:在left join后右表数据为null。
- 正常情况:左右表都有数据。
针对上面迟到数据的解决方案为:
首先,在两个流join时需要划分窗口,在窗口中调用coGroupFunction进行left join。那么可以在join前先让左表数据先通过一个与后面join时等长的窗口,这样如果左表数据迟到,在两个窗口中都必定迟到,由此在第一个窗口中可以将迟到数据测流输出进行收集(WaterMark进行判断)。在left join后,迟到的右表数据则体现为右表全为null。将左表迟到的流和主流进行union,再去数据库查询右表为空的数据来解决双流join中数据迟到的问题。
另一种解决方案是,想在join的同时去测流数据迟到的数据,但coGroupFunction没有对应的实现,需要进行源码的修改实现。
3. 实时数仓架构
4. 优化方案
- StateBackend的优化:使用RocksDB作为StateBackend,适合存储更多状态、有长窗口的状态(window state)、key value的数据较大的状态(上限2G)。还有个重要的优势在于可以增量checkpoint。
Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)相关推荐
- 【Flink系列】开发篇:1. Flink维表关联方案
数据流往往需要访问外部的数据源来丰富自己的信息,比如通过record中的ip地址查询ip数据库maxmind的GeoIP2 Databases得到ip对应的城市名称,城市经纬度,将这些作为新的字段添加 ...
- flink维表关联系列之Redis维表关联:实时查询
点击上方蓝 字关注~ 在做维表关联如果要求低延时,即维表数据的变更能够被立刻感知到,所以就要求在查询时没有缓存策略,直接查询数据库维表信息. 本篇以实时查询redis为例,要求redis 客户端支持异 ...
- 基于 MaxCompute 的实时数据处理实践
简介: MaxCompute 通过流式数据高性能写入和秒级别查询能力(查询加速),提供EB级云原生数仓近实时分析能力:高效的实现对变化中的数据进行快速分析及决策辅助.当前Demo基于近实时交互式BI分 ...
- sql 忽略大小写_Flink使用Calcite解析Sql做维表关联(一)
点击箭头处"蓝色字",关注我们哦!! 维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐.规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算 ...
- 95-134-114-源码-维表-Hbase维表关联:LRU策略
1 .世界 2.概述 LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据. ...
- 美团点评基于Storm的实时数据处理实践
背景 目前美团点评已累计了丰富的线上交易与用户行为数据,为商家赋能需要我们有更强大的专业化数据加工能力,来帮助商家做出正确的决策从而提高用户体验.目前商家端产品在数据应用上主要基于离线数据加工,数据生 ...
- Flink CDC入门实践--基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
文章目录 前言 1.环境准备 2.准备数据 2.1 MySQL 2.2 postgres 3.启动flink和flink sql client 3.1启动flink 3.2启动flink SQL cl ...
- Greenplum 实时数据仓库实践(8)——事实表技术
目录 8.1 事实表概述 8.2 周期快照 8.3 累积快照 8.4 无事实的事实表 8.5 迟到的事实 8.6 累积度量 小结 上一篇里介绍了几种基本的维度表技术,并用示例演示了每种技术的实现过程. ...
- 58.订单明细实时表和商品、品牌、spu 等维表关联
2.3.1 关联方式 (1)方法 1:用明细表依次和每个维度表进行关联 ➢ 订单明细和商品关联 order_detail --> sku_id ➢ 订单明细商品宽表和 spu 关联 订单明细宽表 ...
最新文章
- 【LeetCode】142 - Linked List Cycle II
- Java 18 要来了,你不会还在用Java 8吧?
- IBM推出AutoAI,让企业人工智能模型开发自动化
- MATLAB从入门到精通系列之如何在MATLAB中导入excel单sheet页及多sheet页表格
- 用c语言链表编写便利店零售系统,链表实现多项式求和(C语言)
- Highly Available (Mirrored) Queues
- 讲讲Bootstrap是在干啥?
- 计算t-test 的C程序
- 数据湖元数据服务的实现和挑战
- 阿里巴巴集团的几十款著名开源项目(Java)
- 15种基础的可以直接使用的CSS3样式
- 使用JavaScript生成二维码教程-附qrcodejs中文文档
- SAP License:财务与会计的区别
- 基于机器视觉的眼镜镜片轮廓提取
- Flex中添加大量组件时内存占用问题
- 第二章 C++对C的改进和扩展
- linux在线汇编编译器,Linux 汇编 Hello World
- Flume1.6.0之Error-protobuf-This is supposed to be overridden by subclasses
- 国产手机的18年历史
- 策略盈亏分布统计——从零到实盘11