十、MapReduce综合实战

              综合实战:环境大数据

案列目的
1.学会分析环境数据文件;

2.学会编写解析环境数据文件并进行统计的代码;

3.学会进行递归MapReduce。

案例要求
要求实验结束时,每位学生均已在master服务器上运行从北京2016年1月到6月这半年间的历史天气和空气质量数据文件中分析出的环境统计结果,包含月平均气温、空气质量分布情况等。

实现原理

近年来,由于雾霾问题的持续发酵,越来越多的人开始关注城市相关的环境数据,包括空气质量数据、天气数据等等。
如果每小时记录一次城市的天气实况和空气质量实况信息,则每个城市每天都会产生24条环境数据,全国所有2500多个城市如果均如此进行记录,那每天产生的数据量将达到6万多条,每年则会产生2190万条记录,已经可以称得上环境大数据。
对于这些原始监测数据,我们可以根据时间的维度来进行统计,从而得出与该城市相关的日度及月度平均气温、空气质量优良及污染天数等等,从而为研究空气污染物扩散条件提供有力的数据支持。
本实验中选取了北京2016年1月到6月这半年间的每小时天气和空气质量数据(未取到数据的字段填充“N/A”),利用MapReduce来统计月度平均气温和半年内空气质量为优、良、轻度污染、中度污染、重度污染和严重污染的天数。

实验数据如下

