前置博客:

搭建Hadoop3.1.2伪分布方式环境
本博客示例中可能出现的错误及解决方案:
Name node is in safe mode.
Container killed on request. Exit code is 143

简介

在开发之初,Avro就是围绕着完善Hadoop生态系统的数据处理而开展的(使用Avro作为Hadoop MapReduce需要处理数据序列化和反序列化的场景),因此Hadoop MapReduce集成Avro也就是自然而然的事情。

在MapReduce中使用Avro可以提升数据的处理性能,主要是以下几点:

  • 向Job提供数据文件时可以使用Avro序列化过的二进制数据文件
  • 在数据解析方面速度比较快
  • 排序功能

本博客用到的软件的版本:

  • CentOS7.0
  • Hadoop的版本是3.1.2
  • Avro的版本是1.9.1

示例1:单词统计

Hadoop MapReduce读取源文件进行计数统计,然后将计算结果作为Avro格式的数据写到目标文件中。

首先将avro-mapred-1.9.1.jar上传到share/hadoop/mapreduce/目录

第一步:创建Maven项目

  • 添加Maven依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.1</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro-mapred</artifactId><version>1.9.1</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.2</version>
</dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
  • 配置插件
 <plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><configuration><archive><manifest><mainClass>com.hc.WordCountDriver</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin>
</plugins>

第二步:在resources目录中提供log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

第三步:创建WordCountSchema

public class WordCountSchema {public static Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordCountRecord\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}");
}

上面schema的值是我们写的Avro的字符串:

{"type":"record",
"name":"WordCountRecord",
"fields":[{"name":"count","type":"int"}
]
}

第四步:创建WordCountMapper

public class WordCountMapper extends Mapper<Object, Text, AvroKey<String>, AvroValue<GenericRecord>> {private GenericRecord record = new GenericData.Record(WordCountSchema.schema);@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");for (int i = 0; i < words.length; i++) {if (words[i].length() > 0 && Character.isLetter(words[i].charAt(0))) {AvroKey<String> word = new AvroKey<>(words[i]);record.put("count", 1);context.write(word, new AvroValue<>(record));}}}
}

第五步:创建WordCountReducer

public class WordCountReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<String>, AvroValue<Integer>> {@Overrideprotected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {int sum = 0;for (AvroValue<GenericRecord> value : values) {GenericRecord record = value.datum();sum += Integer.parseInt(record.get("count").toString());}context.write(key, new AvroValue<>(sum));}}

第六步:创建WordCountDriver

public class WordCountDriver {public static void main(String[] args) throws Exception {Configuration cfg = new Configuration();//获取配置信息以及封装任务Job job = Job.getInstance(cfg, "WordCountAvro");job.setJarByClass(WordCountDriver.class);//设置jar加载路径job.setMapperClass(WordCountMapper.class);//设置Mapper类,执行map方法job.setReducerClass(WordCountReducer.class);//设置Reducer类,执行reduce方法AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));AvroJob.setMapOutputValueSchema(job,WordCountSchema.schema);AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));//设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));//文件输入路径FileOutputFormat.setOutputPath(job, new Path(args[1]));//文件输出路径boolean res = job.waitForCompletion(true);//提交job并等待结束System.exit(res ? 0 : 1); //退出程序}
}

上面代码使用AvroJob来配置作业,AvroJob类主要用来给输入、map输出以及最后输出数据指定Avro模式。

第七步:运行程序

  1. 项目打包

    将打好的包上传到Linux服务器中,重命名为mrad.jar
  2. 在Linux服务器端启动Hadoop,在HDFS根目录创建input目录,同时在里面放置几篇英文文章作为待测试的数据
  3. 运行1步中上传的jar包

    4.查看程序运行结果:

示例2:通过MapReduce程序找到各个班级年龄最小的学生

第一步:将avro-1.9.1.jar上传到到share/hadoop/mapreduce/目录


否则的话会报错:

第二步:创建Maven项目

