先装备一个txt文档,内容如下:

13726230501 200 1100
13396230502 300 1200
13892730503 400 1300
13897230503 100 300
13597230534 500 1400
13597230534 300 1200

第1列表示手机号码,第2列表示上行流量,第3列表示下行流量。

需求:

要把同一个用户的上行流量、下行流量进行累加,并计算出综合。

例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:

13897230503,500,1600,2100

实现思路:

map

接收日志的一行数据,key为行的偏移量,value为此行数据。

输出时,应以手机号为key,value应为一个整体,包括:上行流量、下行流量、总流量。

手机号是字符串类型Text,而这个整体不能用基本数据类型表示,需要我们自定义一个bean对象,并且要实现可序列化。

key: 13897230503

value: < upFlow:100, dFlow:300, sumFlow:400 >

reduce

接收一个手机号标识的key,及这个手机号对应的bean对象集合。

例如:

key:

13897230503

value:

< upFlow:400, dFlow:1300, sumFlow:1700 >,

< upFlow:100, dFlow:300, sumFlow:400 >

迭代bean对象集合,累加各项,形成一个新的bean对象,例如:

< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >

最后输出:

key: 13897230503

value: < upFlow:500, dFlow:1600, sumFlow:2100 >

代码实现:

新建一个Maven工程,其中pom.xml内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gd.hadoop.flowcount</groupId><artifactId>flowcount</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>flowcount</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.7.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-common</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.7.3</version></dependency><dependency><groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.8</version><scope>system</scope><systemPath>${JAVA_HOME}/lib/tools.jar</systemPath></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>

其中Java代码如下:

FlowBean类:

