通常我们在提交一个flink cep任务,流程基本上是:开发,打包,部署;例如我们有一个任务:计算在60秒内,连续两次登陆失败的用户

begin("begin").where(_.status=='fail').next("next").where(_.status=="fail").within(Time.seconds(60))

然后又来一个任务:计算60秒内,用户登陆失败1次,然后第二次登陆成功的用户

begin("begin").where(_.status=='fail').next("next").where(_.status=="success").within(Time.seconds(60))

这两个任务,数据的输入,输出都是一样的,唯一的区别就是pattern不同;往常的话我们要重复之前的3个步骤才能完成任务的上线;如果我们能根据flink 任务传入参数,动态生成pattern对象,就能简化任务的上线流程,画个图

如何实现pattern动态加载?为了实现这个功能,可以拆分成两个步骤

1.根据pattern规则转换成pattern字符串
val patternStr="begin(\"begin\").where(_.status==\"fail\").next(\"next\").where(_.status==\"fail\").within(Time.seconds(60))"
2.将pattern字符串转换成pattern对象
val pattern = transPattern(patternStr)

现在先看第2步,假定现在有了patternStr,如何转成pattern对象?

str->obj,第一个能想到的方案就是使用javax.script.ScriptEngine调用groovy脚本的方法生成pattern对象;

groovy 脚本如下:

import com.hhz.flink.cep.pojo.LoginEvent
import com.hhz.flink.cep.patterns.conditions.LogEventCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time
def getP(){return Pattern.<LoginEvent>begin("begin").where(new LogEventCondition("getField(eventType)==\"fail\"")).next("next").where(new LogEventCondition("getField(eventType)==\"fail\"")).times(2).within(Time.seconds(3))
}

这个脚本可以以字符串的方式通过groovy脚本引擎加载到内存中,并使用invokeFunction调用getP()方法,就可以返回pattern对象,伪代码如下

String script="def getP(){return Pattern.<>....within(Time.seconds(3)))}";
ScriptEngineManager factory = new ScriptEngineManager();
ScriptEngine engine =  factory.getEngineByName("groovy");
engine.eval(script);
Invocable inv = (Invocable) engine;
Pattern<LoginEvent, LoginEvent> pattern= (Pattern<LoginEvent, LoginEvent>) invocable.invokeFunction("getP");

现在回过头来看第一步:根据pattern规则转换成pattern字符串

在scala中pattern代码如下

begin("begin").where(_.status=='fail').next("next").where(_.status=="fail").within(Time.seconds(60))

where方法可以接受表达式,例如"_.status=='fail'",同时他也可以接受一个SimpleCondition对象,例如

where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent event) {return event.eventType() == "fail";}})

但在groovy中不支持接受"_.status=='fail'"表达式作为函数的参数,所以在生成pattern串是必须将where中的表达式换成SimpleCondition对象;

那问题来了,换成SimpleCondition对象后,我们就得在filter方法中实现表达式的逻辑;这显然不是我们所需要的,这样做的话,patternStr的维护的成本就太高了;如果我们将表达式以字符串的形式传入到SimpleCondition对象中,然后在filter中自动计算表达式的值,就像

where(new LogEventCondition("getField(eventType)==\"fail\""))

filter方法根据表达式getField(eventType)==\"fail\"计算结果,返回ture或false,难点来了,如何根据表达式计算结果,这里就需要引入aviator包,关于aviator我们看几个样例

import com.googlecode.aviator.AviatorEvaluator;
public class TestAviator {public static void main(String[] args) {Long result = (Long) AviatorEvaluator.execute("1+2+3");System.out.println("-------"+result);}
}
结果输出:
-------6

具体看下LogEventCondition

public class LogEventCondition  extends SimpleCondition<LoginEvent> implements Serializable {private String script;static {AviatorEvaluator.addFunction(new GetFieldFunction());}//getField(eventType)==\"fail\"public LogEventCondition(String script){this.script = script;}@Overridepublic boolean filter(LoginEvent value) throws Exception {Map<String, Object> stringObjectMap = Obj2Map.objectToMap(value);//计算表达式的值boolean result = (Boolean) AviatorEvaluator.execute(script, stringObjectMap);return result;}}

到这里,cep pattern动态加载就介绍完了;起初我也是想像某些大厂一样,通过订制一套特有的DSL语法,然后将DSL语句解析转换成pattern,这样的话,非开发同学也就能够在公司的数据平台直接订制实时计算任务;但回想之前给非研发同学培训sql的悲惨经历,放弃了;觉得pattern订制的学习成本还是有点高,交给运营或者产品去搞这事,不靠谱,所以这事还是得研发来干;对于研发来讲pattern最原生的规则就是最好的~~~

------------20210917  更新------------------

补齐下之前遗漏的代码

package com.hhz.flink.cep.patterns.aviator;import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.runtime.function.FunctionUtils;
import com.googlecode.aviator.runtime.type.AviatorJavaType;
import com.googlecode.aviator.runtime.type.AviatorObject;
import com.googlecode.aviator.runtime.type.AviatorString;import java.util.HashMap;
import java.util.Map;public class GetFieldFunction extends HhzFieldFunction {@Overridepublic String getName() {return "getField";}@Overridepublic AviatorString call(Map<String, Object> params, AviatorObject arg1) {AviatorJavaType field = (AviatorJavaType)arg1;String name = field.getName();if(name.contains(".")){return new AviatorString(jsonValue(name, params));}String stringValue = FunctionUtils.getStringValue(arg1, params);return new AviatorString(stringValue);}public static void main(String[] args) {AviatorEvaluator.addFunction(new GetInt());Map<String,Object> m = new HashMap<String, Object>();m.put("person","{age:12,name:\"zhangsan\"}");System.out.println(AviatorEvaluator.execute("getInt(person.age)<12", m));}
}package com.hhz.flink.cep.patterns.aviator;import com.alibaba.fastjson.JSONObject;
import com.googlecode.aviator.runtime.function.AbstractFunction;import java.util.Map;public abstract class HhzFieldFunction extends AbstractFunction {public String jsonValue(String fieldName, Map<String, Object> params){String[] arr = fieldName.split("\\.");String json = params.get(arr[0]).toString();JSONObject object = JSONObject.parseObject(json);return object.getString(arr[1]);}
}

flink cep pattern动态加载相关推荐

