Background

  • 在使用flink处理数据的时候,需要从mysql获取基础的配置信息去解析收到的数据;
  • 第一种方案使用缓存,程序启动时从mysql读取配置信息加载到缓存,后面定时更新缓存,解析数据时从缓存中获取配置信息,但这种方法存在很多问题,首先会增加数据库的负载,同时缓存更新及时性不佳;
  • 第二种方案使用flink-cdc加广播流,通过mysql日志文件获取数据,减少对数据库的负载,提高数据的一致性;把数据流和配置流合并,然后对数据进行解析,这种方案也会有个问题,无法保证配置信息加载完毕后再处理数据,比如说数据源已经有数据了,程序启动时,配置信息还没加载完,数据已经上来了,那这个时候解析就会有问题;
  • 这里给出第二种方案问题的解决方法,在flink官方中文邮箱列表中找到了一个合理的解决思路,在处理数据流的方法中使用ListState对数据进行“缓冲”,等到配置信息加载完毕后再对“缓冲”的数据进行处理,下面是我自己的实现。

处理思路(后补的哈)

大致说下处理思路哈:
1.程序启动,数据流元素和配置流元素开始加载,不分先后;
2.当接收到数据流元素时,首先判断配置流元素是否加载完毕(判断逻辑后面介绍),若未加载完毕,会把数据缓存起来,因为流处理是数据驱动的,数据不来,缓存的数据就不会被处理,所以为了解决这种情况,同时创建了一个定时器,默认30秒后自动把缓存的数据处理掉;缓存了一定的数据,如果当下一个数据流元素来的时候发现配置加载完毕了,然后首先把缓存的数据处理掉,同时清除缓存、定时处理标志等,接着处理收到的数据,后面就正常流转了;
3.然后说下配置加载完毕的判断逻辑,我这里是根据两个条件判断的,一是配置流元素加载的个数,即配置流元素每次加载都会累加,二是配置流元素更新的加载时间,即配置流元素每次加载都会把当前加载的时间更新下;如果加载的个数大于0,同时更新的加载时间和当前时间相差大于2秒,我即判断为加载完毕。

源码

  • 懒得赘述了,注释的已经很清楚了,不懂的评论里问,不要私信,这样大家也可以看到相同的问题,不用和每个说一遍相同的问题,记得先一键三连哈
  • DataParseFunc
package com.yl.flink.processor;import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DateUtil;
import com.yl.constant.Const;
import com.yl.entity.MultiDataEntity;
import com.yl.entity.cdc.MysqlCdcEntity;
import com.yl.util.FlinkUtil;
import com.yl.util.PayloadParseUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.List;/*** @author wlf* @apiNote 数据解析* @since 2022/9/2*/
@Slf4j
public class DataParseFunc extends KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity> {// 配置信息未加载完毕时先缓存数据流元素private transient ListState<String> payloads;// 未设置定时输出缓存流元素的定时器,默认未设置private transient ValueState<Boolean> setTimer;/*** 初始化资源配置*/@Overridepublic void open(Configuration parameters) throws Exception {payloads = getRuntimeContext().getListState(new ListStateDescriptor<>(Const.S_PAYLOAD_STA, TypeInformation.of(new TypeHint<>() {})));setTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(Const.S_TIMER_SET, TypeInformation.of(new TypeHint<>() {})));}/*** 处理数据流元素** @param payload 数据流元素* @param ctx     上下文* @param out     流输出器*/@Overridepublic void processElement(String payload, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.ReadOnlyContext ctx, Collector<MultiDataEntity> out) throws Exception {ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor());// 数据流元素来时首先判断一下配置信息是否加载完毕if (configReady(state)) {// 如果加载完毕了,判断一下是否有缓存的数据需要处理一下if (null != setTimer.value() && setTimer.value()) {for (String element : payloads.get()) {output(element, state, out);}// 处理后把缓存清空payloads.clear();// 同时修改定时处理缓存的标志,说明缓存已经没数据了,不需要定时处理缓存数据了setTimer.update(false);}// 处理流元素output(payload, state, out);} else {// 如果配置信息还没有加载完毕,先把流元素缓存起来payloads.add(payload);// 只需要一个定时器就行了,到时执行一次就可以把缓存的所有数据处理掉if (null == setTimer.value() || !setTimer.value()) {// 注册定时器,如果后面没来数据,默认30秒后把缓存的数据发送到下游long fireTime = ctx.timerService().currentProcessingTime() + 30_000;ctx.timerService().registerProcessingTimeTimer(fireTime);setTimer.update(true);}}}/*** 处理配置流元素** @param config 配置流元素* @param ctx    上下文* @param out    流输出器*/@Overridepublic void processBroadcastElement(MysqlCdcEntity config, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.Context ctx, Collector<MultiDataEntity> out) throws Exception {// 广播状态BroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor());// 每来一个配置流元素都会更新配置缓存的失效时间updateCache(state);// 根据操作类型更新广播数据状态FlinkUtil.updateState(state, config);}/*** 定时输出缓存的数据* 配置流未加载完毕时缓存此时过来的数据流元素,缓存的数据流元素只有等下一个流元素来的时候才会触发后续的处理操作,这里的定时器可以定时输出缓存的数据** @param timestamp 定时器触发时间* @param ctx       上下文* @param out       输出器*/@Overridepublic void onTimer(long timestamp, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.OnTimerContext ctx, Collector<MultiDataEntity> out) throws Exception {ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor());for (String element : payloads.get()) {output(element, state, out);}// 输出后把缓存清空防止重复输出payloads.clear();// 执行完任务删除定时器ctx.timerService().deleteProcessingTimeTimer(timestamp);// 同时更新定时器的状态setTimer.update(false);}/*** 把流元素输出到下游*/private void output(String payload, ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state, Collector<MultiDataEntity> out) throws Exception {// 解析数据,并把解析后的数据发送到下游PayloadParseUtil.parse(payload, state).forEach(out::collect);}/*** 每来一个配置流元素进行个数统计,同时更新加载时间*/private void updateCache(BroadcastState<String, List<MysqlCdcEntity>> state) throws Exception {MysqlCdcEntity config;if (state.contains(Const.S_CONFIG_STA)) {config = state.get(Const.S_CONFIG_STA).get(0);// 统计的是流元素的个数config.setConfigCount(config.getConfigCount() + 1);// 更新加载时间config.setConfigTs(DateUtil.date().getTime());} else {config = MysqlCdcEntity.builder().configCount(1).configTs(DateUtil.date().getTime()).build();}// 动态更新广播流状态state.put(Const.S_CONFIG_STA, ListUtil.toList(config));}/*** 配置流元素是否配置完毕* 正常情况下配置流元素更新的时间和当前时间相差大于2秒代表配置完成* 配置加载完毕判断的两个必要条件:* 1.配置流元素加载的个数大于0;* 2.配置流元素加载时更新的加载时间和当前时间相差大于2秒;*/private Boolean configReady(ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state) throws Exception {long nowTs = DateUtil.date().getTime();if (state.contains(Const.S_CONFIG_STA)) {MysqlCdcEntity config = state.get(Const.S_CONFIG_STA).get(0);long configTs = config.getConfigTs();return config.getConfigCount() > 0 && Math.abs((nowTs - configTs) / 1000) > 2;}return false;}}

项目开源

【基于flink开发的传感器监测类项目通用实时数据流处理程序】

Flink解决广播配置流滞后数据流的问题相关推荐

