背景

Flink版本-1.11.0
Flink-CDC版本- 1.1.0

问题集合

1. 使用flink sql 时,需要引入flink-json依赖

异常信息

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)

解决方案: pom文件中引入

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.11.0</version>
</dependency>
2. Flink 1.11版本后简化了 connector 参数

Flink由于发展太快(2020年已经发布了1.10,1.11,1.12三个大版本),很多2020年初的blog提供教程已经面临失效

以 Kafka 为例,在 1.11 版本之前用户的 DDL 需要声明成如下方式

 CREATE TABLE user_behavior (...
) WITH ('connector.type'='kafka','connector.version'='universal','connector.topic'='user_behavior','connector.startup-mode'='earliest-offset','connector.properties.zookeeper.connect'='localhost:2181','connector.properties.bootstrap.servers'='localhost:9092','format.type'='json'
);

而在 Flink SQL 1.11 中则简化为

CREATE TABLE user_behavior (...
) WITH ('connector'='kafka','topic'='user_behavior','scan.startup.mode'='earliest-offset','properties.zookeeper.connect'='localhost:2181','properties.bootstrap.servers'='localhost:9092','format'='json'
);

详细变更见FLIP-122

3. flink-sql-connector-jdbc声明表时,必须指定主键

否则会报异常:

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.validatePrimaryKey(JdbcDynamicTableSink.java:72)at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.getChangelogMode(JdbcDynamicTableSink.java:63)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:120)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)at scala.collection.Iterator$class.foreach(Iterator.scala:891)at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)at scala.collection.immutable.Range.foreach(Range.scala:160)at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)at scala.collection.Iterator$class.foreach(Iterator.scala:891)at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)···
4. 使用flink-mysql-cdc时,请注意检查线上数据库binlog-format属性,另外要给用户授权
SET GLOBAL binlog_format = 'ROW';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
5. 接收到MIXED或STATEMENT格式日志退出

虽然你可能将Mysql的binlog日志格式改为row,但是仍然可能存在之前的session或者有用户手动修改并提交mixed或者statement格式的日志,这会导致cdc组件异常并退出。

flink-mysql-cdc并没有直接关于此情况设置,但是其引用的debezium组件,在1.3版本(虽然官方文档在1.2版本也有相关属性,但是看其源码并不支持)开始支持忽略解析错误的语句
可以通过添加属性配置,来跳过。但是也可能带来丢失数据的风险。

debezium文档

'debezium.event.processing.failure.handling.mode' = 'skip',
'debezium.inconsistent.schema.handling.mode' = 'skip',
'debezium.database.history.skip.unparseable.ddl' = 'true'
6. 目标表要注意清除外键依赖

同步数据时,很多公司都会直接同步原始表的所有字段作为数仓ods层或者dim层,并使用mysql存储,不做任何处理,只有在流表与维表join的时候才会读取.

此时可能从业务数据库导出sql并导入数仓的mysql,外键依赖也可能会导入。

那么需要注意去掉其外键依赖,否则会在同步时发生异常。

Flink CDC踩坑集合相关推荐

  1. 阿里巴巴大规模应用Flink的踩坑经验:如何大幅降低 HDFS 压力?

    简介: 众所周知 Flink 是当前广泛使用的计算引擎,Flink 使用 checkpoint 机制进行容错处理[1],Flink 的 checkpoint 会将状态快照备份到分布式存储系统,供后续恢 ...

  2. 小米项目实习踩坑集合

    2020.2.21    隐私合规    vue 问题一:按钮为button,点击出现蓝色边框,与设计稿不符合.如下: 原因:浏览器默认设置的button样式 解决: button:focus{out ...

  3. fibos开发踩坑集合

    fibos.js API资料: 与eosjs相比,fibos.js没有添加新功能,可以在eosjs项目页面https://developers.eos.io/eosio-nodeos/referenc ...

  4. uni-app踩坑集合(小程序)

    1.指定页面不显示返回或首页按钮 onShow(){uni.h } 2.uni-app 开发微信小程序长按识别公众号 ![在这里插入图片描述](https://img-blog.csdnimg.cn/ ...

  5. 东八区转为0时区_踩坑记 | Flink 天级别窗口中存在的时区问题

    ❝ 本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题.本文介绍 Flink 时间以及时区问题,分析了在天级别的窗口时会遇到的 ...

  6. 【Flink】实时归因场景踩坑

    关注交流微信公众号:小满锅 背景 前几天思考了一个问题,在很多业务场景下,需要关注流量的来源或是某个业务哪个入口的流量最大,带来的效益最多,那么就涉及到流量的归因了.比如说,我是一个bilibili ...

  7. Flink CDC 2.0 正式发布,详解核心改进

    简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...

  8. html2canvas图片的文字偏移,html2canvas在Vue项目踩坑-生成图片偏移不完整

    背景 最近做一个Vue项目需求是用户长按保存图片,页面的数据是根据不同id动态生成的,页面渲染完生成内容图片让用户长按保存的时候,把整个页面都保存起来. 在项目遇到的坑是图片能生成,可是生成的图片总是 ...

  9. java调用clang编译的so_写Java这么久,JDK源码编译过没?编译JDK源码踩坑纪实

    好奇害死羊 很多小伙伴们做Java开发,天天写Java代码,肯定离不开Java基础环境:JDK,毕竟我们写好的Java代码也是跑在JVM虚拟机上. 一般来说,我们学Java之前,第一步就是安装JDK环 ...

最新文章

  1. java 客户端发起http请求2
  2. 【设计模式】—— 模板方法Template
  3. 【Android 电量优化】电量优化 ( 充电状态获取 | 主动获取充电状态 | 广播接受者监听充电状态 | 被动获取充电状态 | 注册空广播接受者获取历史广播 )
  4. 初学Web前端开发,你需要掌握的11项技能
  5. jstack 脚本 自动日志_GitLab从安装到全自动化备份一条龙
  6. Linux 软件安装到 /usr,/usr/local/ 还是 /opt 目录?
  7. 以snull为例分析linux网卡驱动的技术文档[转载]二
  8. debian adsl上网
  9. Windows xp 定时关机命令 [转贴]
  10. 计算机组成原理GLK,计算机组成原理课件第一讲.ppt
  11. 使用for循环同时便利两个列表
  12. EXCEL转PDF,JACOB,生成checkbox
  13. 低价神话缔造者!宏碁A500最全面评测
  14. 【好记性不如烂笔头】Spring框架内容问答的形式回忆-长期更新
  15. watershed(分水岭算法)
  16. MAC 反编译安卓问题汇总
  17. 51单片机流水灯LED
  18. php类图怎么画,类图怎么画?
  19. Skew数(二进制数)-C语言
  20. 微信小程序资料集(下)

热门文章

  1. Appium+Java实现对安卓APK的自动化测试(环境搭建、脚本编写、错误问题、解决方法)
  2. M100——Onboard SDK编译出错
  3. 【牛客前端刷题】JS拔高篇
  4. linux 7z打包排除特殊文件夹,7-zip的压缩的时候排除某目录
  5. 店宝宝:电商的下一个十年,是什么局面
  6. 剑指Offer——“你最大的缺点是什么”回答技巧及范例
  7. Real-Time Rendering——5.5.1 Blending Order 混合顺序
  8. 小程序脚本语言WXS详解
  9. matlab画三维向量的曲面,Matlab绘制三维曲面(以二维高斯函数为例)
  10. 满屏飞舞的心HTML动画,使用snowfall.jquery.js实现爱心满屏飞的效果