Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

什么是CEP

CEP的全称为Complex Event Processing,中文翻译为复杂事件处理。光看字面意思解释还是很难理解。究竟何为“复杂事件”?通常我们使用Flink处理数据流的时候,只是对每个到来的元素感兴趣,不关注元素之间的关系。即便是有也仅仅是使用有状态算子而已。现在有一种需求,我们需要关注并捕获一系列有特定规律的事件,比方说用户登录,转帐,然后退出(ABC事件连续发生),或者是比如机房连续10次测温均高于50度(A{10,}),我们采用传统方式写Flink程序就比较困难。这时候就轮到Flink CEP大显身手了。

下面为大家讲解下Flink CEP的使用方式

引入依赖

使用Java编写代码需要引入:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.11</artifactId><version>1.13.2</version>
</dependency>

其中version对应Flink版本。

使用Scala则需要引入:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_2.11</artifactId><version>1.13.2</version>
</dependency>

下面的代码均以Java为准。

Pattern API

Pattern即我们在第一节提到的用户登录,转帐,然后退出或者机房连续10次测温均高于50度。和正则表达式的Pattern概念很类似。Pattern就是用户定义连续一系列事件应该具有的特征的编程接口。

我们以一个简单的例子开始:编写一个连续10次以上机房测温高于50度的Pattern。

Pattern<Row, Row> pattern = Pattern.<Row>begin("high").where(new SimpleCondition<Row>() {@Overridepublic boolean filter(Row row) throws Exception {int temp = (int) row.getField("temperature");return temp >= 50;}
}).timesOrMore(10).consecutive();

首先我们调用Patternbegin方法,标记Pattern的开始,需要为开始的Pattern指定一个名称。一个Pattern可以分为多段,每段都具有自己的名称,在后面捕获匹配数据流的元素时候需要用到。begin方法需要指定泛型,即数据源的数据类型。然后跟随的是一个where条件。where方法接收一个SimpleCondition内部类,对用户数据进行解析和条件判断,符合条件的元素返回true。接下来是指定满足条件的元素的个数,这里使用了timesOrMore(n),含义为n次或n次以上。我们注意到后面还有一个consecutive,意味着timesOrMore(n)必须是连续的,中间不能够穿插有其他元素。

我们再举一个用户登录,取款然后退出的例子:

// 分别定义用户的登陆事件,提款事件和登出事件
class UserEvent {}class LoginEvent extends UserEvent {}class WithdrawEvent extends UserEvent {}class LogoutEvent extends UserEvent {}// 下面是Pattern
Pattern.<UserEvent>begin("login").subtype(LoginEvent.class).next("withdraw").subtype(WithdrawEvent.class).timesOrMore(1).consecutive().next("logout").subtype(LogoutEvent.class);

这里定义的Pattern为AB+C,包含三种事件分别命名为"login","withdraw"和"logout"。使用subtype指定满足条件的数据类型。即如果连续到来3个元素分别为LoginEventWithdrawEventLogoutEvent的实例,这3个元素会被捕获。

实例已经讲完了大家对Pattern的使用应该有了初步了解。下面开始逐个分析Pattern API更为详细的配置。

Pattern 组合方式配置

定义组合配置主要是如下3个方法:

  • begin:匹配模版的开始。
  • next:严格匹配,A next B含义为A后必须紧跟着B。
  • followedBy:非严格匹配,不要求非得紧跟着,中间可以穿插其他元素。例如A followedBy B不仅能匹配A B,还能匹配A C B。C未能匹配会被忽略掉。仅仅忽略未被匹配的元素。
  • followedByAny:非严格匹配,比followedBy更加宽松,甚至能忽略掉可被匹配的元素,例如A followedByAny B去匹配A C B1 B2可以匹配到A B1(C被忽略)和A B2(尽管B1符合条件,但是也能被忽略掉,这是followedByAny和followedBy的不同之处)。如果是A followedBy B仅仅能匹配出A B1。
  • notNext:next的否定形式
  • notFollowedBy:followedBy的否定形式,注意notFollowedBy不能用在Pattern的结尾。

指定重复次数

重复次数可以指定0次,1次,n次,n次到m次,n次以上等等。也可以限定这n次之间是否可以穿插其他事件。

配置的方法为:

  • times(n):n次
  • times(n, m):n次到m次
  • optional():带上这个允许出现0次
  • oneOrMore():1次或多次
  • timesOrMore(n):n次或多次
  • greedy():表示尽可能匹配次数多的。例如到来的元素为 A X X B。其中X既满足条件A又满足条件B。对于Pattern A+ B,可以匹配到A X,A X X,X X B,X B和 A X X B。但是对于A (greedy)+ B,Flink会尽可能的多匹配A条件,故只会匹配到A X X B。
  • consecutive():要求Pattern必须连续。
  • allowCombinations():和consecutive相反,不要求连续。

