摘要:本文将演示如果序列化生成avro数据,并使用FlinkSQL进行解析。

本文分享自华为云社区《【技术分享】Apache Avro数据的序列化、反序列&&FlinkSQL解析Avro数据》,作者: 南派三叔。

技术背景

随着互联网高速的发展,云计算、大数据、人工智能AI、物联网等前沿技术已然成为当今时代主流的高新技术,诸如电商网站、人脸识别、无人驾驶、智能家居、智慧城市等等,不仅方面方便了人们的衣食住行,背后更是时时刻刻有大量的数据在经过各种各样的系统平台的采集、清晰、分析,而保证数据的低时延、高吞吐、安全性就显得尤为重要,Apache Avro本身通过Schema的方式序列化后进行二进制传输,一方面保证了数据的高速传输,另一方面保证了数据安全性,avro当前在各个行业的应用越来越广泛,如何对avro数据进行处理解析应用就格外重要,本文将演示如果序列化生成avro数据,并使用FlinkSQL进行解析。

本文是avro解析的demo,当前FlinkSQL仅适用于简单的avro数据解析,复杂嵌套avro数据暂时不支持。

场景介绍

本文主要介绍以下三个重点内容:

  • 如何序列化生成Avro数据
  • 如何反序列化解析Avro数据
  • 如何使用FlinkSQL解析Avro数据

前提条件

  • 了解avro是什么,可参考apache avro官网快速入门指南
  • 了解avro应用场景

操作步骤

1、新建avro maven工程项目,配置pom依赖

pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.huawei.bigdata</groupId><artifactId>avrodemo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.8.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.8.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.6</source><target>1.6</target></configuration></plugin></plugins></build></project>

注意:以上pom文件配置了自动生成类的路径,即${project.basedir}/src/main/avro/和${project.basedir}/src/main/java/,这样配置之后,在执行mvn命令的时候,这个插件就会自动将此目录下的avsc schema生成类文件,并放到后者这个目录下。如果没有生成avro目录,手动创建一下即可。

2、定义schema

使用JSON为Avro定义schema。schema由基本类型(null,boolean, int, long, float, double, bytes 和string)和复杂类型(record, enum, array, map, union, 和fixed)组成。例如,以下定义一个user的schema,在main目录下创建一个avro目录,然后在avro目录下新建文件 user.avsc :

{"namespace": "lancoo.ecbdc.pre","type": "record","name": "User","fields": [{"name": "name", "type": "string"},{"name": "favorite_number",  "type": ["int", "null"]},{"name": "favorite_color", "type": ["string", "null"]}]
}

3、编译schema

点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码

4、序列化

创建TestUser类,用于序列化生成数据

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null// Alternate constructor
User user2 = new User("Ben", 7, "red");// Construct via builder
User user3 = User.newBuilder().setName("Charlie").setFavoriteColor("blue").setFavoriteNumber(null).build();// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

执行序列化程序后,会在项目的同级目录下生成avro数据

user_generic.avro内容如下:

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}

至此avro数据已经生成。

5、反序列化

通过反序列化代码解析avro数据

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {// Reuse user object by passing it to next(). This saves us from// allocating and garbage collecting many objects for files with// many items.user = dataFileReader.next(user);System.out.println(user);
}

执行反序列化代码解析user_generic.avro

avro数据解析成功。

6、将user_generic.avro上传至hdfs路径

hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/

7、配置flinkserver

  • 准备avro jar包

将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar

  • 同时重启FlinkServer实例,重启完成后查看avro包是否被上传
hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8、编写FlinkSQL

CREATE TABLE testHdfs(name String,favorite_number int,favorite_color String
) WITH('connector' = 'filesystem','path' = 'hdfs:///tmp/lztest/user_generic.avro','format' = 'avro'
);CREATE TABLE KafkaTable (name String,favorite_number int,favorite_color String
) WITH ('connector' = 'kafka','topic' = 'testavro','properties.bootstrap.servers' = '96.10.2.1:21005','properties.group.id' = 'testGroup','scan.startup.mode' = 'latest-offset','format' = 'avro'
);
insert intoKafkaTable
select*
fromtestHdfs;

保存提交任务

9、查看对应topic中是否有数据

FlinkSQL解析avro数据成功。