  • 添加Maven依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.1</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro-mapred</artifactId><version>1.9.1</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.2</version>
</dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
  • 配置插件
 <plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><configuration><archive><manifest><mainClass>com.hc.StuDriver</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin>
</plugins>
  • 第二步:在resources目录中
  1. 提供log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  1. 提供stu.avsc
{"type": "record","name": "StuRecord","fields": [{"name": "name", "type": "string"},{"name": "age", "type": "int"},{"name": "gender", "type": "string"},{"name": "class", "type": "string"}]
}

第三步:创建StuSchema

public class StuSchema {public static Schema schema ;static{InputStream is = StuSchema.class.getClassLoader().getResourceAsStream("stu.avsc");try {schema = new Parser().parse(is);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {System.out.println(schema);}
}

第四步:创建StuMapper

public class StuMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {private GenericRecord record = new GenericData.Record(StuSchema.schema);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] strings = value.toString().split("\\s+"); //匹配一个或多个空格if (strings.length < 4) {return;}record.put("name",strings[0]);record.put("age",Integer.parseInt(strings[1]));record.put("gender",strings[2]);record.put("class",strings[3]);context.write(new AvroKey<>(strings[3]),new AvroValue<>(record));}
}

Mapper程序主要做的事情就是将存放在txt中的记录解析成一个个的GenericRecord对戏,然后以班级名称为键,record为值传递给Reducer做进一步处理。

第五步:创建StuReducer

public class StuReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {@Overrideprotected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {GenericRecord min = null;for (AvroValue<GenericRecord> item : values) {GenericRecord record = item.datum();if (min == null || (Integer.parseInt(min.get("age").toString()) > Integer.parseInt(record.get("age").toString()))) {min = new GenericData.Record(StuSchema.schema);min.put("name", record.get("name"));min.put("age", record.get("age"));min.put("gender", record.get("gender"));min.put("class", record.get("class"));}}context.write(new AvroKey<>(min), NullWritable.get());}
}

Reducer的逻辑其实是通过循环比较的方式找到每个班级年龄最小的学生。

第六步:创建StuDriver

public class StuDriver {public static void main(String[] args) throws Exception {Configuration cfg = new Configuration();// 可以解决在Hadoop集群中运行时使用的Avro版本和集群中Avro版本不一致的问题。//cfg.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);//获取配置信息以及封装任务Job job = Job.getInstance(cfg, "Stu");job.setJarByClass(StuDriver.class);//设置jar加载路径job.setMapperClass(StuMapper.class);//设置Mapper类,执行map方法job.setReducerClass(StuReducer.class);//设置Reducer类,执行reduce方法AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));;AvroJob.setMapOutputValueSchema(job,StuSchema.schema);AvroJob.setOutputKeySchema(job, StuSchema.schema);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(AvroKeyOutputFormat.class);//设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));//文件输入路径FileOutputFormat.setOutputPath(job, new Path(args[1]));//文件输出路径boolean res = job.waitForCompletion(true);//提交job并等待结束System.exit(res ? 0 : 1); //退出程序}
}

第七步:运行程序

  1. 项目打包
    将打好的包上传到Linux服务器中,重命名为mras.jar

  2. 在Linux服务器端启动Hadoop,在HDFS根目录创建input目录,同时在里面创建文件文件stu.txt,在其中添加测试的数据:

zhangsan    13  male    shiziBan
lisi    14  female  musicBan
wanger  19  male    musicBan
mazi    15  male    shiziBan
qianwu  12  female  wudaoBan
zhaoliu 16  female  shiziBan
lisi    18  male    wudaoBan
xiangming   13  female  shiziBan
wangwei 18  female  wudaoBan
ligang  10  male    musicBan
  1. 运行1步中上传的jar包

    4.查看程序运行结果:

    上传avro-tools-1.9.1.jar到Linux服务器的test目录,


运行命令:

MapReduce整合Avro相关推荐

  1. 大数据分析工程师大纲

    大数据分析工程师大纲 阶段一.业务数据分析师 课程一.数据挖掘/分析师之硬技能 - 必备常用工具使用与高级技巧 本部分内容主要介绍了数据挖掘.分析师.数据产品经理必备的常用工具的,主要有 Excel, ...

  2. HBase 常用操作

    hbase只支持行级事务,不支持多行事务. 进入shell:hbase shell: 配置完分布式zk后: 单启Hmaster:hbase-daemon.sh start master HFile默认 ...