  1. flink 怎么保证广播流比数据流先到

    解决flink问题小技巧: 一般使用flink中出现了问题, 可以在Apache Flink 中文用户邮件列表: http://apache-flink.147419.n8.nabble.com/ 找 ...

  2. 【Flink】广播流

    flink广播流适用于规则匹配,当规则数据比较少的情况,flink会把规则流广播,数据流就会拿到最新的规则流进行处理. 1.先定义规则类 package com.test;import lombok. ...

  3. 动态配置流处理-BetterCloud如何使用Flink构建报警系统

    动态配置流处理-BetterCloud如何使用Flink构建报警系统

  4. as点击发送广播_Apache Flink 中广播状态的实用指南

    翻译 | 王柯凝 校对 | 邱从贤(山智) 自版本 Flink 1.5.0 以来,Apache Flink 提供了一种新的状态类型,称为广播状态(Broadcast State).在本文中,将解释什么 ...

  5. [数据湖] 基于flink hudi的批流一体实践

    1.业务背景介绍 广告主和代理商通过广告投放平台来进行广告投放,由多个媒介进行广告展示 ,从而触达到潜在用户.整个过程中会产生各种各样的数据,比如展现数据.点击数据.其中非常重要的数据是计费数据,以计 ...

  6. Flink部署——内存配置

    文章目录 配置 Flink 进程的内存 配置总内存 JVM 参数 受限的等比内存部分 配置 TaskManager 内存 配置总内存 配置堆内存和托管内存 任务(算子)堆内存(Task (Operat ...

  7. [Flink基础]--什么是流处理?

    感谢原文作者:https://data-artisans.com/what-is-stream-processing 什么是流处理? Data Artisans由ApacheFlink®的原始创建者创 ...

  8. flink生产环境参数配置

    1.flink生产环境配置 2.flink 可配置参数 2.1常用选项 键 默认 描述 jobmanager.heap.size 1024MB JobManager的JVM堆大小. taskmanag ...

  9. Flink Checkpoint所有配置解读

    配置类在:org.apache.flink.configuration.CheckpointingOptions 配置解析: 配置 类型 默认值 描述 state.backend String 无 检 ...

最新文章

  1. 【TensorFlow2.0】(5) 数学计算、合并、分割
  2. 企业 SOA 设计(1)–ESB 设计
  3. Struts 2:處理一個form多個submit
  4. MATLAB Simulink中自定义函数和switch case的用法
  5. JS-面向对象-继承
  6. ORA-12514: TNS:listener does not currently know of service …
  7. 坦克大战之声音处理类(四)
  8. java 多线程 举例,Java多线程简单举例
  9. android h5 保存通讯录,h5+调用系统通讯录
  10. CE教程第八关——搜索4级指针
  11. 6.1 PyTorch简单二分类模型
  12. 自考计算机00051笔记,自考00051 管理系统中计算机应用自考资料笔记自考小抄.doc...
  13. 微信小程序点播音频服务器,微信小程序无法播放本地音频
  14. 服装制造业信息化系统分析(一)
  15. 优雅的处理你的Java异常
  16. UNIAPP/微信小程序-下拉刷新的操作
  17. 电脑怎么系统重装,重装电脑系统怎么装
  18. Glide的使用回收内存问题
  19. PHP上传图片类Upload.php
  20. html页面点击小图弹出大图代码,利用JS实现点击小图弹出大图代码

热门文章

  1. 2017年web前端职位_2017年10个Web预测
  2. ICPC Camp 2016 几道神奇的构造打表题
  3. 特征降维-PCA(Principal Component Analysis)
  4. 移动端web设计尺寸_移动端H5页面的设计稿尺寸(上)
  5. 蓝牙基带调制(GFSK)
  6. 关于connection holder is null的个人解决方案实践
  7. 一次非常有趣的sql优化经历
  8. Java程序员参考网站:11个高质量技术网站推荐
  9. picoCTF,Forensics,取证类,43/50
  10. 每日10行代码31:爬取人民日报一日的所有文章并存入数据库