package com.gd.hadoop.flowcount;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {private long upFlow;private long downFlow;private long sumFlow;public FlowBean() {super();// TODO Auto-generated constructor stub}public FlowBean(long upFlow, long downFlow) {super();this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}@Overridepublic void readFields(DataInput arg0) throws IOException {// TODO Auto-generated method stubupFlow=arg0.readLong();downFlow=arg0.readLong();sumFlow=arg0.readLong();}@Overridepublic void write(DataOutput arg0) throws IOException {// TODO Auto-generated method stubarg0.writeLong(upFlow);arg0.writeLong(downFlow);arg0.writeLong(sumFlow);}@Overridepublic String toString() {// TODO Auto-generated method stubreturn upFlow+"\t"+downFlow+"\t"+sumFlow;}}

FlowCount类:

package com.gd.hadoop.flowcount;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCount {static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubString line=value.toString();String[] words=line.split(" ");context.write(new Text(words[0]), new FlowBean(Long.parseLong(words[1]), Long.parseLong(words[2])));}     }static class MyReducer extends Reducer<Text, FlowBean, Text, FlowBean>{@Overrideprotected void reduce(Text arg0, Iterable<FlowBean> arg1, Reducer<Text, FlowBean, Text, FlowBean>.Context arg2)throws IOException, InterruptedException {// TODO Auto-generated method stublong sum_upFlow=0;long sum_downFlow=0;for (FlowBean flowBean : arg1) {sum_upFlow+=flowBean.getUpFlow();sum_downFlow+=flowBean.getDownFlow();}arg2.write(arg0, new FlowBean(sum_upFlow, sum_downFlow));}}public static void main(String[] args) throws Exception {// TODO Auto-generated method stub//创建配置对象Configuration configuration=new Configuration();//创建Job对象Job job=Job.getInstance(configuration, "FlowCount");//指定Jar包所在的本地路径job.setJarByClass(FlowCount.class);//指定本Job任务要使用的Mapper类与Reducer类job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);//指定Mapper输出的key-value类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//指定最终输出的key-value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean flag=job.waitForCompletion(true);if (flag) {System.out.println("complete sucessfully");}else{System.out.println("complete error");}}}

然后导出jar包为flowcount.jar。

然后将jar包和txt文档上传到集群的Master用户主目录下面,再将txt文档上传到hdfs指定目录下面。

执行:

待得到“complete sucessfully”时,执行如下命令查看结果:

Hadoop实战学习(3)-手机流量统计相关推荐

  1. 实现用户手机流量统计(ReduceTask并行度控制)

    需求:1.实现用户手机流量统计(ReduceTask并行度控制) 数据如下:保存为.dat文件(因为以\t切分数据,文件格式必须合适) 13726230503 00-FD-07-A4-72-B8:CM ...

  2. android统计app流量的软件,流量控(手机流量统计)app

    流量控(手机流量统计)app,为你带来准确,及时的流量通知,帮助用户时刻查询自己的手机流量和使用状况,管理程序的流量使用,全方面保证您的流量使用 流量控(手机流量统计)app介绍 流量控是一款统计安卓 ...

  3. Hadoop实战学习(3)-读取数据库内容

    要读取数据库中的数据,首先需要实现一个实体类,这个实体类部分映射数据库中要查询的表的字段.且该实体类需要实 现Writable与DBWritable两个接口,DBWritable的实现类负责查询与写入 ...

  4. Hadoop实战:明星搜索指数统计,找出人气王

    项目介绍 本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星. 数据集 明星搜索指数数据集,如下图所示.猛戳此链接下载数据集 思路分析 基于项目的需求,我们通过以下几步完成: 1. ...

  5. Hadoop学习_mapreduce提交方式+实现简单流量统计程序(有注释)+shuffle

    注:以下内容来源于互联用,用于个人读书笔记. mapreduce提交方式 MR程序的几种提交运行模式: 本地模型运行 1/在windows的eclipse里面直接运行main方法,就会将job提交给本 ...

  6. 【MapReduce】实战:流量统计(完整Java代码)

    [MapReduce]系列学习笔记: 第一部分:基本介绍 第二部分:MapReduce的编程 第三部分:MapReduce的分区 第四部分:MaoReduce的排序 第五部分:MapReduce实战: ...

  7. MapReduce案例:手机流量的统计

    案例:手机流量的统计 对于记录用户手机信息的文件,得出统计每一个用户(手机号)所耗费的总上行流量.下行流量,总流量结果. 分析 1. 实现自定义的 bean 来封装流量信息,使用手机号码作为Key,B ...

  8. Hadoop实战视频分享 2013年完整版学习视频种子下载

    Hadoop实战视频分享_2013年完整版学习视频种子下载 http://pan.baidu.com/share/link?shareid=354989&uk=1778669877 本人有一套 ...

  9. Hadoop实战视频分享_2013年完整版学习视频种子下载

    Hadoop实战视频分享_2013年完整版学习视频种子下载 http://pan.baidu.com/share/link?shareid=354989&uk=1778669877 本人有一套 ...

最新文章

  1. Python使用matplotlib可视化绘制并导出可视化结果图表到PDF文件中
  2. jsonarray和jsonObject的转化
  3. [Linux][Hadoop] 将hadoop跑起来
  4. 阿里云ECS服务器的搭建
  5. python 双向链表_数据结构-双向链表(Python实现)
  6. 变量声明和函数声明的意义详解
  7. 如何在Appscale下发布自己的应用(一)
  8. 盘点技术史:流量运营(PC 时代)
  9. django处理静态文件
  10. windows、ubuntu、Red Hat、Solaris 单网卡绑定多个IP
  11. Python返回Json格式定义的例子
  12. 快速突破面试算法之动态规划篇
  13. 【Ubuntu】如何使用命令行(优雅地)安装/卸载Microsoft Edge
  14. 基于vue开发一个组件库
  15. 不叫 Andromeda?Google 新系统疑为 Fuchsia
  16. spss和sas和python_T检验第三篇(SPSS,SAS,R,Python) 两样本T检验
  17. 迷你csgo饰品租赁系统
  18. 前端面试题之【CSS】
  19. From Microservices to Data Microservices-pivotal-专题视频课程
  20. [Spring手撸专栏学习笔记]——把AOP动态代理,融入到Bean的生命周期

热门文章

  1. IDEA运行JavaWeb服务器图片无法加载
  2. java方法覆盖Override
  3. openwrt折腾记5 广告屏蔽大师adbyby深度调优(借用老毛子完美数据文件)
  4. 外部局域网直接访问WSL2
  5. Windows10如何实现开机按F8进入传统旧版的安全模式
  6. margin和padding的用法
  7. 3个月内爆胎鼓包全额赔付 京东轮胎节送“轮胎险”解后顾之忧
  8. matlab 黑白同心圆环,Canvas如何绘制同心圆环
  9. 安全机构建议奥巴马政府谨慎使用开源软件
  10. Redis-sentinels启动时报错 All sentinels down, cannot determine where is redis-sentinel master is running.