指定条件

  • where():参数为一个内部类,非常类似于filter算子,可编写自定义条件表达式。
  • subtype():指定匹配元素的类型。

匹配后跳过策略

我们经常遇到一个元素可以被成功匹配多次的情况。在实际应用中,一个元素究竟可以被如何匹配,这种行为可以通过匹配后跳过策略来指定。

匹配后跳过策略有如下5种:

  • NO_SKIP: 不跳过,匹配所有的可能性。
  • SKIP_TO_NEXT: 从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配。
  • SKIP_PAST_LAST_EVENT: 从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配。
  • SKIP_TO_FIRST(patternName): 从匹配成功的事件序列中第一个对应于patternName的事件开始进行下一次匹配。
  • SKIP_TO_LAST(patternName): 从匹配成功的事件序列中最后一个对应于patternName的事件开始进行下一次匹配。

匹配后跳过策略在定义组合配置的时候指出:

Pattern.begin("patternName", skipStrategy);

skipStrategy通过如下方式创建:

AfterMatchSkipStrategy.noSkip();
AfterMatchSkipStrategy.skipToNext();
AfterMatchSkipStrategy.skipPastLastEvent();
AfterMatchSkipStrategy.skipToFirst(patternName);
AfterMatchSkipStrategy.skipToLast(patternName);

时间限制

除了从数据上约束之外,Flink CEP还支持从时间维度来指定Pattern。

  • within(时间段):这些满足条件的一连串元素必须发生在指定时间段之内。

创建PatternStream

通过CEP.pattern方法,关联数据源DataStream和上面章节我们创建出的Pattern

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 或者我们指定comparator,不是很常用
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

其中第二个方法传入的Comparator可以自定义元素比较的方法,用于当元素的Event Time相同的时候来判断先后顺序。例如我们编写一个自定义的Row类型比较逻辑:

new EventComparator<Row>() {@Overridepublic int compare(Row o1, Row o2) {// 自定义比较逻辑在此return 0;}
};

需要注意的是,Flink 1.12版本之后CEP的PatternStream默认使用Event Time。如果业务使用的事Processing Time,必须要明确配置。

PatternStream<Event> patternStream = CEP.pattern(input, pattern).inProcessingTime();

从PatternStream中选出捕获的元素

select方法接收一个PatternSelectFunction类型参数,需要用户实现这个接口,编写自己的处理逻辑。接口如下所示:

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {OUT select(Map<String, List<IN>> var1) throws Exception;
}

接收到的参数类型为Map<String, List<IN>>,其中map的key为Pattern组合方式中指定的pattern名称,value为匹配到的一系列元素组成的集合。将处理过后的数据通过return返回即可。

还可以使用process函数:

CEP.pattern(...).process(new PatternProcessFunction<IN, OUT>() {@Overridepublic void processMatch(Map<String, List<IN>> map, Context context, Collector<OUT> collector) throws Exception {// ...}
});

这里的参数类型和PatternSelectFunction类似,但是处理过后的数据不能通过return返回,需要使用collector收集。

对于Pattern可以匹配到,但是超时的元素(上一章within配置的时间段),默认来说会被丢弃。如果我们需要捕获这种超时的匹配结果,可以使用自定义的PatternProcessFunction,实现TimedOutPartialMatchHandler。如下所示:

class CustomPatternProcessFunction extends PatternProcessFunction<Object, Object> implements TimedOutPartialMatchHandler<Object> {@Overridepublic void processMatch(Map<String, List<Object>> map, Context context, Collector<Object> collector) throws Exception {// ...}@Overridepublic void processTimedOutMatch(Map<String, List<Object>> map, Context context) throws Exception {Object element = map.get("key").get(0);context.output(outputTag, element);}
}

上面的例子将超时的element放入旁路数据,绑定到一个outputTag上。OutputTag用于标记一组旁路输出的元素。下面是创建OutputTag和获取旁路数据元素的方法:

// Object为旁路输出的元素类型
OutputTag<Object> outputTag = new OutputTag<>("late-element");// CEP操作...DataStream<Object> sideOutput = patternStream.getSideOutput(outputTag);

如果使用event time模式,一定会有来迟的元素。如果我们需要对这些元素进行捕获处理,可以和上面一样,使用旁路输出:

patternStream.sideOutputLateData(lateDataOutputTag)

新版变化

从Flink 1.12开始,CEP默认从Processing Time改为Event Time。使用时务必要注意。详情参见Flink 升级1.12版本的坑。

使用SQL方式编写CEP

除了使用Pattern API,Flink还支持使用SQL方式编写CEP,相比而言SQL更为灵活,但是需要学习SQL match_recognize子句的语法。SQL方式编写CEP参见Flink 使用之 CEP(SQL方式)。

链接:http://events.jianshu.io/p/a3931e203324

Flink 使用之 CEP相关推荐

  1. Flink中的CEP(二)

    目录 12.4 模式的检测处理 12.4.1 将模式应用到流上 12.4.2 处理匹配事件 12.4.3 处理超时事件 12.4.4 处理迟到数据 12.5 CEP 的状态机实现 12.6 本章总结 ...

  2. 【flink】flink 复杂事件处理 CEP

    @[ 1.概述 转载:Flink系列 13. 复杂事件处理 CEP 请到原文查看... 1. 什么是 CEP? CEP 是 Flink 中实现的复杂事件处理库,(Complex Event Proce ...

  3. Flink中的CEP(一)

    目录 十二:Flink CEP 12.1 基本概念 12.1.1 CEP 是什么 12.1.2 模式(Pattern) 12.1.3 应用场景 12.2 快速上手 12.2.1 需要引入的依赖 12. ...

  4. Flink SQL篇,SQL实操、Flink Hive、CEP、CDC、GateWay

    Flink源码篇,作业提交流程.作业调度流程.作业内部转换流程图 Flink核心篇,四大基石.容错机制.广播.反压.序列化.内存管理.资源管理 Flink基础篇,基本概念.设计理念.架构模型.编程模型 ...

  5. 从滴滴的Flink CEP引擎说起

    从滴滴的Flink CEP引擎说起 本文转载自 https://www.cnblogs.com/cx2016/p/11647110.html. CEP业务场景 复杂事件处理(Complex Event ...

  6. 一文学会 Flink CEP(以直播平台监控用户弹幕为例)

    我们在看直播的时候,不管对于主播还是用户来说,非常重要的一项就是弹幕文化.为了增加直播趣味性和互动性, 各大网络直播平台纷纷采用弹窗弹幕作为用户实时交流的方式,内容丰富且形式多样的弹幕数据中隐含着复杂 ...

  7. Flink Cep 源码分析

    复合事件处理(Complex Event Processing,CEP)是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤.关联.聚合等技术,根据事 ...

  8. 什么是cep算子_Flink中的CEP复杂事件处理 (源码分析)

    其实CEP复杂事件处理,简单来说你可以用通过类似正则表达式的方式去表示你的逻辑,表现能力非常的强,用过的人都知道 开篇先偷一张图,整体了解FlinkCEP中的  一种重要的图  NFA FlinkCE ...

  9. Flink 在爱奇艺广告业务的实践

    简介:5 月 22 日北京站 Flink Meetup 分享的议题. 本文整理自爱奇艺技术经理韩红根在 5 月 22 日北京站 Flink Meetup 分享的议题<Flink 在爱奇艺广告业务 ...

最新文章

  1. 基恩士上位机链路通讯_【原创分享】ABB机器人与视觉控制器的通讯
  2. 对硕士而言,编制和稳定究竟有多重要?
  3. java jar包的路径
  4. 第十八章 33用重载输出运算符函数实现字符串的输出
  5. Bootstrap初学(一)
  6. PHP——大话PHP设计模式——SPL数据结构
  7. android-api28转换到api19-不能编译
  8. Python:正则表达式
  9. dhcp failover linux,ISC dhcp failover的mclt参数很重要啊
  10. 适用于Photoshop的人像美容磨皮ps插件:Beauty Retouch Panel 2021 Mac
  11. Lucene.Net的中文分词组件AdvancedChineseAnalyzer
  12. java final 详解_java中Final详解
  13. Android读取电话薄中的电话号码
  14. 神奇的mysql查询
  15. 数据通信原理_卫星通信系统原理什么 卫星通信系统原理介绍【图文】
  16. 安国U盘量产无法识别芯片处理
  17. 我用 Python 写了个基金涨跌通知助手
  18. 案例:恒丰银行——大数据实时流处理平台
  19. linux环境变量大全,Linux环境变量总结
  20. 《灵飞经》②东岛门人 第二章 知音可赏

热门文章

  1. Loadrunner11在win10下的安装、汉化与破解方法
  2. JAVA-线程优先级setPriority
  3. 3G、3G手机及上网本
  4. Linux小小白入门教程(六):创建和删除文件夹
  5. 对Java高级程序员有益的十个网站
  6. OpenGL-正轴测图
  7. hadoop大数据开发技术学习笔记第三天:(前序)MySQL数据库进阶
  8. 如何用ajax做登录页面,ajax如何制作登录页面?登录页面ajax的请求详解(附完整实例)...
  9. 常用编辑器的快捷键tr
  10. 学习使我快乐 第十一天