又过了非常平淡的一周,这周聊一聊技术细节的问题,俗话说的好,细节是魔鬼,作为一个程序员,不注重技术细节会让你经常掉到坑里还不知道怎么掉进去的,太注重技术细节又会让你陷入困境无法自拔,轻则浪费掉时间,重则影响身心健康、发量锐减。接下来让我们看看本人这周遇到的几个技术细节问题以及解决方案。

  • 对象序列化与反序列化用到的类版本有差异导致反序列化失败
    先介绍一下背景,有一个运行flink sql向HDFS写入orc文件的需求,于是通过flink sql client(或flink-sql-gateway服务)向flink yarn-session提交sql任务时加入了flink-sql-orc_2.11-*.jar依赖,用来支持orc format,在执行任务被提交到yarn-session后报异常:
Caused by: java.io.InvalidClassException: org.apache.orc.TypeDescription; local class incompatible: stream classdesc serialVersionUID = -6272421633916359804, local class serialVersionUID = 8355068668343089184

咋一看这个异常感觉有点慌,从表面上看这个异常的原因是序列化时类org.apache.orc.TypeDescription的SerialVersionUID与反序列化时不一致,仔细分析你会发现其实是序列化时用的类与反序列化时用的类版本不一致。循着这个思路我就去查是不是在发布任务的依赖中存在多个版本的org.apache.orc.TypeDescription类呢?结果是肯定的,为了支持hive catalog的功能,在flink sql client的依赖中有flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar包,在这个包中同样存在org.apache.orc.TypeDescription类,至此,真正的原因找到了。具体是这样的:

  1. 提交SQL任务时,客户端会解析SQL并转为StreamGraph,org.apache.orc.TypeDescription对象的序列化就发生在这个时候,用的是flink-sql-connector-hive-*.jar中的类。
  2. 随后,StreamGraph进一步转为JobGraph后向Yarn-session发布,JobManager收到请求后会反序列化org.apache.orc.TypeDescription对象,用的是flink-sql-orc-.jar中的类,这里只有一个选择,因为向yarn-session发布时并没有带flink-sql-connector-hive-.jar

那么解决该问题的关键就是让序列化和反序列化时用同一个jar包中的org.apache.orc.TypeDescription类,flink-sql-connector-hive-.jar只是在解析SQL(具体只是用到了其中的catalog)时用到,在转为StreamGraph时并没有用到,于是我们专门实例化了一个URLClassLoader,该ClassLoader不包含flink-sql-connector-hive-.jar,然后在该ClassLoader的上下文环境中转StreamGraph,成功解决了该问题。

pipeline = executionContext.wrapDeployClassLoader(() -> executionContext.createPipeline(jobParams.get("jobName")));

这里的wrapDeployClassLoader方法就是不包含link-sql-connector-hive-*.jar的ClassLoader。

  • 多线程访问同一可变对象引发的问题
    在flink-sql-gateway中有这样一段代码:
private ExecutionContext(Environment environment,Environment originalEnvironment,@Nullable SessionState sessionState,List<URL> dependencies,Configuration flinkConfig,ClusterClientServiceLoader clusterClientServiceLoader,Options commandLineOptions,List<CustomCommandLine> availableCommandLines) throws FlinkException {this.environment = environment;this.originalEnvironment = originalEnvironment;// 1.初始化flinkConfigthis.flinkConfig = flinkConfig;// 此处省略部分代码LOG.debug("Deployment descriptor: {}", environment.getDeployment());final CommandLine commandLine = createCommandLine(environment.getDeployment(),commandLineOptions);// 2.修改flinkConfigthis.flinkConfig.addAll(createExecutionConfig(commandLine,commandLineOptions,availableCommandLines,dependencies));final ClusterClientServiceLoader serviceLoader = checkNotNull(clusterClientServiceLoader);clusterClientFactory = serviceLoader.getClusterClientFactory(this.flinkConfig);checkState(clusterClientFactory != null);clusterId = clusterClientFactory.getClusterId(this.flinkConfig);clusterSpec = clusterClientFactory.getClusterSpecification(this.flinkConfig);}

ExecutionContext对象初始化时都会传一个flinkConfig对象来初始化自身的flinkConfig属性(注释1),随后会修改this.flinkConfig(注释2),而传入的flinkConfig对象是服务中公共单实例对象defaultContext的一个属性,也就是所有ExecutionContext对象的flinkConfig属性其实指向了同一个对象,换句话说一个对象修改了它的flinkConfig属性也就影响了其他ExecutionContext对象的flinkConfig属性,出现了意想不到的执行结果,这就是多线程下共享可变对象的从而导致结果不一致的一个例子,是不是看到了魔鬼的一面,很可怕,但也比较好解决,每个ExecutionContext对象单独有自己的flinkConfig变量就好了,将注释1的地方改一下:

this.flinkConfig = flinkConfig.clone();
  • flink on yarn-session 应用受到Hadoop集群环境影响
    这个问题跟第一个问题有关,本来打算写到hdfs的数据格式为orc,结果因为2.1.1版本的hive读取该orc数据时有问题(可参考Flink-orc 与hive 2.1.1兼容性问题),决定将格式改为parquet,任务发布后执行时报如下错误:
java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)Vat org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.<init>(ParquetRowDataBuilder.java:60)at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:142)at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory.create(FileSystemTableSink.java:493)at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)at StreamExecCalc$332.processElement(Unknown Source)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)at StreamExecCalc$292.processElement(Unknown Source)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

