Flink广播流——BroadcastStream
在日常Flink开发中,有时需要给正在运行中的Flink程序修改参数,比如过滤一些字段、字段值等,而这些值有时是配置在Mysql中的,但如果进行高吞吐计算的Function中动态查询配置文件有可能使任务阻塞,甚至导致任务出现失败的情况。
遇到上述场景时,可以考虑通过广播流查询配置文件,广播到某个operator的所有并发实例中,然后与另一个条流数据连接进行计算。
实现步骤:
1、首先定义一个Mysql的实体类,下面例子的属性名可以自己根据实际中Mysql中表名来变化
class Flow {var flowId = 0//var mode: Int = 0//数据库名var databaseName: String = ""//mysql表名var tableName: String = ""//hbase表名var hbaseTable: String = ""//Column Family名称var family: String = ""//字段名转为大写,默认为truevar uppercaseQualifier: Boolean = true//批量提交的大小,ETL中用到var commitBatch: Int = 0;//组成rowkey的字段名,必须用逗号分隔var rowKey: String = ""//状态var status: Int = 0var kuduTable: String = ""var tidbTable: String = ""var mask_fields: String = ""
}
2、定义一个MapStateDescriptor来描述广播的数据格式
private val flowStateDescriptor: MapStateDescriptor[String, Flow] = new MapStateDescriptor[String, Flow]("flowBroadCastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Flow] {}))
3、创建一个源Stream来广播下游的operator
class FlowSource extends RichSourceFunction[Flow]{private val log: Logger = LoggerFactory.getLogger(Class[FlowSource].getClass)val serialVersionUID: Long = 3519222623348229907Lprivate val flow = new Flow//状态位var isRunning: Boolean = trueoverride def run(ctx: SourceFunction.SourceContext[Flow]): Unit = {//定时读取数据库的flow表,生成Flow数据while (isRunning) {val conn = MysqlJdbcUtils.getConnection("jdbc:mysql://10.101.40.197:3306/hdb_data_warehouse?useUnicode=true&characterEncoding=utf8","canal","canal")val statement = conn.createStatement()val rs = statement.executeQuery("select * from data_warehouse_cfg")while (rs.next()) {flow.flowId = rs.getInt("flow_id")flow.databaseName = rs.getString("mysql_db")flow.tableName = rs.getString("mysql_table")flow.hbaseTable = rs.getString("hbase_table")flow.family = rs.getString("hbase_col_family")flow.commitBatch = rs.getInt("status")flow.status = rs.getInt("status")flow.rowKey = rs.getString("hbase_rowkey")flow.kuduTable = rs.getString("kudu_table")flow.tidbTable = rs.getString("tidb_table")flow.mask_fields = rs.getString("mask_fields")log.debug("load flow: " + flow.toString)ctx.collect(flow)}//隔一段时间读取,可以使用更新的配置生效Thread.sleep(60 * 1000L)}}override def cancel(): Unit = {isRunning = false}
}
4、添加数据源并把数据源注册成广播流
val broadcast: BroadcastStream[Flow] = env.addSource(new FlowSource).broadcast(flowStateDescriptor)
5、连接广播流和处理数据的流
val connectedStream: DataStream[(FlatMessage, Flow)] = keyedMessage.connect(broadcast).process(new KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)] {override def processElement(message: FlatMessage,ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#ReadOnlyContext,out: Collector[(FlatMessage, Flow)]): Unit = {//获取配置流val flow = ctx.getBroadcastState(flowStateDescriptor).get(message.getDatabase + message.getTable)if (null != flow) {out.collect((message, flow)) }}override def processBroadcastElement(flow: Flow,ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#Context,out: Collector[(FlatMessage, Flow)]): Unit = {val broadcast: BroadcastState[String, Flow] = ctx.getBroadcastState(flowStateDescriptor)...broadcast.put(key, flow)}})
需要注意到的问题:
- 数据源发送数据时候如果数据是集合,必须使用线程安全的集合类
- 获取到的BroadcastState是一个map,相同的KEY,put进去会覆盖掉
Flink广播流——BroadcastStream相关推荐
- Flink的累加器和广播变量、广播流、分布式缓存
1.Accumulator累加器 Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...
- 【Flink】广播流
flink广播流适用于规则匹配,当规则数据比较少的情况,flink会把规则流广播,数据流就会拿到最新的规则流进行处理. 1.先定义规则类 package com.test;import lombok. ...
- Spark/Flink广播实现作业配置动态更新
点击上方"zhisheng",选择"设为星标" 后台回复"ffa"可以查看 Flink 资料 前言 在实时计算作业中,往往需要动态改变一些配 ...
- Flink / Scala 实战- 4.BroadCast 广播流数据先到再处理 Source 数据
一.引言 Flink 支持增加 DataStream KeyBy 之后 conncet BroadCastStream 形成 BroadConnectedStream,广播流内数据一般为不间断更新的上 ...
- flink怎么保证广播流比数据流先到
前置条件:flink cdc 监听字典表并广播,主流消费kafka设置消费模式为earliest,两个流connect 如果当字典表数据稍微大了点,那么主流数据会比广播流数据到的早,导致前边的数据没法 ...
- flink 怎么保证广播流比数据流先到
解决flink问题小技巧: 一般使用flink中出现了问题, 可以在Apache Flink 中文用户邮件列表: http://apache-flink.147419.n8.nabble.com/ 找 ...
- Flink 多流转换
概述 无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的.而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多 ...
- Flink 广播变量
广播变量简介 在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变 ...
- Flink教程(10)- Flink批流一体API(其它)
文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...
最新文章
- 史上最萌的统计学入门书,快来看看长啥样
- 逆向思维--魔兽世界封包分析(1)
- BZOJ4432 : [Cerc2015]Greenhouse Growth
- Java扑克牌(斗地主发牌)
- SSM高级整合_非Maven控制版本下SSM高级整合
- 散文绘画集《心欢喜,灵快乐》研讨会在京举行
- azkaban config: nodes:_关于Nordic SDK的sdk.config.h
- navicat运行sql文件慢_SQL进阶之路——入门
- 汉诺塔V - ACM解决方法
- 三大主流软件负载均衡器对比(LVS VS Nginx VS Haproxy)
- 2013-2018卷积神经网络中十个最重要的概念与创新
- shell基础之EOF的用法
- 关于pipe管道的读写端关闭问题
- 胜为蓝牙适配器驱动_胜为USB蓝牙适配器4.0驱动下载
- uniapp 引导页 启动页 闪屏页功能介绍及部分功能实现
- CDN 加速 OSS 常见问题及处理思路
- Direct3D中的纹理映射
- R语言McSpatial_R语言中帮助和R包
- 使用Fragment实现Tab效果
- 文笔极佳的郭靖夫妇悼文
热门文章
- 童诗白模电-放大电路频率响应
- 波士顿大学计算机专业设置,【美国留学】BU波士顿大学计算机专业申请详解
- 《数字图像处理(第三版)》 第二章 数字图像基础 笔记2(图像感知和获取)
- MATLAB代码:基于非合作博弈的多微网P2P交易策略研究
- 含分布式电源的配电网日前两阶段优化调度模型(Matlab代码实现)
- 设备以国标GB28181协议接入视频平台时可能会遇到的问题
- 【层级多标签文本分类】Incorporating Hierarchy into Text Encoder: a Contrastive Learning Approach for Hierarchic
- 在SQL Server 2008中删除表中的记录时出错,该如何解决
- 【OSPF路由配置命令汇总大全】面面俱到
- 小程序跳转url地址实现