Flink解析kafka canal未压平数据为message报错
canal使用非flatmessage方式获取mysql bin log日志发至kafka比直接发送json效率要高很多,数据发到kafka后需要实时解析为json,这里可以使用strom或者flink,公司本来就是使用strom解析,但是在吞吐量上有瓶颈,优化空间不大。所以试一试通过flink来做。
非flatmessage需要使用特定的反序列化方式来处理为Message对象,所以这里需要自定义一个类
1 /** 2 * 反序列化canal binlog 3 * 4 * @author @ 2019-02-20 5 * @version 1.0.0 6 */ 7 @PublicEvolving 8 public class MessageDeserializationSchema implements KeyedDeserializationSchema<Message> { 9 10 private static final long serialVersionUID = -678988040385271953L; 11 private MessageDeserializer mesDesc; 12 13 @Override 14 public Message deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { 15 try { 16 if (mesDesc == null) { 17 mesDesc = new MessageDeserializer(); 18 } 19 Message result = mesDesc.deserialize(topic, message); 20 //result.setMetaData(topic, partition, offset); 21 return result; 22 } catch (Exception e) { 23 System.out.println(e); 24 } 25 return null; 26 } 27 28 @Override 29 public boolean isEndOfStream(Message nextElement) { 30 return false; 31 } 32 33 @Override 34 public TypeInformation<Message> getProducedType() { 35 return getForClass(Message.class); 36 } 37 }
然后就可以获取到DataStream[Message],但是在做算子操作的时候就报错了,意思是不支持kryo序列化
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: props_ (com.alibaba.otter.canal.protocol.CanalEntry$Header) header_ (com.alibaba.otter.canal.protocol.CanalEntry$Entry) entries (com.alibaba.otter.canal.protocol.Message)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.UnsupportedOperationExceptionat java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)... 29 more
参考官方文档,需要注册类的序列化方式:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html
//message 不支持kryo序列化 不然在map flatmap的时候报错
env.getConfig.addDefaultKryoSerializer(classOf[Message], classOf[StringSerializer])
如果在算子之间会有其他对象传输的话,也同样需要注册。最后通过测试,flink解析的量大概在单个solt 1W+/s 左右。
转载于:https://www.cnblogs.com/createweb/p/10580281.html
Flink解析kafka canal未压平数据为message报错相关推荐
- kafka发送与接收数据(含奇葩报错解决方案)
首先说明一下,小白在学习这一块的时候,记得导入包的时候要看清包名,网上有一些博客没有具体导入的包名称,可能会导致在本机上导包后出现各种各样奇葩的强制类型转化. kafka的相关内容与概念在这里就不再赘 ...
- 【kafka】kafka 消费数据的时候 报错 (Re-) join group
文章目录 1.场景1 1.1 概述 2.场景2 3.场景3 1.场景1 1.1 概述 kafka 消费数据的时候 报错 如下 2.场景2 spirng-kafka的多consumer问题困扰了我好久, ...
- Kafka启动成功且运行程序无报错,无法消费数据,即外网无法连接Kafka的消费者或生产者
Kafka启动成功且运行程序无报错,无法消费数据,即外网无法连接Kafka的消费者或生产者 sparkStreaming消费kafka中的数据,得不到数据以及无报错信息,找错误如下 首先检查一下,Ka ...
- mysql修改路径报错_mysql修改数据存储路径报错处理
Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' 解决?: >>> vim ...
- 【记录】IDEA未正确关闭导致打开报错,进不了主界面,含解决办法
[记录]IDEA未正确关闭导致打开报错,进不了主界面,含解决办法 错误提示 解决方案 参考 错误提示 截取了错误的主要部分 java.util.concurrent.CompletionExcepti ...
- vue-cli中mock本地json数据踩雷:报错404 (GET http://localhost:8080/goods 404 (Not Found) )
标题vue-cli中mock本地json数据踩雷:报错404 运用vue的脚手架进行了项目的搭建,想要通过json文件模拟后台传输数据,结果一直报错:404,经过不断的查找资料和调试终于找到问题的所在 ...
- Excel单元格数据超过32767报错问题处理
Excel单元格数据超过32767报错问题处理 EasyExcel描述 EasyExcel是一款基于Java的简单.省内存的读写Excel的开源项目.官网.使用起来确实比较方便,但是对于一些比较复杂的 ...
- EasyExcel单元格数据超过32767报错问题处理
EasyExcel单元格数据超过32767报错问题处理 EasyExcel描述 EasyExcel是一款基于Java的简单.省内存的读写Excel的开源项目.官网.使用起来确实比较方便,但是对于一些比 ...
- 关于PostgreSQL 插入数据时经常报错:no partition of relation found for row “tableName“ found for row (exec14 10
关于PostgreSQL 插入数据时经常报错: Caused by: com.seaboxsql.util.PSQLException: ERROR: no partition of relation ...
最新文章
- ubuntu18.04安装python虚拟环境:virtualenv 【亲测有效】
- 积微论坛报告视频+PPT:用微生物组时序数据重现生物膜装配动态过程
- 用-force –opengl 指令_苹果新系统ios14新功能汇总 轻点背面等小技巧怎么用
- html5块元素怎么理解,HTML5-块级元素
- HDU多校3 - 6975 Forgiving Matching(多项式匹配字符串)
- python手机自动化截图_python UI自动化截图对比
- 测试框架之GTest
- HDU 3555 Bomb(数位DP模板啊两种形式)
- android gpu linux,Arm发布针对Mali GPU的Android Linux Vulkan用户空间驱动(HiKey 960,Firefly-RK3288主板)...
- db2 删除索引_程序员必须了解的知识点——你搞懂mysql索引机制了吗?
- c ++结构体构造函数_C ++中的构造函数
- 常见的Java基础的面试题
- DOM技术对xml增删改查后更新源文件异常报错
- 多维数组的本质和指针数组
- 机会难得 | 这家上市公司终于招人了
- RSA 非对称加密之 PKCS8 格式秘钥
- win10 更新计算机时间,win10更新时间太长怎么回事_windows10更新时间太久解决教程...
- Android Mms短信的发送流程,短信发送源码解析
- php redis 操作
- 华为nova2s用哪个型号服务器,华为Nova2s和Nova2买哪个好/区别大吗?华为Nova2s与Nova2的区别对比详解...