  3. BigBrother的大数据之旅Day 13 hbase(2)

    HBase(2) 详述人员角色表的设计思路以及实现 思路:两个部分的信息分别保存到两张表中,因为hbase是列存储的表,一般存储非关系数据,就像记笔记一样,把关键点写上. 第一张表: 个人信息表 ro ...

  4. Spark性能优化--如何解决数据倾斜

    1 Data Skew 数据倾斜 1.1 数据倾斜概念 对Hive.Spark.Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜是指并行处理的数据集中某一部分的数据显著多 ...

  5. 大数据开发工程师学习路线分享

      大数据是对海量数据存储.计算.统计.分析等一系列处理手段,处理的数据量是TB级,甚至是PB或EB级的数据,是传统数据处理手段无法完成的,大数据涉及分布式计算.高并发处理.高可用处理.集群.实时性计 ...

  6. 山东大学大数据管理与分析知识点总结

    大数据概述 大数据(big data),或称巨量资料,指的是需要新处理模式才能具有更强的决策力.洞察发现力和流程优化能力的海量.高增长率和多样化的信息资产 大数据指不用随机分析法(抽样调查)这样的捷径 ...

  7. 北京哪里大数据培训好?

    大数据是对海量数据存储.计算.统计.分析等一系列处理手段,处理的数据量是TB级,甚至是PB或EB级的数据,是传统数据处理手段无法完成的,大数据涉及分布式计算.高并发处理.高可用处理.集群.实时性计算等 ...

  8. 基于Hadoop技术实现的离线电商分析平台(Flume、Hadoop、Hbase、SpringMVC、highcharts)- 驴妈妈旅游项目

    离线数据分析平台是一种利用hadoop集群开发工具的一种方式,主要作用是帮助公司对网站的应用有一个比较好的了解.尤其是在电商.旅游.银行.证券.游戏等领域有非常广泛,因为这些领域对数据和用户的特性把握 ...

  9. HBase vs Cassandra: 我们迁移系统的原因

    HBase vs Cassandra: 我们迁移系统的原因 HBase vs Cassandra: 我们迁移系统的原因 » 我有分寸 HBase vs Cassandra: 我们迁移系统的原因 Mar ...

最新文章

  1. asp.net 在 Ngnix 服务器 中配置攻略
  2. 【阿里云API】 阿里云API调用的若干说明
  3. vue实现时间选择器,精确到秒
  4. windows 常用快捷键
  5. 一种M2M业务的架构及实现M2M业务的方法
  6. 软件测试b s环境如何配置,B/S架构测试环境搭建_DB2篇(Win32系统)
  7. 买彩票中奖的概率是多少?
  8. ur3 aubo movit机械臂运动
  9. 《libGDX移动游戏开发从入门到精通》一1.5 其他的一些准备工作
  10. Unity3D方向键控制人物移动的代码
  11. 基于微信小程序的家教信息管理系统毕业设计源码
  12. 【Scratch】青少年蓝桥杯_每日一题_11.01_画笔功能
  13. 高性能服务器龙头,中国服务器的龙头企业:在全球市场中排第三,超越了华为联想...
  14. 【国家局发布】医疗器械注册流程及相关法规大全
  15. winSocket第一步WSAStartup
  16. fpm工作流程(转)--写的很完整很明白
  17. 时间字符串解析(格式问题如2020-04-25T07:00:00+00:00)
  18. 帆软初级证书 第三部分 FineBI 答案
  19. 涂鸦智能校招——前端
  20. 在linux系统命令行模式下如何输入中文

热门文章

  1. 【BMC Plant Biol】MYB转录因子基因HtMYB2调控菊芋花青素生物合成
  2. PySCENIC(四)个性化分析:pyscenic结果之差异转录因子分析及其他可视化
  3. java中print换行符_如何在java中打印带换行符的字符串
  4. JS实现-DIV自动居中代码
  5. apple耳机与nokia耳机内部电路
  6. markdown 对字体大小进行调整
  7. 判断整数序列是否是二叉查找树的后序遍历
  8. minix 文件系统学习
  9. 苏州企业申报高新技术企业的工作流程
  10. 面向对象的案例表格的排序