  1. flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  2. 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等

    1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...

  3. flink sql udf jar包_Java动态加载Jar实例解析

    导读:在实际项目开发中,有时会遇到需动态加载jar包的应用场景.如将Flink的UDF自定义方法制作成方法库(既打成一个Jar),在提交给Flink集群的Job中动态加载jar便可使用.下面将举一个简 ...

  4. Flink Cep 扩展 - 动态规则更新及Pattern间within()

    上一篇文章 <Flink Cep 源码分析>我们可以知道Flink cep中Pattern的创建,state的转换,以及匹配结果的数据.这一篇则对Flink cep的两个痛点进行扩展: 1 ...

  5. Ext JS 4倒计时:动态加载和新的类系统

    Today we're excited to release the first in a series of brand new features in Ext JS 4. Over the nex ...

  6. python 反射和动态加载_Python的反射

    什么是反射 反射是一个很重要的概念,它可以把字符串映射到实例的变量或者实例的方法然后可以去执行调用.修改等操作.它有四个重要的方法: getattr 获取指定字符串名称的对象属性 setattr 为对 ...

  7. flinksql获取系统当前时间搓_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  8. 用爬虫抓取动态加载数据丨Python爬虫实战系列(6)

    提示:最新Python爬虫资料/代码练习>>戳我直达 前言 抓取动态加载数据 话不多说,开练! 爬虫抓取动态加载数据 确定网站类型 首先要明确网站的类型,即是动态还是静态.检查方法:右键查 ...

  9. 踩坑记15 动态路由 router.options.routes未更新 | vue升级 element-plus未适配vue3.2.x | vite glob导入动态加载组件,不能使用别名alias

    2021.8.12 坑50(vue-router4.addRoute().router.options.routes未更新):进行动态权限获取菜单的设置,使用了addRoute()来添加路由,但是ro ...

最新文章

  1. TensorFlow练习23: “恶作剧”
  2. python data analysis | python数据预处理(基于scikit-learn模块)
  3. 希尔排序增量怎么确定_Python实现希尔排序(已编程实现)
  4. -bash: vi: command not found -bash: ls: command not found
  5. java的IO总结(一)
  6. anaconda conda 切换为清华源
  7. 接unityads_[蛮牛教程]unity接入unity Ads详细流程
  8. GitHub常用优秀开源Android项目
  9. 【原创】 禁用ctfmon.exe 禁止ctfmon.exe自动启动
  10. 国民岳父的“屁民理论”
  11. 论文阅读【域泛化】:ECCV2018|Two at Once: Enhancing Learning and Generalization Capacities via IBN-Net
  12. Git 命令行其实真的很好用
  13. python爬虫下载小说
  14. 如何将图片压缩到100K以内,教你几种免费方法
  15. @Autowired浅析
  16. 以软件测试的角度测试一支笔,软件测试面试:如何测试一支笔(铅笔,钢笔,中性笔)...
  17. 安装SQL server显示重新启动计算机失败解决方法
  18. AIX系统管理界面工具SMIT快捷方式
  19. fckeditor上传突破_方法
  20. Python从入门到转行

热门文章

  1. 4月4日王者服务器维护到几点,王者荣耀7月4日几点开服 王者荣耀7月4日更新维护到几点...
  2. bad magic number in 'application': b'\x03\xf3\r\n': ImportError
  3. 06-02 C# 匿名类
  4. Android 百分比布局(支持AndroidX)
  5. PDBbind Database (version 2020)
  6. Qt之使用QSS设置QPushButton图标和文本的位置
  7. 计算机图像进行滤波的函数,数字图像处理图像滤波.ppt
  8. HG,GIT,SVN版本控制系统
  9. DFT specification file string
  10. android 完美获取音乐文件中的专辑图片并显示