0x1 摘要

最近业务要实时统计半小时维度的UV、PV数据,经过调研准备用Flink时间窗来实现,主要是Flink对eventTime的支持,可以做到更精准的统计,由于第一次尝试使用Flink,所以过程中遇到不少问题,记录下来方便后续查阅。

0x2 执行计划输出JSON问题

Flink对执行计划分析提供了支持,可以通过代码将执行计划打出来,并利用官网提供的图生成工具可以方便分析,通过env.getExecutionPlan()方法可以获取JSON格式的执行计划,将JSON字符串拷贝到http://flink.apache.org/visualizer/网站文本框就可以查看。
但我们在项目中调用env.getExecutionPlan()方法后报以下异常信息:

Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!at java.util.TimSort.mergeLo(TimSort.java:777)at java.util.TimSort.mergeAt(TimSort.java:514)at java.util.TimSort.mergeForceCollapse(TimSort.java:457)at java.util.TimSort.sort(TimSort.java:254)at java.util.Arrays.sort(Arrays.java:1512)at java.util.ArrayList.sort(ArrayList.java:1454)at java.util.Collections.sort(Collections.java:175)at org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:61)at org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:663)... 2 more

通过异常信息可以知道问题发生在TimSort排序算法,意思就是比较方法违反约束,具体约束规范大家可以自行网上查阅:自反性、传递性、对称性。
我们来看一下Flink的源码,只要看JSONGenerator61行就可以:

public String getJSON() throws JSONException {JSONObject json = new JSONObject();JSONArray nodes = new JSONArray();json.put("nodes", nodes);List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());Collections.sort(operatorIDs, new Comparator<Integer>() {@Overridepublic int compare(Integer o1, Integer o2) {// put sinks at the backif (streamGraph.getSinkIDs().contains(o1)) {return 1;} else if (streamGraph.getSinkIDs().contains(o2)) {return -1;} else {return o1 - o2;}}});visit(nodes, operatorIDs, new HashMap<Integer, Integer>());return json.toString();
}

从源码可以看在对操作ID做排序时Flink自己实现compare方法,具体这个方法的实际意义不是很明白,有明白的赐教一下。后来通过网上查阅已经有人提过此issues,地址:https://issues.apache.org/jira/browse/FLINK-8498,但状态是关闭的,也没有回复什么时候解决,但我们通过查看Flink GitHub源码发现,此处实现已经发生变更,源码地址https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
新实现源码:

public String getJSON() {ObjectNode json = mapper.createObjectNode();ArrayNode nodes = mapper.createArrayNode();json.put("nodes", nodes);List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());Collections.sort(operatorIDs, new Comparator<Integer>() {@Overridepublic int compare(Integer idOne, Integer idTwo) {boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne);boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo);// put sinks at the backif (isIdOneSinkId == isIdTwoSinkId) {return idOne.compareTo(idTwo);} else if (isIdOneSinkId) {return 1;} else {return -1;}}});visit(nodes, operatorIDs, new HashMap<Integer, Integer>());return json.toString();
}

我猜测是因为违反了自反性导致的错误,那这个问题怎么解决呢?有两种方案:

  • 方案一:去掉env.getExecutionPlan()不打印执行计划
  • 方案二:设置JVM参数-Djava.util.Arrays.useLegacyMergeSort=true

0x3 不能连续split问题

场景描述,直接看拓扑图:

希望达到上图的流拆分,但我开开心心把代码写后发布线上运行没有任何异常,等到验证数据时才发现最终统计数据不准A-1A-2的结果都是一样的,
issues地址:https://issues.apache.org/jira/browse/FLINK-5031

0x4 不能先process操作再split

在发现不能连续split后,只能想其他办法,将拓扑图改为:

改为此方案后,线下运行直接报错,异常信息:

Exception in thread "main" java.lang.NullPointerExceptionat org.apache.flink.streaming.api.graph.StreamGraph.addOutputSelector(StreamGraph.java:444)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSplit(StreamGraphGenerator.java:267)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:176)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSelect(StreamGraphGenerator.java:282)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:178)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformPartition(StreamGraphGenerator.java:241)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:184)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:527)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1528)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1540)... 2 more

网上查阅发现存在issues,地址:https://issues.apache.org/jira/browse/FLINK-9141
最终改为先splitprocess方法搞定,拓扑图如下:

Flink 1.4.2 版本踩过的坑相关推荐

  1. flink与flink-client的版本对应

    目前flink版本从0.6~1.14.0对应的flink-client版本有 无版本号,2.10,2.11,2.12这四种版本 无版本号对应flink(0.9版本和之前的版本)(一般都不会用到) 2. ...

  2. vue项目中更换tinymce版本踩坑

    项目需求: vue项目中实现多图片批量上传功能 问题: tinymce富文本编辑器的多图片批量上传插件 支持版本:5.0.4+ 项目中现有的富文本编辑器版本:4.9.4 为实现这一功能选择更换tiny ...

  3. Angular教程英雄之旅版本踩坑记录

    Angular教程英雄之旅版本踩坑记录 前言 问题1 问题2 问题3 问题4 总结 前言 这两天心血来潮,跟着Angular官网的教程英雄之旅(https://angular.cn/tutorial) ...

  4. java项目经理也就那么回事_网易PM | 我们之前在需求评审环节踩过的坑...

    原本觉得需求评审也就那么回事儿,大家应该都差不多这么做的,没啥好说的.不过前不久有一位同学问起来我们是怎么做需求评审的,然后发现有一些团队的做法可能还不大一样,他们也还踩着我们之前踩过的坑,他们还在探 ...

  5. Redis 集群部署及踩过的坑

    本文目标 要在单台机器上搭建Redis集群,方式是通过不同的TCP端口启动多个实例,然后组成集群,同时记录在搭建过程中踩过的坑. 安装准备 centos版本:6.7 redis版本:3.2.3 安装方 ...

  6. AWS Device Farm介绍及Appium踩过的坑

    本文记录了在AWS Device Farm上进行Appium TestNG进行手机应用UI自动化测试的流程及遇到的问题,及具体的解决方法.同时记录了使得测试脚本更稳定的一些代码写法. Device F ...

  7. arcgis python 二次开发_我在部署ArcGIS API for Python时踩到的坑

    ArcGIS API for Python相比于其他ESRI产品,还是很年轻.我在部署时踩到了坑,网上也找不到解决方法,很是煞风景,也很打击学习的积极性. 今天回顾一下,做个总结吧.一方面自己备忘,另 ...

  8. git服务器安装位置,Linux服务器安装gitlabe-runner,并部署包到指定目录,还有踩的一些坑~~...

    [TOC] 前言:上篇文章讲解了如何安装一个本地runner,然后用本地runner发布本地包到Linux,但这会有一个问题,在本地runner用scp向Linux发送文件,会造成服务器上的文件越来越 ...

  9. mysql 优惠卷表设计_这些年MySQL表设计踩过的坑!

    本文首发于个人微信公众号<andyqian>,期待你的关注! 前言 有朋友在后台留言.希望我能说说我在数据库表设计时踩过的坑.那么,我们今天就来聊聊我在数据库表设计时踩过的坑,以及现在对数 ...

最新文章

  1. Flowable工作流总结_工作规范流程设计思路
  2. Python元组,列表,解构和循环
  3. GIT创建公钥,并放置远程库
  4. vue 开发中element-ui库的switch开关绑定number类型数据不成功问题 解决方法
  5. require.js使用教程
  6. 迁移cnblog博客
  7. mac电脑安装mysql
  8. 单片机51keil编程流程
  9. Python爬虫自己写项目之:爬取火车站的时刻表和票务信息
  10. 未来计算机2020500,500kV变电站计算机监控系统的实施策略原稿
  11. Android 测试环境噪音分贝
  12. 诛仙服务器 修改技能伤害,《诛仙3》技能调整优化【 技能修改·天脉】
  13. 电子科技大学生物信息学 重点
  14. 微信公众号里面使用定位
  15. scl函数C语言,SCL语言基本语法规则:表达式
  16. 太感动了!从Python入门到入魔
  17. 阿里云Codeup云效使用flow+k8s部署go语言项目
  18. OPC是什么意思?OPC Server 和OPC Client又有什么区别呢?
  19. 20元一支的洗面奶,7天卖了上万,他们是如何做到的?
  20. 图解三次握手与四次挥手

热门文章

  1. 统信UOS专业版安装SecureCRT
  2. 乡村“蔬菜快递”直供饭店
  3. QT zint一维码生成解析
  4. Ghost每个功能详细解释
  5. 【刷题】BZOJ 4827 [Hnoi2017]礼物
  6. 电脑运行缓慢分析处理方案
  7. 【量化投资实训】基于MATLAB实验六.某行业营收增速的统计指标
  8. 手头有五万左右,想做个小生意,有什么值得推荐的?
  9. java毕业生设计学生在线评教系统计算机源码+系统+mysql+调试部署+lw
  10. [ElementUI] 修改默认语言为英文