发生这个异常的原因是执行时没有找到 org.apache.parquet.hadoop.ParquetWriter B u i l d e r . < i n i t > ( L o r g / a p a c h e / p a r q u e t / i o / O u t p u t F i l e ) 构 造 方 法 , 查 了 一 下 发 现 o r g . a p a c h e . p a r q u e t . h a d o o p . P a r q u e t W r i t e r Builder.<init>(Lorg/apache/parquet/io/OutputFile)构造方法,查了一下发现org.apache.parquet.hadoop.ParquetWriter Builder.<init>(Lorg/apache/parquet/io/OutputFile)构造方法,查了一下发现org.apache.parquet.hadoop.ParquetWriterBuilder类来自于parquet-hadoop-.jar包,版本是1.10.0,parquet-hadoop-.jar被flink-sql-parquet*.jar包依赖,用来支持flink parquet format。CDH版本的Hadoop集群中也存在parquet-hadoop-.jar包,版本是1.9.0-cdh6.2.1,看了一下确实没有带org.apache.parquet.io.OutputFile参数的构造函数,于是问题基本明确了:程序在yarn集群上执行时依赖了Hadoop集群的.9.0-cdh6.2.1版本的parquet-hadoop-.jar,该版本包中org.apache.parquet.hadoop.ParquetWriter$Builder类没有带org.apache.parquet.io.OutputFile参数的构造函数,从而发生了异常。

该怎么解决呢?就算发布应用时带上1.10.0版本的parquet-hadoop包执行时也可能不去加载而是继续用Hadoop集群的,其实flink在开发时为了避免与Hadoop生态中的依赖发生冲突已经给出了方案,那就是针对Hadoop生态相关依赖单独打一个修改了package路径的shaded包,开发时依赖该shaded包。

照猫画虎,首先用maven-shade-plugin插件将parquent-hadoop相关的包路径从org.apache.parquet改为shaded.org.apache.parquet,打成新的依赖包flink-shaded-parquet-hadoop-1.10.0.jar;然后,将flink-formats/flink-parquet 模块代码中import org.apache.parquet.改为import shaded.org.apache.parquet.;最后发布时依赖带上flink-shaded-parquet-hadoop-1.10.0.jar及新改的flink-parquet format包即可。

至此,问题解决。

大致梳理一下这几个问题解决的思路,首先是要从问题表面分析造成问题或异常的原因,一般来说是先通过现象思考造成问题的可能原因,然后通过一些手段(比如打日志、查看源码)去验证或适当Google做进一步分析,如此反复, 这个过程会比较消耗脑力,但是非常有必要,找到原因后会非常的有成就感,最忌讳的是看到异常因为慌乱而像无头苍蝇一样乱撞,不停地百度去看一些别人可能因为别的原因得出的一些片面结论而误入歧途;其次是通过原因指定解决方案,一般来说可能有一种或多种解决办法,用最简单的那个,如果实在难以解决,比如因为系统间存在版本兼容问题而又一时难以快速升级版本,那就躺平,寻求机会再解决,一点都不丢人。这里聊一个题外话,绝大多数程序员都有一个执念,就是遇到一个问题不管能不能解决我一定要凭自己的一己之力解决,以此来证明技术自己技术实力多么牛x,从而实现自己的价值,但你不知道的是很多问题其实不是技术问题,或者说技术实力的本身并不能用解决这样的技术细节问题而证明,能体系化的解决问题的能力才是真的牛。

完结。

文章中flink-sql-gateway代码来自flink-sql-gateway