第一题:编写月平均气温统计程序

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class TmpStat
{public static class StatMapper extends Mapper<Object, Text, Text, IntWritable>{private IntWritable intValue = new IntWritable();private Text dateKey = new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException{String[] items = value.toString().split(",");String date = items[0];String tmp = items[5];if(!"DATE".equals(date) && !"N/A".equals(tmp)){//排除第一行说明以及未取到数据的行dateKey.set(date.substring(0, 6));intValue.set(Integer.parseInt(tmp));context.write(dateKey, intValue);}}}public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException{int tmp_sum = 0;int count = 0;for(IntWritable val : values){tmp_sum += val.get();count++;}int tmp_avg = tmp_sum/count;result.set(tmp_avg);context.write(key, result);}}public static void main(String args[])throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();Job job = new Job(conf, "MonthlyAvgTmpStat");job.setInputFormatClass(TextInputFormat.class);TextInputFormat.setInputPaths(job, args[0]);job.setJarByClass(TmpStat.class);job.setMapperClass(StatMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setPartitionerClass(HashPartitioner.class);job.setReducerClass(StatReducer.class);job.setNumReduceTasks(Integer.parseInt(args[2]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

运行结果:

第二题:编写每日空气质量统计程序

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class AqiStatDaily
{public static class StatMapper extends Mapper<Object, Text, Text, IntWritable>{private IntWritable intValue = new IntWritable();private Text dateKey = new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException{String[] items = value.toString().split(",");String date = items[0];String aqi = items[6];if(!"DATE".equals(date) && !"N/A".equals(aqi)){dateKey.set(date);intValue.set(Integer.parseInt(aqi));context.write(dateKey, intValue);}}}public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException{int aqi_sum = 0;int count = 0;for(IntWritable val : values){aqi_sum += val.get();count++;}int aqi_avg = aqi_sum/count;result.set(aqi_avg);context.write(key, result);}}public static void main(String args[])throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();Job job = new Job(conf, "AqiStatDaily");job.setInputFormatClass(TextInputFormat.class);TextInputFormat.setInputPaths(job, args[0]);job.setJarByClass(AqiStatDaily.class);job.setMapperClass(StatMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setPartitionerClass(HashPartitioner.class);job.setReducerClass(StatReducer.class);job.setNumReduceTasks(Integer.parseInt(args[2]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

运行结果:

第三题: 编写各空气质量天数统计程序

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class AqiStat
{public static final String GOOD = "优";public static final String MODERATE = "良";public static final String LIGHTLY_POLLUTED = "轻度污染";public static final String MODERATELY_POLLUTED = "中度污染";public static final String HEAVILY_POLLUTED = "重度污染";public static final String SEVERELY_POLLUTED = "严重污染";public static class StatMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text cond = new Text();// map方法,根据AQI值,将对应空气质量的天数加1public void map(Object key, Text value, Context context)throws IOException, InterruptedException{String[] items = value.toString().split("\t");int aqi = Integer.parseInt(items[1]);if(aqi <= 50){// 优cond.set(GOOD);}else if(aqi <= 100){// 良cond.set(MODERATE);}else if(aqi <= 150){// 轻度污染cond.set(LIGHTLY_POLLUTED);}else if(aqi <= 200){// 中度污染cond.set(MODERATELY_POLLUTED);}else if(aqi <= 300){// 重度污染cond.set(HEAVILY_POLLUTED);}else{// 严重污染cond.set(SEVERELY_POLLUTED);}context.write(cond, one);}}// 定义reduce类,对相同的空气质量状况,把它们<K,VList>中VList值全部相加public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException{int sum = 0;for (IntWritable val : values){sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String args[])throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();Job job = new Job(conf, "AqiStat");job.setInputFormatClass(TextInputFormat.class);TextInputFormat.setInputPaths(job, args[0]);job.setJarByClass(AqiStat.class);job.setMapperClass(StatMapper.class);job.setCombinerClass(StatReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setPartitionerClass(HashPartitioner.class);job.setReducerClass(StatReducer.class);job.setNumReduceTasks(Integer.parseInt(args[2]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

运行结果:

有什么不懂的,多多问问博主呦!!!

MapReduce数据分析(10)综合实战相关推荐

  1. 《Python数据分析与挖掘实战》第10章(下)——DNN2 筛选得“候选洗浴事件”3 构建模型

    本文是基于<Python数据分析与挖掘实战>的实战部分的第10章的数据--<家用电器用户行为分析与事件识别> 做的分析. 接着前一篇文章的内容,本篇博文重点是处理用水事件中的属 ...

  2. 《Python数据分析与挖掘实战》示例源码免费下载

    <Python数据分析与挖掘实战>​ 在当今大数据驱动的时代,要想从事机器学习.人工智能.数据挖掘等前沿技术,离不开数据跟踪与分析,通过NumPy.Pandas等进行数据科学计算,通过Se ...

  3. 23神经网络 :唐宇迪《python数据分析与机器学习实战》学习笔记

    唐宇迪<python数据分析与机器学习实战>学习笔记 23神经网络 1.初识神经网络 百度深度学习研究院的图,当数据规模较小时差异较小,但当数据规模较大时深度学习算法的效率明显增加,目前大 ...

  4. 小白都能学会的Python基础 第六讲:综合实战2 - 大数据分词与词云图绘制

    1.华小智系列 - Python基础(案例版) <Python基础>目录 第六讲:综合实战2 - 大数据分词与词云图绘制 1.大数据分词技巧 2.词频统计技巧 3.词云图绘制 4.微博词云 ...

  5. 数据分析与挖掘实战-应用系统负载分析与磁盘容量预测

    应用系统负载分析与磁盘容量预测 背景 某大型企业为了信息化发展的需要,建设了办公自动化系统.人力资源管理系统.财务管理系统.企业信息门户系统等几大企业级应用系统.因应用系统在日常运行时,会对底层软硬件 ...

  6. 工业数据分析技术与实战之运作优化——昆仑数据田春华培训听课记录

    昆仑数据田春华老师在微信公众号的专栏培训:工业数据分析与实战.培训给出了一些实际的数据分析例子,包括"设备管理"."运作优化"和"营销服务" ...

  7. 工业数据分析技术与实战之设备管理——昆仑数据田春华培训听课记录

    昆仑数据田春华老师在微信公众号的专栏培训:工业数据分析与实战.培训给出了一些实际的数据分析例子,包括"设备管理"."运作优化"和"营销服务" ...

  8. 《Python数据分析与挖掘实战》第15章 ——电商产品评论数据情感分析(LED)

    文章目录 1.挖掘背景与目标 2.2 数据探索与预处理 2.1 数据筛选 2.2 数据去重 2.3 删除前缀评分 2.4 jieba分词 3 基于LDA 模型的主题分析 4.权重 5.如何在主题空间比 ...

  9. 《Python数据分析与挖掘实战》一第1章 数据挖掘基础1.1 某知名连锁餐饮企业的困惑...

    本节书摘来自华章出版社<Python数据分析与挖掘实战>一书中的第1章,第1.1节,作者 张良均 王路 谭立云 苏剑林,更多章节内容可以访问云栖社区"华章计算机"公众号 ...

最新文章

  1. maven的pom.xml用<exclusion>解决版本问题
  2. Silverlight 2学习笔记一:初识Silverlight
  3. pgsql怎么从interval中取出数字_tp6中swoole扩展websocket的使用
  4. 【UEditor】百度编辑器插入video视频
  5. Hyper Text Transfer Protocol(超文本传输协议)
  6. 数据库重建索引 计划任务
  7. python选择排序算法图解_简单选择排序算法(C语言详解版)
  8. pymongo基本方法使用笔记
  9. Java课程设计报告
  10. 文言文编程可以编译成PHP吗,阁下可知文言编程之精妙?CMU本科生开源文言文编程语言,数天2K星...
  11. 阿里出品的在线图表制作工具
  12. 无法登陆skype显示无法找到服务器,无法登录 Lync,因为找不到此登录地址 - Skype for Business | Microsoft Docs...
  13. 2016年,你要学习这些移动开发技术
  14. 软件工程实践寒假作业
  15. MarkDown一些有用的小技巧
  16. go解析yaml文件示例
  17. php之PDO (PHP DATA OBJECT)
  18. 如何使错误日志更加方便排查问题?
  19. python折叠次数计算、一张纸5毫米_一张纸折叠多次后会怎样?计算机模拟得出,它能突破宇宙范围...
  20. DY-32/60C-DY-29、DY-38电压继电器

热门文章

  1. QQ 群聊美少女语音AI(ChatGLM 本地化版本)
  2. 【sql】sql必知必会_01
  3. oracle cpio,cpio - 手册页部分 1: 用户命令
  4. 三种典型启发式算法(禁忌搜索,模拟退火,遗传算法)
  5. 今天2005年1月6日,不是什么好日子,害我郁闷ing……
  6. 国内主流企业邮箱哪家好?外贸好用的企业邮箱是哪个?
  7. 坚果炒货市场瞄准年轻消费群体
  8. 学生类java程序_java 创建学生类
  9. signature=62a94db18a1c22c5e92df335032a67cf,Print Cradle For Retaining Pagewidth Print Cartridge
  10. python:使用selenium爬取51job(前程无忧)并将爬取数据存储到MySql数据库中的代码实例