Flink cep动态模板+cep规则动态修改实践
私信之前请一键三连,请给一点动力!谢谢
目录
Flink CEP 概念以及使用场景
1.什么是CEP?
2.Flink CEP 应用场景
3.Flink CEP 原理(只做简单了解)
规则条件遵循参考
简单规则:
相对复杂规则:
CEP支持的模式
个体模式
模式序列
groovy+aviator的介绍
groovy是什么?
Aviator是什么?
groovy+aviator+cep的整合使用
正常cep代码开发流程
cep动态模板
cep动态模板+动态规则修改
运行演示
集群运行效果
具体代码
flink_cep_groovy_aviator项目
flink_cep_groovy_aviator2项目
相关代码
参考文章
最后
Flink CEP 概念以及使用场景
1.什么是CEP?
CEP的意思是复杂事件处理,例如:起床-->洗漱-->吃饭-->上班等一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。
在真实场景中,起床,洗漱,吃饭,上班就是一个个事件数据。
CEP的特征如下:
目标:从有序的简单事件流中发现一些高阶特征;
输入:一个或多个简单事件构成的事件流;
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件;
输出:满足规则的复杂事件。
下图中列出了几个例子,三个场景:
- 第一个是异常行为检测的例子:假设车辆维修的场景中,当一辆车出现故障时,这辆车会被送往维修点维修,然后被重新投放到市场运行。如果这辆车被投放到市场之后还未被使用就又被报障了,那么就有可能之前的维修是无效的。
- 第二个是策略营销的例子:假设打车的场景中,用户在 APP 上规划了一个行程订单,如果这个行程在下单之后超过一定的时间还没有被司机接单的话,那么就需要将这个订单输出到下游做相关的策略调整。
- 第三个是运维监控的例子:通常运维会监控服务器的 CPU、网络 IO 等指标超过阈值时产生相应的告警。但是在实际使用中,后台服务的重启、网络抖动等情况都会造成瞬间的流量毛刺,对非关键链路可以忽略这些毛刺而只对频繁发生的异常进行告警以减少误报。
2.Flink CEP 应用场景
- 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
- 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
- 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。
3.Flink CEP 原理(只做简单了解)
- Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为 take、ignore、proceed 三种。
- take:必须存在一个条件判断,当到来的消息满足 take 边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
- ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
- proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。 也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条 proceed 边和下游的购买状态相连。
下面以一个打车的例子来展示状态是如何流转的,规则见下图所示。
以乘客制定行程作为开始,匹配乘客的下单事件,如果这个订单超时还没有被司机接单的话,就把行程事件和下单事件作为结果集往下游输出。
假如消息到来顺序为:行程-->其他-->下单-->其他。
状态流转如下:
1)开始时状态处于行程状态,即等待用户制定行程。
2)当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,通过 take 边将状态往下转移到下单状态。
3)由于下单状态上有一条 ignore 边,所以可以忽略收到的其他事件,直到收到下单事件时将其匹配,放入结果集中,并且将当前状态往下转移到超时未接单状态。这时候结果集当中有两个事件:制定行程事件和下单事件。
4)超时未接单状态时,如果来了一些其他事件,同样可以被 ignore 边忽略,直到超时事件的触发,将状态往下转移到最终状态,这时候整个模式匹配成功,最终将结果集中的制定行程事件和下单事件输出到下游。
上面是一个匹配成功的例子,如果是不成功的例子会怎么样?
假如当状态处于超时未接单状态时,收到了一个接单事件,那么就不符合超时未被接单的触发条件,此时整个模式匹配失败,之前放入结果集中的行程事件和下单事件会被清理。
规则条件遵循参考
简单规则:
同一天同一设备IP变更次数 大于 XX 次
设备两次登录间隔小于 XX 秒
相对复杂规则:
同一账户,在XX分钟内转成金额 XX元后,又在XX分钟内转出累加金额大于 XXX元
CEP支持的模式
个体模式
包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。
(1)量词
可以在一个个体模式后追加量词,也就是指定循环次数
// 匹配出现4次
start.time(4)
// 匹配出现0次或4次
start.time(4).optional
// 匹配出现2、3或4次
start.time(2,4)
// 匹配出现2、3或4次,并且尽可能多地重复匹配
start.time(2,4).greedy
// 匹配出现1次或多次
start.oneOrMore
// 匹配出现0、2或多次,并且尽可能多地重复匹配
start.timesOrMore(2).optional.greedy
(2)条件
每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。CEP中的个体模式主要通过调用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类:
① 简单条件
通过.where()方法对事件中的字段进行判断筛选,决定是否接收该事件
start.where(event=>event.getName.startsWith(“foo”))
② 组合条件
将简单的条件进行合并;or()方法表示或逻辑相连,where的直接组合就相当于与and。
Pattern.where(event => …/*some condition*/).or(event => /*or condition*/)
③ 终止条件
如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。
④ 迭代条件
能够对模式之前所有接收的事件进行处理;调用.where((value,ctx) => {…}),可以调用ctx.getEventForPattern(“name”)
模式序列
序列模式可以接受多个事件,这在我们下面讲解的案例里面可以看到。
(1)严格近邻
所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定。例如对于模式“a next b”,事件序列“a,c,b1,b2”没有匹配。
(2)宽松近邻
允许中间出现不匹配的事件,由.followedBy()指定。例如对于模式“a followedBy b”,事件序列“a,c,b1,b2”匹配为{a,b1}。
(3)非确定性宽松近邻
进一步放宽条件,之前已经匹配过的事件也可以再次使用,由.followedByAny()指定。例如对于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配为{ab1},{a,b2}。
除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:
.notNext():不想让某个事件严格紧邻前一个事件发生。
.notFollowedBy():不想让某个事件在两个事件之间发生。
需要注意:
①所有模式序列必须以.begin()开始;
②模式序列不能以.notFollowedBy()结束;
③“not”类型的模式不能被optional所修饰;
④可以为模式指定时间约束,用来要求在多长时间内匹配有效。
next.within(Time.seconds(10))
groovy+aviator的介绍
groovy是什么?
Groovy是Java虚拟机的敏捷和动态语言, 提供静态类型检查的能力,并静态地编译成java字节码,以获得健壮性和性能,与所有现有的Java类和库无缝集成,可以在任何可以使用java的地方使用它。
简单解释,就是可以将字符串,静态类型编译成自己需要的java对象或者方法。
groovy是实现动态模板的关键。
案例说明:
Aviator是什么?
Google Aviator是一个高性能、轻量级的 java 语言实现的表达式求值引擎, 主要用于各种表达式的动态求值,aviator支持大部分运算操作符, 包括算术操作符、关系运算符、逻辑操作符、位运算符、正则匹配操作符(=~)、三元表达式(并且支持操作符的优先级和括号强制优先级, 具体请看后面的操作符列表, 支持自定义函数等。
简单解释,就是起到类型正则匹配的效果。
Aviator是实现条件匹配的关键。
案例说明:
了解相关链接跟案例:
google aviator:轻量级Java公式引擎_刘本龙的专栏-CSDN博客_java 公式引擎
groovy+aviator+cep的整合使用
正常cep代码开发流程
一,数据源
二,单场景
匹配结果:
三,多场景
匹配结果:
四,真实业务场景演示代码
代码类:
flink_cep_groovy_aviator –-> com.sjb.test.yarn.dev.CepOnYarnDev
数据源kafka:
flink_cep
数据格式:
{
"operation_type": "24",
"order_number": "2104081206210050",
"driver_code": "100000609882",
"device_type": "8",
"kafka_topic": "test_order_flow",
"canal_ts": "1617853408000",
"update_time": "1617853408436",
"orderflow_code": "dedc67e9dc55461b99da1548fd215c73",
"device_code": "oq6JXwo9KwqQ8jOPvlObJReUpZp4",
"operation_type_desc": "杜岳飞卸货地签到",
"company_code": "410307014961",
"company_member_code": "200000611452",
"create_date": "1617853408431",
"app_code": "100002"
}
数据结果写入的是hbase:
cep_result
查看结果:
Scan ‘cep_result’
cep动态模板
1, 什么是cep动态模板?
在上面的演示中我们看到,条件是固定的情况下,代码是固定写死的,那么我们有多少个任务,就需要开发出多少个代码类,然后编译打包上传运行。
为了业务的灵活需要,需要开发出一套代码,一个包,针对不同的条件提交任务运行。
2, 动态cep模板原理
通过解析参数,拼接代码,再加上groovy+avritor动态生成cep代码,来实现cep动态模板。
3, 案例演示
代码类:
flink_cep_groovy_aviator --> com.sjb.test.yarn.FlinkDynamicCepDemo_2021_04_07_003
cep动态模板+动态规则修改
动态规则修改实现:
修改了cep的源码,添加了一个pattern对象修改功能,核心代码
执行流程:
1,读取mysql的规则数据
2,接入广播流,监控规则数据是否修改
3,动态模板生产cep代码,如果有规则修改,重新生成cep代码
4,执行运行。
代码整合:略
运行演示
flink_cep_groovy_aviator ---> test.BroadcastCepDemo3
单条件(对应mysql表 id =3):
flink_cep_groovy_aviator2 ---> dev.BroadcastCepDev1
{
"rule_id": 1,
"title": "一个条件告警",
"label_content": [{
"condition_name": "begin",
"times": "timesOrMore(3)",
"within": "within(Time.days(1L))",
"sequence": "",
"description": "第一个条件",
"condition_value": "operation_type=='1'"
}],
"select": [{
"condition_name": "begin",
"do": "getList"
}],
"pattern": "combining",
"strategy": "",
"alarm_content": "钉钉告知用户xxxxxxxx,xxxx",
"alarm_url": "www.aaa.com",
"create_time": 1589373560798
}
多条件 :
flink_cep_groovy_aviator2 ---> dev.BroadcastCepDev2
多条件(对应mysql表 id =1):
{
"rule_id": 1,
"title": "三个条件,用户刷单告警_03",
"label_content": [{
"condition_name": "begin",
"times": "",
"sequence": "next('middle')",
"description": "第一个条件",
"condition_value": "operation_type=='1'&&app_code =='100002'"
}, {
"condition_name": "middle",
"description": "第二个条件",
"sequence": "next('end')",
"condition_value": "operation_type=='1'&&string.contains(operation_type_desc,'胡飞成功抢单')"
}, {
"condition_name": "end",
"within": "within(Time.days(1L))",
"description": "第三个条件",
"condition_value": "operation_type=='1'&&device_type =='4'"
}],
"select": [{
"condition_name": "begin",
"do": "getList"
}, {
"condition_name": "middle",
"do": "get"
}, {
"condition_name": "end",
"do": "get"
}],
"pattern": "combining",
"strategy": "AfterMatchSkipStrategy.skipToFirst('begin')",
"alarm_content": "钉钉告知用户xxxxxxxx,xxxx",
"alarm_url": "www.aaa.com",
"create_time": 1589373560798
}
条件麻烦一点的:
flink_cep_groovy_aviator2 ---> dev.BroadcastCepDev3
{
"rule_id": 4,
"title": "两个条件,用户刷单告警_03",
"label_content": [{
"condition_name": "begin",
"sequence": "next('middle')",
"description": "第一个条件",
"condition_value": "operation_type=='1'&&app_code =='100002'"
}, {
"condition_name": "middle",
"description": "第二个条件",
"sequence": "",
"condition_value": "operation_type=='1'&&string.contains(operation_type_desc,'成功抢单')&&(middle_create_date-begin_create_date) <3600000"
}],
"select": [{
"condition_name": "begin",
"do": "getList"
}, {
"condition_name": "middle",
"do": "get"
}],
"time_column": "create_date",
"strategy": "",
"alarm_content": "钉钉告知用户xxxxxxxx,xxxx",
"alarm_url": "www.aaa.com",
"create_time": 1589373560798
}
集群运行效果
还需要完善的就是:各条件之间有关联关系处理
比如:相邻数据时间差小于5秒,规则定义是难点,传参 "time_sub":"a-b < 5"
具体代码
flink_cep_groovy_aviator项目
相关代码
package com.sjb.test.yarn;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.sjb.constant.Constants; import com.sjb.function.CommonFunction; import com.sjb.schema.CustomerDeserializationSchema; import com.sjb.test.HBaseWriter; import com.sjb.test.pattern.PatternExample; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternFlatSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties;import static com.sjb.test.jdbc.JdbcSelect.queryParam;/*** @program: flink_cep_groovy_aviator* @description: todo 实现flink cep 动态模板代码,3个条件的。* @author: Mr.Wang* @create: 2021-04-07 14:01**///todo com.sjb.test.yarn.FlinkDynamicCepDemo_2021_04_07_003 public class FlinkDynamicCepDemo_2021_04_07_003 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//todo 查询mysql 得到结果ParameterTool parameterTool = ParameterTool.fromArgs(args);long timestampParam = parameterTool.getLong("timestamp", 0L);String checkPointParam = parameterTool.get("checkpoint", "");String patternParam = parameterTool.get("prod", ""); // int paramId = parameterTool.getInt("id");int paramId = 1;String param = queryParam(paramId);System.out.println("param = " + param);Properties consumerProperties = getConsumerProperties();FlinkKafkaConsumer<JSONObject> kafkaConsumer = new FlinkKafkaConsumer<JSONObject>("cep_test",new CustomerDeserializationSchema(),consumerProperties);//todo 策略模式kafkaConsumer.setStartFromEarliest();DataStreamSource<JSONObject> sourceStream = env.addSource(kafkaConsumer);KeyedStream<JSONObject, String> stream = sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(5)) {@Overridepublic long extractTimestamp(JSONObject loginEvent) {return loginEvent.getLong("create_date");}}).keyBy(new KeySelector<JSONObject, String>() {@Overridepublic String getKey(JSONObject dataJson) throws Exception {return dataJson.getString("device_code");}});JSONObject paramJson = JSON.parseObject(param);JSONArray selectArray = paramJson.getJSONArray("select");String labelContent = paramJson.getString("label_content");String strategy = paramJson.getString("strategy");String title = paramJson.getString("title");//todo 传入的数据应该是一个字符串System.out.println("paramJson = " + paramJson);//todo 打印数据: // sourceStream.print("==>");//todo 传入参数,返回一个 Pattern对象Pattern<JSONObject, JSONObject> pattern = new PatternExample(paramJson).pattern();try {PatternStream patternStream = CEP.pattern(stream, pattern);//todo 这里要不要做动态生成SingleOutputStreamOperator select = patternStream.flatSelect(new PatternFlatSelectFunction<JSONObject, List<JSONObject>>() {@Overridepublic void flatSelect(Map<String, List<JSONObject>> map, Collector<List<JSONObject>> out) throws Exception {if (selectArray.size() > 1) {List<JSONObject> list = new ArrayList<>();for (int i = 0; i < selectArray.size(); i++) {JSONObject select = selectArray.getJSONObject(i);String conditionName = select.getString("condition_name");List<JSONObject> begin = map.get(conditionName);list.addAll(begin);}out.collect(list);} else {JSONObject select = selectArray.getJSONObject(0);String conditionName = select.getString("condition_name");List<JSONObject> rs = map.get(conditionName);out.collect(rs);}}});select.print("out输出:");select.addSink(new HBaseWriter());//todo 侧流输出/*SingleOutputStreamOperator result = pattern1.process(new PatternProcessFunction<LoginEvent, String>() {@Overridepublic void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> out) throws Exception {List<LoginEvent> begin1 = map.get("begin");String begin = map.get("begin").get(0).toString();String next = map.get("next").get(0).toString();out.collect("begin=" + begin + "_____" + "next=" + next);}});*/} catch (Exception e) {e.printStackTrace();}try {env.execute(title);} catch (Exception e) {e.printStackTrace();}}public static Properties getConsumerProperties() {Properties properties = new Properties();properties.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);properties.put("group.id", Constants.GROUP_ID);properties.setProperty("flink.partition-discovery.interval-millis", 300 * 1000 + ""); // 自动发现消费的partition变化return properties;}public static Properties getSinkProperties() {Properties props = new Properties();props.setProperty("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);props.setProperty("buffer.memory", Constants.BUFFER_MEMORY);props.setProperty("batch.size", Constants.BATCH_SIZE);props.setProperty("linger.ms", Constants.LINGER_MS);props.setProperty("max.request.size", Constants.MAX_REQUEST_SIZE);props.setProperty("acks", Constants.ACKS);props.setProperty("retries", Constants.RETRIES);props.setProperty("retry.backoff.ms", Constants.RETRY_BACKOFF_MS);return props;} }
flink_cep_groovy_aviator2项目
相关代码
package dev;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.sjb.constant.Constants; import com.sjb.function.CommonFunction; import com.sjb.schema.CustomerDeserializationSchema; import com.sjb.test.pattern.PatternExample; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.RichPatternFlatSelectFunction; import org.apache.flink.cep.listener.CepListener; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector;import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties;import static com.sjb.test.jdbc.JdbcSelect.queryParam;/*** @program: flink_cep_groovy_aviator2* @description: todo 从mysql查询条件,多条件的。* @author: Mr.Wang* @create: 2021-04-09 16:10**///todo test.BroadcastCepDev2 public class BroadcastCepDev2 {private static String logStr = "{\"operation_type\":\"1\",\"order_number\":\"1\",\"type\":\"in\",\"ip\":\"深圳\",\"device_code\":\"a\",\"create_date\":\"1616653174000\"}";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//todo 修改为eventTimeenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);ParameterTool parameterTool = ParameterTool.fromArgs(args);//todo 传入mysql对应的规则id // String ruleId = parameterTool.getRequired("rule_id");// String ruleId = "2";String ruleId = "1";String ruleInfoStr = queryParam(ruleId);System.out.println("打印从mysql查询出来的规则ruleInfoStr = " + ruleInfoStr);JSONObject ruleJSON = JSON.parseObject(ruleInfoStr);System.out.println("ruleJSON = " + ruleJSON);//todo 读取kafka的数据Properties consumerProperties = getConsumerProperties();FlinkKafkaConsumer<JSONObject> kafkaConsumer = new FlinkKafkaConsumer<JSONObject>("flink_cep",new CustomerDeserializationSchema(),consumerProperties);//todo 策略模式kafkaConsumer.setStartFromEarliest();DataStreamSource<JSONObject> sourceStream = env.addSource(kafkaConsumer);KeyedStream<JSONObject, String> keyedWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(5L)) {@Overridepublic long extractTimestamp(JSONObject element) {return element.getLong("create_date");}}).map(new RichMapFunction<JSONObject, JSONObject>() {private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic JSONObject map(JSONObject dataJson) throws Exception {Long middleDate = dataJson.getLong("create_date");String dateStr = format.format(middleDate);dataJson.put("dateStr",dateStr);return dataJson;}}).keyBy(new KeySelector<JSONObject, String>() {@Overridepublic String getKey(JSONObject dataJson) throws Exception {return dataJson.getString("device_code");}});//sourceStream.print("主数据流:");MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<String, String>("register", Types.STRING, Types.STRING);FlinkKafkaConsumer<JSONObject> broadcastKafkaConsumer = new FlinkKafkaConsumer<JSONObject>("cep.fengkong.test",new CustomerDeserializationSchema(),consumerProperties);//todo 策略模式broadcastKafkaConsumer.setStartFromEarliest();//todo 这里要修改成kafka的binglog数据DataStreamSource<JSONObject> dataStreamSource = env.addSource(broadcastKafkaConsumer);DataStream<JSONObject> ruleStream = dataStreamSource.flatMap(new RichFlatMapFunction<JSONObject, JSONObject>() {@Overridepublic void flatMap(JSONObject data, Collector<JSONObject> out) throws Exception {JSONArray dataJSONArray = data.getJSONArray("data");for (int i = 0; i < dataJSONArray.size(); i++) {JSONObject json = dataJSONArray.getJSONObject(i);out.collect(json);}}});// ruleStream.print("--->");BroadcastStream<JSONObject> broadcastKafkaStream = ruleStream.broadcast(mapStateDescriptor);// //todo 这里要修改成kafka的binglog数据DataStream<JSONObject> connectStream = keyedWatermarkStream.connect(broadcastKafkaStream).process(new KeyedBroadcastProcessFunction<String, JSONObject, JSONObject, JSONObject>() {private MapStateDescriptor<String, String> mapStateDescriptor2;private Boolean notSend = true;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 这里需要初始化map state 描述mapStateDescriptor2 = new MapStateDescriptor<String, String>("register", Types.STRING, Types.STRING);}@Overridepublic void processElement(JSONObject datalog, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {//todo 处理主流数据 // System.out.println("处理主流数据");ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor2);//todo 将广播数据发送到数据流里面去if (notSend && broadcastState.contains(ruleId)) {String broadcastStateValue = broadcastState.get(ruleId);notSend = false;JSONObject broadcastStateJson = JSON.parseObject(broadcastStateValue);broadcastStateJson.put("device_code",datalog.getString("device_code"));out.collect(broadcastStateJson);}out.collect(datalog);}@Overridepublic void processBroadcastElement(JSONObject value, Context ctx, Collector<JSONObject> collector) throws Exception {//todo 处理广播流System.out.println("新增加需要监控的" + value.toJSONString());BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor2);boolean exist = broadcastState.contains(value.getString("id"));if (exist) {notSend = true;}broadcastState.put(value.getString("id"), value.toJSONString());}}).keyBy(new KeySelector<JSONObject, String>() {@Overridepublic String getKey(JSONObject dataJson) throws Exception {return dataJson.getString("device_code");}});// connectStream.print("==---->");//todo 在这里添加动态生成代码Pattern<JSONObject, JSONObject> pattern = new PatternExample(ruleJSON).pattern();PatternStream patternStream = CEP.pattern(connectStream, pattern);PatternStream patternStream1 = patternStream.registerListener(new CepListener<JSONObject>() {@Overridepublic void init() {System.out.println("初始化。。。");}@Overridepublic Boolean needChange(JSONObject dataJson) {if (dataJson.containsKey("id") && dataJson.containsKey("name") && dataJson.containsKey("content")) {System.out.println("这条数据是规则数据:" + dataJson);return true;}return false;}@Overridepublic Pattern<JSONObject, ?> returnPattern(JSONObject dataJson) throws Exception {JSONObject ruleJson = dataJson.getJSONObject("content");System.out.println("接收到规则数据数据:" + ruleJson + ",切换逻辑");Pattern<JSONObject, JSONObject> pattern = new PatternExample(ruleJson).pattern();return pattern;}});//todo 这里有问题 就是没办法改动select条件,目前select条件一旦定了就不能修改了 。patternStream1.flatSelect(new RichPatternFlatSelectFunction<JSONObject, List<JSONObject>>() {@Overridepublic void flatSelect(Map<String, List<JSONObject>> map, Collector<List<JSONObject>> out) throws Exception { // JSONArray updateSelect = map.get("begin").get(0).getJSONArray("select");String selectStr1 = CommonFunction.selectStr;JSONArray updateSelect = JSON.parseArray(selectStr1);if (updateSelect.size() > 1) {List<JSONObject> list = new ArrayList<>();for (int i = 0; i < updateSelect.size(); i++) {JSONObject select = updateSelect.getJSONObject(i);String conditionName = select.getString("condition_name"); // String getListOrget = select.getString("do");List<JSONObject> begin = map.get(conditionName);list.addAll(begin);}out.collect(list);} else {JSONObject select = updateSelect.getJSONObject(0);String conditionName = select.getString("condition_name");List<JSONObject> rs = map.get(conditionName);out.collect(rs);}}}).print("out输出==>");env.execute("xxxxx");}public static Properties getConsumerProperties() {Properties properties = new Properties();properties.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);properties.put("group.id", Constants.GROUP_ID);properties.setProperty("flink.partition-discovery.interval-millis", 300 * 1000 + ""); // 自动发现消费的partition变化return properties;} }
参考文章
Apache Flink 1.12 Documentation: FlinkCEP - Flink的复杂事件处理
google aviator:轻量级Java公式引擎_刘本龙的专栏-CSDN博客_java 公式引擎
flink cep pattern动态加载_帆了个帆的专栏-CSDN博客
最后
有评论说广播流那里可以改成阿波罗或者nacos 其实都一样,不过使用阿波罗或者nacos 一定要注意依赖冲突
如果有兴趣的老铁可以一键三连,然后评论留言继续交流,私信之前请一键三连。
Flink cep动态模板+cep规则动态修改实践相关推荐
- 【flink】Flink-Cep实现规则动态更新
1.概述 我们是用processfunction实现的cep动态更新,然后看到这个是原生api感觉有趣,研究一下 原文:https://mp.weixin.qq.com/s/mh–wQvAWQq2tD ...
- Apache Flink 实战教程:CEP 实战(转载)
文章目录 原文链接: 一:Flink CEP 概念以及使用场景 1.什么是 CEP 2.Flink CEP 应用场景 3.Flink CEP 原理 二:Flink CEP 程序开发 1.Flink C ...
- Flink-Cep实现规则动态更新
Flink-Cep实现规则动态更新 规则引擎通常对我们的理解就是用来做模式匹配的,在数据流里面检测满足规则要求的数据.有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当 ...
- 【Spark】SparkStreaming-流处理-规则动态更新-解决方案
SparkStreaming-流处理-规则动态更新-解决方案 image2017-10-27_11-10-53.png (1067×738)elasticsearch-headElasticsearc ...
- SpringBoot整合Drools规则引擎动态生成业务规则
最近的项目中,使用的是flowable工作流来处理业务流程,但是在业务规则的配置中,是在代码中直接固定写死的,领导说这样不好,需要规则可以动态变化,可以通过页面去动态配置改变,所以就花了几天时间去研究 ...
- flink sql udf jar包_Java动态加载Jar实例解析
导读:在实际项目开发中,有时会遇到需动态加载jar包的应用场景.如将Flink的UDF自定义方法制作成方法库(既打成一个Jar),在提交给Flink集群的Job中动态加载jar便可使用.下面将举一个简 ...
- 开源前端 可视化大数据交互前端动态模板
介绍: 如今老板都很在乎公司实力形象 往往会在大厅投放展示大数据巨屏 你是否也想实现这样大数据效果展示 本次带来一套开源的前端可视化大数据交互动态模板网页前端模板,是HTML网页模板 只要稍微懂点前端 ...
- 【Elasticsearch】Elasticsearch 动态模板(Dynamic templates)
1.概述 动态映射请参考: [Elasticsearch]Elasticsearch 7 : 动态映射 dynamic 本博客摘抄自:Elastic Stack 实战手册(早鸟版).pdf 原文可看, ...
- 传奇游戏PC和手机版宣传页模板原版Flash动态炫酷
传奇游戏PC和手机版宣传页模板原版Flash动态炫酷 手机版: 只是一个宣传页面 无后台 html 自己修改里面的图片 链接 和文字即可挂在服务器上使用.传奇双端宣传页 电脑和手机版-CSDN下载传奇 ...
最新文章
- python初学者之网络爬虫_Python初学者之网络爬虫(二)
- Oracle存储过程(增、删、改)写法
- ironpython是什么2.7_是否可以在IronPython2.7.5中使用请求?
- cocos2dx 3.x(屏幕截图的两种方法)
- QT:QObject 简单介绍
- 2005年财富500强(zz)
- 《肖申克的救赎》--[美]斯蒂芬·金
- php 监听redis,swoole如何监听redis数据
- linux搭建ntp发包教程,linux 搭建本地ntp服务器
- 图片怎么改成jpg格式
- Geek Uninstaller
- 廊坊金彩教育:怎么做好人群标签
- 写给迷茫中的程序员兄弟:如何打造最快的职场晋升神话
- PNG图片压缩原理--屌丝的眼泪 #1
- Appscan安全测试
- JS标准时间时间格式化
- Java:如何选择一个好的Java外包合作伙伴?
- Java并发编程:park线程
- eds能谱图分析实例_基础理论丨一文了解XPS(概念、定性定量分析、分析方法等)...
- 北交大计算机在职非全考研初试心得
热门文章
- java抽象类的子类必须_JAVA——抽象类
- 中国泳装(泳装)市场趋势报告、技术动态创新及市场预测
- 十一、监控和管理Linux进程
- win7适合安哪个版本的python?(赠送Python永久使用安装包)
- oracle的wallet是什么意思,Oracle钱夹的使用{今日wallet失败 明天继续吧}
- html和css的小结
- 论文解读:华盛顿大学教授Pedro Domingos技术论文:机器学习中一些有用的知识(一)...
- 学生信息管理系统-错误总结
- Beyond Compare 4 Linux安装和使用
- java actioncontext_struts2(四) ognl表达式、值栈、actionContext之间的关系