周记2021-01-29之细节是魔鬼相关推荐

  1. 2021.01.29小型计算器

    2021.01.29小型计算器 题目描述 模拟程序型计算器,依次输入指令,可能包含的指令有: 数字:'NUM X',X为一个只包含大写字母和数字的字符串,表示一个当前进制的数 运算指令:'ADD',' ...

  2. 2021.01.29 Visual QA论文阅读

    目录 [2014][NIPS] A Multi-World Approach to Question Answering about Real-World Scenes based on Uncert ...

  3. 2021/01/29思维导图(3)

  4. 史上最详细微信小程序授权登录与后端SprIngBoot交互操作说明,附源代码,有疑惑大家可以直接留言,蟹蟹 2021.11.29完善更新小程序代码,

    2021.11.29 更新文章 你好,我是博主宁在春,一起学习吧!!! 写这篇文章的原因,主要是因为最近在写毕业设计,用到了小程序,这中间曲曲折折,一言难尽啊.毕业设计真的让人麻脑阔

  5. A. [2021.1.29多校省选模拟11]最大公约数(杜教筛/数论)

    A. [2021.1.29多校省选模拟11]最大公约数 这是一个杜教筛的经典题目,最后我们只需要筛一下1∗xμ(x)1*x\mu(x)1∗xμ(x)这个函数的前缀和即可,然后看到有111这个函数,我们 ...

  6. 洛谷 刷题 深基部分题解(python版)-2022.01.29

    P5703 [深基2.例5]苹果采购(python3实现) https://blog.csdn.net/dllglvzhenfeng/article/details/122690555 P5703 [ ...

  7. SCI论文投稿前必须检查的29个细节

    论文投稿前必须检查的29个细节 1.拼写检查? 主要检查是否有写错的单词.用错的语句以及标书不合适的地方. 2.多次修改? 一般情况下,论文从写成初稿到最终投稿,至少要修改三遍. 3.不要反复的检查, ...

  8. PowerBI视觉对象共计271组,2021.01.20日更新

    PowerBI视觉对象共计271组,2021.01.20日更新 内容包含导入文件和图标.预览图.文件名一致,在预览图内找到合适的可以直接在视觉对象文件夹搜索 下载地址:点击下载 超便宜 或者复制链接打 ...

  9. Leetcode刷题 2021.01.22

    Leetcode刷题 2021.01.22 Leetcode1042 不邻接植花 Leetcode1010 总持续时间可被 60 整除的歌曲 Leetcode1091 二进制矩阵中的最短路径 Leet ...

最新文章

  1. 记录android老项目studio升级3.1+后重新配置gradle踩过的坑
  2. 矩阵特征值与行列式、迹的关系
  3. 让XP远程桌面支持多用户
  4. 工作134:custom组件
  5. slic3r prusaslicer编译
  6. shell脚本的逻辑判断
  7. 惠普HP Designjet Z5200 PostScript 打印机驱动
  8. maya中实时置换效果 dx11shader
  9. Chapter 2 (Matrix Algebra): Partitioned matrices (分块矩阵)
  10. 微信公众号 第三方登录 获取微信用户信息(java版)
  11. WorldPress中文乱码问题
  12. Android开发之so文件使用方法详解
  13. AUTOCAD——快速提取边界线、CAD绘制单双开门
  14. 解决three.js渲染gltf 模型与gltfViewer网站效果不一致问题 krpano发黑问题 three.js gltf模型渲染发黑问题
  15. 美妆短视频的定位分享,教程、技巧都可做,还可尝试变装内容
  16. 如何解决Excel文档已损坏呢?
  17. logo制作软件有哪些?这些好用的logo制作软件别错过。​
  18. 淘宝网店如何提升无线端宝贝权重,抢占更多无线流量?
  19. Selenium库实现推特爬虫
  20. 计算机桌面文件夹删除如何找回,被误删(永久删除、彻底删除)的文件如何找回?...

热门文章

  1. 3A认证系统 环境搭建,freeradius + mysql + daloradius 时间:2018.2.2
  2. 部分地图瓦片数据源整理
  3. 超星python程序设计答案_超星Python程序设计免费答案
  4. IceE-1.3.0 arm-linux 的移植
  5. 利用ffmpeg+QSV进行视频处理
  6. 一个k8s pod报错端口被占用的问题
  7. Java(标识符和关键字)
  8. Flask 数据库操作出现的低级错误
  9. 网络技能大赛做云平台部分-赛前注意事项[非常重要]
  10. C++树状数组模板题 敌兵布阵解题报告