需求:每隔2s 求之前10s内的url的访问量topN

需求分析:

1.隔2s 算10s 滑动窗口

2. topN分成两部分看,

①算出10s内每个url的访问量

这里有个问题是同时计算所有每个url的访问量还是分别计算每个url的访问量

同时计算 同时计算10s内所有的url的访问量

分别计算//10s内来一条记录一条

env.addSource()
.assignTimestampsAndWatermarks
.keyBy( url )
.aggregate() //这里的只是窗口内来一条数据计算一下每个url的count,并没有排序

.keyBy( windowEnd) //在aggregate那一步可以获取窗口信息,

.process(new KeyedProcessFunction) //这里涉及到状态编程

同时计算所有数据

env.addSource()
        .keyBy(data -> true)//这里是划分到一个并行度了
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .process(new MyProcess());//这里处理一个窗口内的所有数据可以拿到topN

这个和上面的一样

env.addSource()

.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))

.process(new MyProcess())

这三种 第一种看起来是最复杂的两个keyBy,但是目前是最推荐第一种的。

因为这个 充分发挥了并行的优势,同时数据来一条处理一条。如果是有千万上亿数据在同一个窗口,第二个和第三个需要的资源更多。

代码。复制即可运行。


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.sql.Timestamp;
import java.util.*;public class KeyedProcessTopNTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从自定义数据源读取数据SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));//需要按照url分组,求出每个url的访问量SingleOutputStreamOperator<UrlViewCount> urlCountStream =eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))//这里为什么不用reduce 因为之前是Event 我们想要UrlViewCount//这里为什么不用process 因为process效率比较低。需要一起处理.aggregate(new UrlViewCountAgg(), new UrlViewCountResult());SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.windowEnd).process(new TopN(2));result.print("result");//process处理SingleOutputStreamOperator<String> process = eventStream.keyBy(data -> true) //这里是划分到一个并行度了.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyProcess());process.print("process");//这么写也可以和上面的process一样
//        eventStream
//                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
//                .process(new MyProcess2());// 对结果中同一个窗口的统计数据,进行排序处理env.execute();}// 自定义增量聚合public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}// 自定义全窗口函数,只需要包装窗口信息public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {@Overridepublic void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {// 结合窗口信息,包装输出内容Long start = context.window().getStart();Long end = context.window().getEnd();out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));}}// 自定义处理函数,排序取top npublic static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {// 将n作为属性private Integer n;// 定义一个列表状态 //为什么这里不直接赋值,因为还没有获取到上下文环境private ListState<UrlViewCount> urlViewCountListState;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {// 从环境中获取列表状态句柄urlViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<>("url-view-count-list", Types.POJO(UrlViewCount.class)));}@Overridepublic void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {// 将count数据添加到列表状态中,保存起来urlViewCountListState.add(value);// 注册 window end + 1ms后的定时器,等待所有数据到齐开始排序ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 将数据从列表状态变量中取出,放入ArrayList,方便排序ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();for (UrlViewCount urlViewCount : urlViewCountListState.get()) {urlViewCountArrayList.add(urlViewCount);}// 清空状态,释放资源urlViewCountListState.clear();// 排序urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {@Overridepublic int compare(UrlViewCount o1, UrlViewCount o2) {return o2.count.intValue() - o1.count.intValue();}});// 取前两名,构建输出结果StringBuilder result = new StringBuilder();result.append("========================================\n");result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");for (int i = 0; i < this.n; i++) {UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);String info = "No." + (i + 1) + " "+ "url:" + UrlViewCount.url + " "+ "浏览量:" + UrlViewCount.count + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}}public static class MyProcess extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {@Overridepublic void process(Boolean s, Context context, Iterable<Event> iterable, Collector<String> collector) throws Exception {HashMap<String, Long> urlCountMap = new HashMap<>();// 遍历窗口中数据,将浏览量保存到一个 HashMap 中for (Event event : iterable) {String url = event.url;if (urlCountMap.containsKey(url)) {long count = urlCountMap.get(url);urlCountMap.put(url, count + 1L);} else {urlCountMap.put(url, 1L);}}ArrayList<Tuple2<String, Long>> mapList = new ArrayList<Tuple2<String, Long>>();// 将浏览量数据放入ArrayList,进行排序for (String key : urlCountMap.keySet()) {mapList.add(Tuple2.of(key, urlCountMap.get(key)));}mapList.sort(new Comparator<Tuple2<String, Long>>() {@Overridepublic int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {return o2.f1.intValue() - o1.f1.intValue();}});// 取排序后的前两名,构建输出结果StringBuilder result = new StringBuilder();System.out.println(urlCountMap);result.append("========================================\n");int size = urlCountMap.size();int top = Integer.min(size, 2);for (int i = 0; i < top; i++) {Tuple2<String, Long> temp = mapList.get(i);String info = "浏览量No." + (i + 1) +" url:" + temp.f0 +" 浏览量:" + temp.f1 +" 窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n";result.append(info);}result.append("========================================\n");collector.collect(result.toString());}}public static class ClickSource implements SourceFunction<Event> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Event> ctx) throws Exception {Random random = new Random();    // 在指定的数据集中随机选取数据String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};while (running) {ctx.collect(new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis()));//这里也可以发送水位线//           ctx.collectWithTimestamp();
//            ctx.emitWatermark(new Watermark(1));// 隔1秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}}public static class Event {public String user;public String url;public Long timestamp;public Event() {}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}}public static class UrlViewCount {public String url;public Long count;public Long windowStart;public Long windowEnd;public UrlViewCount() {}public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {this.url = url;this.count = count;this.windowStart = windowStart;this.windowEnd = windowEnd;}@Overridepublic String toString() {return "UrlViewCount{" +"url='" + url + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +'}';}}}

