在日常Flink开发中,有时需要给正在运行中的Flink程序修改参数,比如过滤一些字段、字段值等,而这些值有时是配置在Mysql中的,但如果进行高吞吐计算的Function中动态查询配置文件有可能使任务阻塞,甚至导致任务出现失败的情况。

遇到上述场景时,可以考虑通过广播流查询配置文件,广播到某个operator的所有并发实例中,然后与另一个条流数据连接进行计算。

实现步骤:

1、首先定义一个Mysql的实体类,下面例子的属性名可以自己根据实际中Mysql中表名来变化

class Flow {var flowId = 0//var mode: Int = 0//数据库名var databaseName: String = ""//mysql表名var tableName: String = ""//hbase表名var hbaseTable: String = ""//Column Family名称var family: String = ""//字段名转为大写,默认为truevar uppercaseQualifier: Boolean = true//批量提交的大小,ETL中用到var commitBatch: Int = 0;//组成rowkey的字段名,必须用逗号分隔var rowKey: String = ""//状态var status: Int = 0var kuduTable: String = ""var tidbTable: String = ""var mask_fields: String = ""
}

2、定义一个MapStateDescriptor来描述广播的数据格式

private val flowStateDescriptor: MapStateDescriptor[String, Flow] = new MapStateDescriptor[String, Flow]("flowBroadCastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Flow] {}))

3、创建一个源Stream来广播下游的operator

class FlowSource extends RichSourceFunction[Flow]{private val log: Logger = LoggerFactory.getLogger(Class[FlowSource].getClass)val serialVersionUID: Long = 3519222623348229907Lprivate val flow = new Flow//状态位var isRunning: Boolean = trueoverride def run(ctx: SourceFunction.SourceContext[Flow]): Unit = {//定时读取数据库的flow表,生成Flow数据while (isRunning) {val conn = MysqlJdbcUtils.getConnection("jdbc:mysql://10.101.40.197:3306/hdb_data_warehouse?useUnicode=true&characterEncoding=utf8","canal","canal")val statement = conn.createStatement()val rs = statement.executeQuery("select * from data_warehouse_cfg")while (rs.next()) {flow.flowId = rs.getInt("flow_id")flow.databaseName = rs.getString("mysql_db")flow.tableName = rs.getString("mysql_table")flow.hbaseTable = rs.getString("hbase_table")flow.family = rs.getString("hbase_col_family")flow.commitBatch = rs.getInt("status")flow.status = rs.getInt("status")flow.rowKey = rs.getString("hbase_rowkey")flow.kuduTable = rs.getString("kudu_table")flow.tidbTable = rs.getString("tidb_table")flow.mask_fields = rs.getString("mask_fields")log.debug("load flow: " + flow.toString)ctx.collect(flow)}//隔一段时间读取,可以使用更新的配置生效Thread.sleep(60 * 1000L)}}override def cancel(): Unit = {isRunning = false}
}

4、添加数据源并把数据源注册成广播流

val broadcast: BroadcastStream[Flow] = env.addSource(new FlowSource).broadcast(flowStateDescriptor)

5、连接广播流和处理数据的流

val connectedStream: DataStream[(FlatMessage, Flow)] = keyedMessage.connect(broadcast).process(new KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)] {override def processElement(message: FlatMessage,ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#ReadOnlyContext,out: Collector[(FlatMessage, Flow)]): Unit = {//获取配置流val flow = ctx.getBroadcastState(flowStateDescriptor).get(message.getDatabase + message.getTable)if (null != flow) {out.collect((message, flow)) }}override def processBroadcastElement(flow: Flow,ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#Context,out: Collector[(FlatMessage, Flow)]): Unit = {val broadcast: BroadcastState[String, Flow] = ctx.getBroadcastState(flowStateDescriptor)...broadcast.put(key, flow)}})

需要注意到的问题:

  1. 数据源发送数据时候如果数据是集合,必须使用线程安全的集合类
  2. 获取到的BroadcastState是一个map,相同的KEY,put进去会覆盖掉

Flink广播流——BroadcastStream相关推荐

  1. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  2. 【Flink】广播流

    flink广播流适用于规则匹配,当规则数据比较少的情况,flink会把规则流广播,数据流就会拿到最新的规则流进行处理. 1.先定义规则类 package com.test;import lombok. ...

  3. Spark/Flink广播实现作业配置动态更新

    点击上方"zhisheng",选择"设为星标" 后台回复"ffa"可以查看 Flink 资料 前言 在实时计算作业中,往往需要动态改变一些配 ...

  4. Flink / Scala 实战- 4.BroadCast 广播流数据先到再处理 Source 数据

    一.引言 Flink 支持增加 DataStream KeyBy 之后 conncet BroadCastStream 形成 BroadConnectedStream,广播流内数据一般为不间断更新的上 ...

  5. flink怎么保证广播流比数据流先到

    前置条件:flink cdc 监听字典表并广播,主流消费kafka设置消费模式为earliest,两个流connect 如果当字典表数据稍微大了点,那么主流数据会比广播流数据到的早,导致前边的数据没法 ...

  6. flink 怎么保证广播流比数据流先到

    解决flink问题小技巧: 一般使用flink中出现了问题, 可以在Apache Flink 中文用户邮件列表: http://apache-flink.147419.n8.nabble.com/ 找 ...

  7. Flink 多流转换

    概述   无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的.而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多 ...

  8. Flink 广播变量

    广播变量简介 在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变 ...

  9. Flink教程(10)- Flink批流一体API(其它)

    文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...

最新文章

  1. 史上最萌的统计学入门书,快来看看长啥样
  2. 逆向思维--魔兽世界封包分析(1)
  3. BZOJ4432 : [Cerc2015]Greenhouse Growth
  4. Java扑克牌(斗地主发牌)
  5. SSM高级整合_非Maven控制版本下SSM高级整合
  6. 散文绘画集《心欢喜,灵快乐》研讨会在京举行
  7. azkaban config: nodes:_关于Nordic SDK的sdk.config.h
  8. navicat运行sql文件慢_SQL进阶之路——入门
  9. 汉诺塔V - ACM解决方法
  10. 三大主流软件负载均衡器对比(LVS VS Nginx VS Haproxy)
  11. 2013-2018卷积神经网络中十个最重要的概念与创新
  12. shell基础之EOF的用法
  13. 关于pipe管道的读写端关闭问题
  14. 胜为蓝牙适配器驱动_胜为USB蓝牙适配器4.0驱动下载
  15. uniapp 引导页 启动页 闪屏页功能介绍及部分功能实现
  16. CDN 加速 OSS 常见问题及处理思路
  17. Direct3D中的纹理映射
  18. R语言McSpatial_R语言中帮助和R包
  19. 使用Fragment实现Tab效果
  20. 文笔极佳的郭靖夫妇悼文

热门文章

  1. 童诗白模电-放大电路频率响应
  2. 波士顿大学计算机专业设置,【美国留学】BU波士顿大学计算机专业申请详解
  3. 《数字图像处理(第三版)》 第二章 数字图像基础 笔记2(图像感知和获取)
  4. MATLAB代码:基于非合作博弈的多微网P2P交易策略研究
  5. 含分布式电源的配电网日前两阶段优化调度模型(Matlab代码实现)
  6. 设备以国标GB28181协议接入视频平台时可能会遇到的问题
  7. 【层级多标签文本分类】Incorporating Hierarchy into Text Encoder: a Contrastive Learning Approach for Hierarchic
  8. 在SQL Server 2008中删除表中的记录时出错,该如何解决
  9. 【OSPF路由配置命令汇总大全】面面俱到
  10. 小程序跳转url地址实现