点击关注,第一时间了解华为云新鲜技术~

一文解析Apache Avro数据相关推荐

  1. 一文解析交通大数据是如何解决拥堵问题的

    前言 现在的社会是一个高速发展的社会,科技发达,信息流通,人们之间的交流越来越密切,生活也越来越方便,大数据就是这个高科技时代的产物.随着中国经济的快速发展,汽车行业空前繁荣,私人小汽车拥有量迅速增加 ...

  2. python读取word指定内容_python解析html提取数据,并生成word文档实例解析

    简介 今天试着用ptyhon做了一个抓取网页内容,并生成word文档的功能,功能很简单,做一下记录以备以后用到. 生成word用到了第三方组件python-docx,所以先进行第三方组件的安装.由于w ...

  3. kafka python框架_Python中如何使用Apache Avro——Apache的数据序列化系统

    了解如何创建和使用基于Apache Avro的数据,以实现更好,更有效的传输. 在这篇文章中,我将讨论Apache Avro,这是一种开源数据序列化系统,Spark,Kafka等工具正在使用该工具进行 ...

  4. Apache Avro

    Avro(读音类似于[ævrə])是Hadoop的一个子项目, 由Hadoop的创始人Doug Cutting(也是Lucene,Nutch等项目的创始人,膜拜)牵头开发, Avro是一个数据序列化系 ...

  5. Apache Avro简介,java实现官网翻译

    文章目录 Apache Avro™ Introduction Schemas Comparison with other systems JAVA简单使用 Defining a schema Seri ...

  6. poi 顺序解析word_JavaPOI解析word提取数据到excel

    Java POI解析Word提取数据存储在Excel 一.了解POI POI以前有了解,这次需求是解析word读取其中标题,还有内容赛选获取自己想要的内容 经过两天的学习,开始熟悉Java这么读取wo ...

  7. Apache Avro 与 Thrift 比较

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! Avro ...

  8. Apache Avro 入门

    Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进制数据传输高性能的中间件.在Hadoop的其他项目中例如HBase(Ref)和Hive(Ref)的Clie ...

  9. Apache Avro项目简介

    1.Avro简介 Avro是一种远程过程调用RPC和数据序列化框架,是在Apache的Hadoop项目之内开发的.它使用JSON来定义数据类型和通讯协议,使用压缩二进制格式来序列化数据.它主要用于Ha ...

最新文章

  1. 一个球从100m高度自由落下,第10次反弹多高
  2. SQL Server 中 EXEC 与 SP_EXECUTESQL 的区别
  3. python正则表达式面试_Python面试 Re-正则表达式
  4. altium导出钻孔文件_[Altium Designer 学习]怎样输出Gerber文件和钻孔文件
  5. 白板机器学习笔记 P22-P27 PCA降维
  6. java tcp聊天程序_java实现基于Tcp的socket聊天程序
  7. Linux环境下编程有哪些优势?
  8. 用keepalived配置高可用,监控NGINX服务
  9. BZOJ4066 简单题(KD-Tree)
  10. unity有用资源的导出未package便于在其他工程用的问题解决
  11. 电子产品可靠性测试报告
  12. httppost请求工具类
  13. python给excel排序_数据处理,Excel的排序功能,使用pandas在Python中轻松完成
  14. 魔兽TBC常用WA字符串收集
  15. 西安计算机考证培训学校
  16. 网页加载慢,你知道几种原因?
  17. 随笔-不足与外人道也
  18. 毕业生签三方?报到证?档案户口?
  19. java web 题_javaWeb习题与答案
  20. css3变形 transform中复合写法的注意问题总结

热门文章

  1. CSS常用单词-弹性盒(专业版)
  2. 传记 | 我的大学三年-不以物喜,不以己悲
  3. HTML5 绘制动画
  4. Bootstrap CSS 编码规范之不要使用 @import
  5. es6 export 命令
  6. java dbcursor_优化JAVA查询Mongodb数量过大,查询熟读慢的方法
  7. 查询没有走索引_关于MySQL种的in函数到底走不走索引、我和同事差点大打出手!...
  8. 二分查找算法实现(图解)与实例
  9. redis复制原理和应用
  10. 关于使用rem单位,calc()进行自适应布局