结果:都没有问题。

flink案例之求TopN相关推荐

  1. 案例:分组求top1、求topN

    ==== 案例:分组求top1 自定义GroupingComparator求取topN GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数 ...

  2. 数组小案例(求数组最大最小值、反转数组中元素、指定元素第一次出现的索引)

    数组小案例(求数组最大最小值.反转数组中元素.指定元素第一次出现的索引) 练习1 数组最值的求取 定一个数组,求数组中的最大值和最小值 public class Demo01 {public stat ...

  3. 数据湖架构Hudi(五)Hudi集成Flink案例详解

    五.Hudi集成Flink案例详解 5.1 hudi集成flink flink的下载地址: https://archive.apache.org/dist/flink/ Hudi Supported ...

  4. Java编程语言学习:Java语言基础案例(如求两门课分数之差、三门课平均分、关系运算符作比较)之详细攻略

    Java编程语言学习:Java语言基础案例(如求两门课分数之差.三门课平均分.关系运算符作比较)之详细攻略 目录 Java语言基础案例 1.求两门课分数之差.三门课平均分: 2.关系运算符作比较

  5. MR的案例:求每个部门的工资总额

    MR的案例:求每个部门的工资总额 1.表:员工表emp SQL: select deptno,sum(sal) from emp group by deptno; DEPTNO   SUM(SAL) ...

  6. 【赵强老师】MapReduce编程案例之求工资总额

    先看视频. [赵强老师]MapReduce编程案例之求工资总额 Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上, ...

  7. MapReducer——MapReduce编程案例:求部门的工资总额(2)

    MapReduce编程案例:求部门的工资总额 1.员工表  SQL:select deptno,sum(sal) from emp group by deptno; 2.分析数据处理的过程 3.开发程 ...

  8. mysql分组聚合->连表联查->求topn

    首先建立两个表如下: CREATE TABLE `student` (`id` INT(11) NOT NULL AUTO_INCREMENT,`name` VARCHAR(255) NOT NULL ...

  9. python求矩阵的秩_【案例】求出矩阵的最简型?——sympy登场

    案例介绍 学过线性代数的同学都知道,在将矩阵进行初等变换化为最简型时过程有多么繁琐.今天,给大家带来一个小项目--使用 Python 化简矩阵. 将要学习:使用特殊的科学计算库--sympy,来化简矩 ...

最新文章

  1. 转-Redis学习手册(目录)
  2. 不用卷积,也能生成清晰图像,华人博士生首次尝试用两个Transformer构建一个GAN
  3. html进度条圆圈渐变色,HTML5 canvas带渐变色的圆形进度条动画
  4. Linux内存中的 buffer 和 cache
  5. 阿里 双11 同款流控降级组件 Sentinel Go 正式 GA,助力云原生服务稳稳稳
  6. Property 'submit' of object #HTMLFormElement is not a function
  7. 干货首发,能够清理,带动画的自己定义控件CuteEditText
  8. Harris角点检测算法优化
  9. 大数据_Flink_数据处理_运行时架构3_yarn上作业提交流程---Flink工作笔记0018
  10. 2019 十大国产开源项目来势汹汹!
  11. Java操作MongoDB(聚合函数)向Mongo插入及查询数据
  12. GPX格式地图轨迹生成python
  13. Android期末复习题库
  14. Python基础入门篇【26】--python基础入门练习卷B
  15. mysql模糊查询语句怎么不区分大小写
  16. python 数据挖掘_Python数据挖掘框架scikit数据集之iris
  17. ChatGPT团队揭秘:3清华、1北大、1华科
  18. 服务器光纤信号灯,光纤收发器的六个指示灯都代表什么?
  19. 更改电脑用户名(C:\Users\用户名)
  20. 读彬彬有礼压缩感知相关论文笔记3——沙威程序解读

热门文章

  1. 分数阶傅里叶变换Transformer
  2. 插值算法之:拉格朗日插值
  3. Qt(mingw)+GDAL位图转矢量图写入shp或json文件
  4. css设置高度和宽度相同
  5. ssm高校党员信息管理系统
  6. 13 【操作mysql数据库】
  7. C语言笔记-26-网络-UDP网络编程
  8. 读书笔记系列--VB2005-菜根谭
  9. DataView 构造
  10. GitHub操作流程