Because this article is too long, the remaining parts are in my other blog posts!

  • Part 1: Introduction to Hadoop and installation

  • Part 2: HDFS

  • Part 4: Hands-on project case

5. MapReduce

5.1_Introduction

MapReduce is a programming framework for distributed computation and the core framework for developing "Hadoop-based data analysis applications".

The core job of MapReduce is to combine the business-logic code written by the user with its built-in default components into one complete distributed program that runs concurrently on a Hadoop cluster.

5.2_Advantages and disadvantages of MapReduce

Advantages

  1. MapReduce is easy to program
    By simply implementing a few interfaces you get a complete distributed program that can run on a large number of cheap PC machines. In other words, writing a distributed program feels just like writing a simple serial program. This is the main reason MapReduce programming became so popular.

  2. Good scalability
    When your computing resources are no longer sufficient, you can scale out the computing power simply by adding machines.

  3. High fault tolerance
    MapReduce was designed from the start to run on cheap PC machines, which requires a high degree of fault tolerance. For example, if one machine goes down, the computing tasks running on it can be moved to another node so that the job does not fail; the whole process needs no human intervention and is handled entirely inside Hadoop.

  4. Suited to offline processing of massive data at PB scale and above
    Thousands of servers can work concurrently in a cluster to provide the required data-processing capacity.

Disadvantages

  1. Not good at real-time computation
    MapReduce cannot return results within milliseconds or seconds the way MySQL can.

  2. Not good at stream computation
    The input of stream computation is dynamic, while the input data set of MapReduce is static and cannot change at runtime. This follows from MapReduce's own design: the data source must be static.

  3. Not good at DAG (directed acyclic graph) computation
    Here multiple applications depend on each other, with the output of one job serving as the input of the next. MapReduce is not incapable of this, but every MapReduce job writes its output to disk, which causes a large amount of disk I/O and very poor performance.

5.3_The core idea of MapReduce

  1. A distributed computing program usually needs at least two phases.
  2. The concurrent MapTask instances of the first phase run fully in parallel and are independent of each other.
  3. The concurrent ReduceTask instances of the second phase are also independent of each other, but their data depends on the output of all MapTask instances of the previous phase.
  4. The MapReduce programming model can contain only one Map phase and one Reduce phase; if the user's business logic is more complex, the only option is to run several MapReduce programs one after another (see the driver sketch below).
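
To make point 4 a little more concrete, here is a minimal sketch of a two-stage driver that simply runs one job after another, feeding the output directory of the first job to the second. The class name, the paths and the elided mapper/reducer settings are placeholders for illustration, not part of the original examples.

package com.atSchool.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Stage 1: runs first and writes to an intermediate directory
        Job job1 = Job.getInstance(conf, "stage-1");
        // ... setJarByClass / setMapperClass / setReducerClass / output types, as in the drivers below ...
        FileInputFormat.addInputPath(job1, new Path("/input"));
        FileOutputFormat.setOutputPath(job1, new Path("/tmp/stage-1-out"));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);                          // stop if the first pass failed
        }

        // Stage 2: consumes the output of stage 1
        Job job2 = Job.getInstance(conf, "stage-2");
        // ... setJarByClass / setMapperClass / setReducerClass / output types ...
        FileInputFormat.addInputPath(job2, new Path("/tmp/stage-1-out"));
        FileOutputFormat.setOutputPath(job2, new Path("/final-out"));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}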

5.4_The master-slave node structure of MapReduce

  • jobtracker

    • tasktracker

      • MapTask
      • ReduceTask
    • tasktracker
    • tasktracker

5.5_The MR execution flow

1. Brief overview

  1. A piece of text is fed in
  2. It is split by line
  3. Mapping phase: each word is split out and emitted as <key,value>, where key is the word and value is its count
  4. Shuffling phase: key-value pairs with the same key are grouped together
  5. Reducer phase: the groups produced by the Shuffling phase are merged (the values for each key are summed)
  6. The result is written out

2. Detailed flow:

  1. The text to be processed

  2. Before the client calls submit(), it collects information about the input data and, based on the configured parameters, builds a task-assignment plan

  3. The split information is submitted (to the YARN RM)

    split information, the jar package (only needed when running on a different machine), and the xml configuration files

  4. The number of MapTasks is computed

    Before data is read in the map phase, FileInputFormat divides the input files into splits. The number of splits determines the number of map tasks. The main factors that influence the number of maps (i.e. the number of splits) are the following (see the sketch after this flow description):

    1. File size. With a block size (dfs.block.size) of 128 MB, a 128 MB input file becomes 1 split; a 256 MB file becomes 2 splits.

    2. Number of files. FileInputFormat creates splits per file and only splits large files, i.e. files larger than an HDFS block. If dfs.block.size is 128 MB and the input directory contains 100 files, at least 100 splits will be produced.

    3. The split size. Splitting is done according to the splitsize; if nothing is configured, a split defaults to the HDFS block size, but an application can tune the splitsize through two parameters.

  5. Each MapTask processes its data as follows:

    1. Read the data (by default one line at a time)
    2. Hand the data to the Mapper (map(k,v), Context.write(k,v))
    3. After the Mapper has processed the data, write it into the circular buffer (default capacity 100 MB; once it is 80% full it spills while writing continues from the other end)
    4. Partition, and sort by key (lexicographic order, quicksort)
    5. Spill to disk (partitioned, and sorted within each partition)
    6. Merge-sort the spill files
    7. Merge into one output per MapTask
  6. Once all MapTasks have finished, the corresponding number of ReduceTasks (one per partition created earlier) is started, each handling its range of data

    1. Each ReduceTask downloads the data of its partition to its local disk

    2. The files are merged with a merge sort

      Reduce(k,v)

      Context.write(k,v)

  7. The result is written to a file under the specified path
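
To make step 4 concrete, here is a small standalone sketch of how FileInputFormat in Hadoop 2.x typically derives the split size and how that maps to the number of MapTasks. The two properties named in the comments (mapreduce.input.fileinputformat.split.minsize / .maxsize) are the usual tuning knobs; the concrete numbers are made up for illustration, and the slight slack Hadoop allows for the last split is ignored here.

public class SplitCountSketch {
    public static void main(String[] args) {
        // Defaults: minSize = 1 byte, maxSize = Long.MAX_VALUE,
        // tunable via mapreduce.input.fileinputformat.split.minsize / split.maxsize.
        long minSize = 1L;
        long maxSize = Long.MAX_VALUE;
        long blockSize = 128L * 1024 * 1024;     // dfs block size, 128 MB here

        // FileInputFormat's rule: splitSize = max(minSize, min(maxSize, blockSize))
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

        // A single 300 MB file therefore yields roughly ceil(300/128) = 3 splits -> 3 MapTasks.
        long fileLength = 300L * 1024 * 1024;
        long splits = (fileLength + splitSize - 1) / splitSize;
        System.out.println(splitSize + " bytes per split, about " + splits + " splits");
    }
}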

3. The Shuffle mechanism

MapReduce guarantees that the input to every reducer is sorted by key. The process by which the system performs this sort (i.e. transfers the mapper output to the reducers as their input) is called the shuffle.

4. Partition

Partitioning: grouping the data so that related records are stored together.

Each ReduceTask reads one partition and produces one result file; multiple partitions are read by multiple ReduceTasks, each producing its own result file.
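
As a sketch of what a custom partitioning rule could look like (this class is not part of the original examples; it assumes the word-count key/value types used later in this post):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: words starting with a-m go to partition 0, everything else to partition 1.
// Each partition is then handled by its own ReduceTask and ends up in its own part-r-0000x file.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString().toLowerCase();
        int partition = (!word.isEmpty() && word.charAt(0) <= 'm') ? 0 : 1;
        return partition % numPartitions;     // stay inside the configured number of reducers
    }
}

In the driver this would be wired up with job.setPartitionerClass(FirstLetterPartitioner.class) and job.setNumReduceTasks(2).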


5.6_Serialization

1. What is serialization?

Serialization converts objects in memory into a byte sequence (or another data-transfer format) so that they can be stored on disk (persisted) or transmitted over the network.

Deserialization converts a received byte sequence (or other data-transfer format), or data persisted on disk, back into objects in memory.

2. Why serialize?

Generally, "live" objects exist only in memory and are gone once the machine powers off. Moreover, a "live" object can only be used by the local process and cannot be sent to another computer on the network. Serialization makes it possible to store "live" objects and to send them to remote machines.

3. Why not use Java serialization (Serializable)?

Java's serialization is a heavyweight framework (Serializable): a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transfer over the network. Hadoop therefore developed its own serialization mechanism (Writable) with the following properties:

  1. Compact
    A compact format makes full use of network bandwidth, the scarcest resource in a data centre.

  2. Fast
    Inter-process communication forms the backbone of a distributed system, so the performance overhead of serialization and deserialization must be kept as small as possible; this is a basic requirement.

  3. Extensible
    Protocols change to meet new requirements, so it must be possible to introduce new protocol messages between client and server while the existing serialization format still supports them.

  4. Interoperable
    Clients and servers written in different languages must be able to interact.

4. Hadoop serialization types for common data types

Java type    Hadoop Writable type
boolean      BooleanWritable
byte         ByteWritable
int          IntWritable
float        FloatWritable
long         LongWritable
double       DoubleWritable
String       Text
map          MapWritable
array        ArrayWritable
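
A quick illustration of how these wrapper types are used (plain Java, not tied to any of the jobs below): every Writable wraps a Java value via its constructor or set() and exposes it again via get() or toString().

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        IntWritable count = new IntWritable(42);        // wraps an int
        LongWritable offset = new LongWritable(1024L);  // wraps a long
        Text word = new Text("hadoop");                 // wraps a String (stored as UTF-8)

        int c = count.get();                            // back to the plain Java types
        String w = word.toString();
        System.out.println(w + " -> " + c + " @ " + offset.get());
    }
}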

5.7_MapReduce programming conventions

A user program consists of three parts: the Mapper, the Reducer and the Driver.

  1. Mapper phase
    (1) A user-defined Mapper must extend the framework's Mapper parent class

    (2) The Mapper's input data is in KV-pair form (the KV types can be customized)

    (3) The Mapper's business logic goes in the map() method

    (4) The Mapper's output data is in KV-pair form (the KV types can be customized)

    (5) The map() method (run by the MapTask process) is called once for every <K,V> pair

  2. Reducer phase
    (1) A user-defined Reducer must extend the framework's Reducer parent class

    (2) The Reducer's input data types correspond to the Mapper's output data types, and are also KV

    (3) The Reducer's business logic goes in the reduce() method

    (4) The ReduceTask process calls the reduce() method once for every group of pairs with the same key

  3. Driver phase
    The Driver is essentially the client of the YARN cluster: it submits our whole program to the YARN cluster in the form of a Job object that wraps the runtime parameters of the MapReduce program

5.8_Counting cases

1. Word-count case

Requirement:

  • Count the number of occurrences of each word
MapDemo.java
package com.atSchool.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map phase.
 * The Map input comes from reading a text file: the input key is an offset, the input value is the line of text.
 * The output key is a Text; the output value is Hadoop's integer type.
 */
public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Text stores text using standard UTF-8 encoding. It provides methods to serialize,
     * deserialize and compare text at the byte level. The length is an int serialized in
     * zero-compressed format. It also provides methods for iterating over the characters
     * without converting the byte array to a String, plus utilities for serializing and
     * deserializing strings, encoding/decoding, checking whether a byte array contains
     * valid UTF-8, and computing the length of an encoded string.
     */
    private Text keyword = new Text();
    private IntWritable keyvalue = new IntWritable(1);

    /**
     * Parameter: context - the context object that runs through the whole task.
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // 1. Get one line
        String line = value.toString();
        // 2. Split it
        String[] split = line.split(",");
        // 3. Write each word to the context object
        for (String string : split) {
            keyword.set(string);
            context.write(keyword, keyvalue);
        }
    }
}
ReduceDemo.java
package com.atSchool.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce phase.
 * The Reduce input must match the Map output; the output types can simply stay the same.
 * By the time data reaches the Reduce phase it arrives in the form hadoop-{1,1,1}.
 */
public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable val = new IntWritable();

    /**
     * Parameter 1, Text key: the key passed over from Map.
     * Parameter 2, Iterable<IntWritable> values: an iterator holding {1,1,1}.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Accumulate
        int sum = 0;
        for (IntWritable intWritable : values) {
            sum += intWritable.get();
        }
        // Write out
        val.set(sum);
        context.write(key, val);
    }
}
WordCountJob.java
package com.atSchool.MapReduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class WordCountJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new WordCountJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(WordCountJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapDemo.class);job.setReducerClass(ReduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/world.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/out");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

2. Case: grouping words made of the same letters (anagrams)

MapDemo.java
package com.atSchool.MapReduce;import java.io.IOException;
import java.util.Arrays;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** Map 阶段 Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型* 输出的key是文本类型,value就是Hadoop中的整数类型*/
public class MapDemo extends Mapper<LongWritable, Text, Text, Text> {/*** 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。 长度类型为整数,并使用零压缩格式序列化.* 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。* 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。*/private Text outKey = new Text();/*** 参数说明: context:上下文对象,贯穿整个任务的对象*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// 1、获取一行String line = value.toString().trim();// 2、转成字符数组,并进行排序char[] charArray = line.toCharArray();Arrays.sort(charArray);String string = new String(charArray);// 3、输出到上下文对象中outKey.set(string);// 输出的key是排好序后的数据,value是传过来的原数据context.write(outKey, value);}
}
ReduceDemo.java
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** Reduce 阶段 Reduce的输入和Map的输出应该是一样的 输出的话保持不变就可以了* 到了Reduce阶段,传过来的数据是hadoop{1,1,1}的形式*/
public class ReduceDemo extends Reducer<Text, Text, Text, Text> {Text val = new Text();/*** 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}*/@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {StringBuffer stringBuffer = new StringBuffer();// 累加for (Text text : values) {stringBuffer.append("-" + text.toString());}// 输出val.set(stringBuffer.toString());context.write(key, val);}
}
WordCountJob.java
package com.atSchool.MapReduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class WordCountJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new WordCountJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(WordCountJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapDemo.class);job.setReducerClass(ReduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/world.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

3. Weather-average case (getting the file name)

Data to analyse:

  • Link: https://pan.baidu.com/s/18-U1mQ3bimG7sBH4twds9A   access code: fs79
MapDemo.java
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;/*** Map 阶段 Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型* 输出的key是文本类型,value就是Hadoop中的整数类型*/
public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {/*** 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。 长度类型为整数,并使用零压缩格式序列化.* 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。* 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。*/private Text outKey = new Text();private IntWritable outValue = new IntWritable();/*** 参数说明: context:上下文对象,贯穿整个任务的对象*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {/*** 1、Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(inputsplit)或简称为“分片”。* 2、Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条数据。* 3、getSplits()负责将文件切分成多个分片(InputSplit),但InputSplit并没有实际切分文件,而只是说明了如何切分数据,也就是说,InputSplit只是逻辑上的切分。* 4、每个InputSplit对应一个map任务。作为map的输入,在逻辑上提供了这个map任务所要处理的key-value对。*/// 获取到文件名对应的InputSplitInputSplit inputSplit = context.getInputSplit();// 强转成子类类型FileSplitFileSplit fSplit = (FileSplit) inputSplit;// 获取到路径Path path = fSplit.getPath();// 获取到文件名String name = path.getName();// 1、获取一行String line = value.toString();// 2、截取String[] split = line.split(" +");// 3、输出到上下文对象中outKey.set(name);int valueOf = Integer.valueOf(split[4].trim());// 由于数据文件中存在脏数据,所以要进行判断处理if (valueOf != -9999) {outValue.set(valueOf);context.write(outKey, outValue);}}
}
ReduceDemo.java
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** Reduce 阶段 Reduce的输入和Map的输出应该是一样的 输出的话保持不变就可以了* 到了Reduce阶段,传过来的数据是hadoop{1,1,1}的形式*/
public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable val = new IntWritable();/*** 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 累加int sum = 0; // 存储累加后的值int count = 0; // 统计数据的个数for (IntWritable intWritable : values) {sum += intWritable.get();count++;}sum /= count;// 输出val.set(sum);context.write(key, val);}
}
WordCountJob.java
package com.atSchool.MapReduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class WordCountJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new WordCountJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(WordCountJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapDemo.class);job.setReducerClass(ReduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/weather_forecast_source"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

4. Case: IP traffic statistics

Data to analyse:

  • Link: https://pan.baidu.com/s/16e418utH22G09FuLwXtFIQ   access code: en5r
MapDemo.java
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;/*** Map 阶段 Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型* 输出的key是文本类型,value就是Hadoop中的整数类型*/
public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {/*** 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。 长度类型为整数,并使用零压缩格式序列化.* 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。* 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。*/private Text outKey = new Text();private IntWritable outValue = new IntWritable();/*** 参数说明: context:上下文对象,贯穿整个任务的对象*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {// 1、获取一行String line = value.toString();// 2、截取String[] split = line.split(" +");// 3、输出到上下文对象中// 由于数据文件中存在脏数据,所以要进行判断处理if (!split[split.length - 1].trim().equals("-")) {outKey.set(split[0].trim()); // name 为ip地址outValue.set(Integer.valueOf(split[split.length - 1].trim()));context.write(outKey, outValue);}}
}
ReduceDemo.java
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** Reduce 阶段 Reduce的输入和Map的输出应该是一样的 输出的话保持不变就可以了* 到了Reduce阶段,传过来的数据是hadoop{1,1,1}的形式*/
public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable val = new IntWritable();/*** 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 累加int sum = 0; // 存储累加后的值for (IntWritable intWritable : values) {sum += intWritable.get();}// 输出val.set(sum);context.write(key, val);}
}
WordCountJob.java
package com.atSchool.MapReduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class WordCountJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new WordCountJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(WordCountJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapDemo.class);job.setReducerClass(ReduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/traffic_source"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

Why is there no progress output when running the MapReduce program?

Copy the hadoop-2.7.3\etc\hadoop\log4j.properties file from the Hadoop installation on Windows into the project's src directory.
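
If that file is not at hand, a minimal log4j.properties along the following lines (standard log4j configuration, not taken from this post's environment) is usually enough to see the job's console output:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n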

What are the two files generated after a successful run?

_SUCCESS: only indicates that the job succeeded.

part-r-00000: stores the actual result of the job.
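
The result can be inspected from the command line, for example (assuming the /out output directory used in the job above):

hdfs dfs -ls /out
hdfs dfs -cat /out/part-r-00000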

5.9_Custom serialization (and sorting)

1. Rules

For a custom bean object to be serialized and transferred, it must implement the serialization interface. Note the following points.

  1. It must implement the Writable interface

  2. Deserialization creates the object reflectively through the no-argument constructor, so a no-argument constructor is required

    public FlowBean() {
        super();
    }

  3. Override the serialization method

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

  4. Override the deserialization method

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

  5. Note that the deserialization order must match the serialization order exactly

  6. To make the result readable in the output file, override toString(); the fields can be separated with "\t" for convenient later use.

2. Traffic statistics case

Data to analyse:

1363157985066    13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   84138413    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13560436666 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560436666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
// Explanation of the fields:
1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24      27              2481      24681     200
report timestamp   phone number   AP mac   AC mac   visited URL   URL category   upstream packet count   downstream packet count   upstream traffic   downstream traffic

Requirement:

  • For each phone number, compute the upstream traffic, downstream traffic and total traffic
FlowBean.java
package com.atSchool.Serialize;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;/*** 自定义序列化类* 1、实现Writable接口*/
public class FlowBean implements Writable {// 上行流量(即上传时使用的流量)private Long upFlow;// 下行流量(即下载时使用的流量)private Long downFlow;// 总流量private Long sumFlow;// 2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造public FlowBean() {super();}// 带参的构造器public FlowBean(Long upFlow, Long downFlow) {this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}// 3、重写序列化方法@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}// 4、重写反序列化方法@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}/*** 重写toString方法* 如果不重写toString方法,则不会输出FlowBean中的内容,回输出以下内容:* 13480253104  com.atSchool.Serialize.FlowBean@77353c84* 13502468823  com.atSchool.Serialize.FlowBean@31b3f152* 13560436666  com.atSchool.Serialize.FlowBean@52a567cf* 13560439658  com.atSchool.Serialize.FlowBean@3c17b48b* 13602846565  com.atSchool.Serialize.FlowBean@62186e91* 13660577991  com.atSchool.Serialize.FlowBean@4c0f3ae1* 13719199419  com.atSchool.Serialize.FlowBean@356db7b0*/@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}public Long getUpFlow() {return upFlow;}public void setUpFlow(Long upFlow) {this.upFlow = upFlow;}public Long getDownFlow() {return downFlow;}public void setDownFlow(Long downFlow) {this.downFlow = downFlow;}public Long getSumFlow() {return sumFlow;}public void setSumFlow(Long sumFlow) {this.sumFlow = sumFlow;}
}
MapSerialize.java
package com.atSchool.Serialize;import java.io.IOException;import javax.xml.transform.OutputKeys;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class MapSerialize extends Mapper<LongWritable, Text, Text, FlowBean> {private FlowBean outValue;private Text outKey = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)throws IOException, InterruptedException {// 读取一行String line = value.toString();// 分割String[] split = line.split("\t+");// 输出到上下文对象中String tel = split[1].trim();String upFlow = split[split.length - 3].trim();String downFlow = split[split.length - 2].trim();outKey.set(tel);outValue = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));// 输出的key:电话号码;value:FlowBean对象context.write(outKey, outValue);}
}
ReduceSerialize.java
package com.atSchool.Serialize;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class ReduceSerialize extends Reducer<Text, FlowBean, Text, FlowBean> {@Overrideprotected void reduce(Text key, Iterable<FlowBean> value, Reducer<Text, FlowBean, Text, FlowBean>.Context context)throws IOException, InterruptedException {long upFlow = 0;long downFlow = 0;// 累加上行和下行流量数for (FlowBean flowBean : value) {upFlow += flowBean.getUpFlow();downFlow += flowBean.getDownFlow();}// 输出的key:电话号码;value:FlowBean对象context.write(key, new FlowBean(upFlow, downFlow));}
}
FlowJob.java
package com.atSchool.Serialize;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class FlowJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new FlowJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(FlowJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapSerialize.class);job.setReducerClass(ReduceSerialize.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/phone_traffic.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

3. Traffic sorting case (sorting the output of the traffic statistics case)

While processing data, an MR program sorts it (the kv pairs output by map are sorted before they are handed to reduce), and the sort criterion is the key output by map.

So, to impose our own sort order, we can move the sort criterion into the key: make the key a class that implements the WritableComparable interface and override its compareTo method. Finally, in reduce we swap back the key and value produced by the mapper, so that the output again has the phone number first and the other fields after it.

FlowBean.java
package com.atSchool.Serialize;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** 自定义序列化类* 1、实现WritableComparable接口* 说明:*       public interface WritableComparable<T> extends Writable, Comparable<T> {}*      该接口继承了Writable, Comparable两个接口,所以直接实现该接口即可*/
public class FlowBean implements WritableComparable<FlowBean> {// 上行流量(即上传时使用的流量)private Long upFlow;// 下行流量(即下载时使用的流量)private Long downFlow;// 总流量private Long sumFlow;// 2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造public FlowBean() {super();}// 带参的构造器public FlowBean(Long upFlow, Long downFlow) {this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}// 为了在排序的时候使用(排序时已经知道了sumFlow,再用上面的一个进行加减不太好)public FlowBean(Long upFlow, Long downFlow, Long sumFlow) {this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = sumFlow;}// 3、重写序列化方法@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}// 4、重写反序列化方法@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}// 重写toString方法@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}public Long getUpFlow() {return upFlow;}public void setUpFlow(Long upFlow) {this.upFlow = upFlow;}public Long getDownFlow() {return downFlow;}public void setDownFlow(Long downFlow) {this.downFlow = downFlow;}public Long getSumFlow() {return sumFlow;}public void setSumFlow(Long sumFlow) {this.sumFlow = sumFlow;}// 重写了compareTo方法,设置成按照sumFlow的倒序排序@Overridepublic int compareTo(FlowBean flowBean) {return sumFlow > flowBean.sumFlow ? -1 : 1;}
}
MapSort.java

Here the key and value are swapped first, so that the records are sorted by the FlowBean.

package com.atSchool.Serialize;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** 这里因为是要按sumFlow排序,所以将输出的key设置为FlowBean类型*/
public class MapSort extends Mapper<LongWritable, Text, FlowBean, Text> {private Text outValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context)throws IOException, InterruptedException {String line = value.toString();// 分割String[] split = line.split("\t+");// 输出到上下文对象中String tel = split[0].trim();String upFlow = split[1].trim();String downFlow = split[2].trim();String sumFlow = split[3].trim();FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow), Long.parseLong(sumFlow));outValue.set(tel);context.write(flowBean, outValue);}
}
ReduceSort.java

Here the key and value are swapped back.

package com.atSchool.Serialize;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class ReduceSort extends Reducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> value, Reducer<FlowBean, Text, Text, FlowBean>.Context context)throws IOException, InterruptedException {// 这里不需要遍历value,因为传过来的value就只是一个电话号码,直接拿出来就好了Iterator<Text> iterator = value.iterator();Text next = iterator.next();context.write(next, key);}
}
SortJob.java
package com.atSchool.Serialize;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class SortJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new SortJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(FlowJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapSort.class);job.setReducerClass(ReduceSort.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/MapReduceOut/part-r-00000"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/SortOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5.10_Counters

Counters record the progress and status of a job; you can think of them as a kind of log. We can insert a counter at some point in the program to record how the data or the progress changes.

MapReduce counters (Counter) give us a window for observing all kinds of detailed metrics while a MapReduce job runs. They are very helpful for performance tuning: most of the evaluation done during MapReduce performance optimisation is based on the values of these counters.

MapReduce ships with many built-in counters. Below we look at what these default counters mean, to make it easier to read job results: the number of input bytes, output bytes, the bytes and record counts of the map-side input/output, the bytes and record counts of the reduce-side input/output, and so on.

1. Reading the counters (Counter)

...
...
21/04/02 11:06:49 INFO mapreduce.Job: Counters: 35      // Counters: 35 - 35 kinds of counters were used during the run
    File System Counters                                // counter group
        FILE: Number of bytes read=46873922
        FILE: Number of bytes written=107355440
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=765843722
        HDFS: Number of bytes written=624014
        HDFS: Number of read operations=37
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=10
    Map-Reduce Framework                                // counter group
        Map input records=1948789
        Map output records=1151021
        Map output bytes=21132892
        Map output materialized bytes=23434952
        Input split bytes=387
        Combine input records=0
        Combine output records=0
        Reduce input groups=31046
        Reduce shuffle bytes=23434952
        Reduce input records=1151021
        Reduce output records=31046
        Spilled Records=2302042
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=1612
        Total committed heap usage (bytes)=3603431424
    Shuffle Errors                                      // counter group
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters                          // counter group
        Bytes Read=218157941
    File Output Format Counters                         // counter group
        Bytes Written=624014
  1. File System Counters: an MR job reads from and writes to different file systems; this group shows the job's read/write statistics against those file systems

    • HDFS: Number of bytes read=765843722

      **Explanation:** bytes that map read from HDFS, including the source file contents and the split metadata, so this value is slightly larger than File Input Format Counters.Bytes Read.

    • FILE: Number of bytes written=107355440

      **Explanation:** the total number of bytes the map tasks wrote to local disk (in fact, the reduce-side merge also writes local files)

    • FILE: Number of bytes read=46873922

      **Explanation:** bytes that reduce read from the local file system (the map output is stored on local disk)

    • HDFS: Number of bytes written=624014

      **Explanation:** the final result written to HDFS

  2. Job Counters (not present in the example above): statistics of the MR sub-tasks, i.e. the map tasks and reduce tasks

    • Launched map tasks=4

      **Explanation:** the number of map tasks launched

    • Launched reduce tasks=5

      **Explanation:** the number of reduce tasks launched

  3. Map-Reduce Framework: counters of the MR framework itself

    • Map input records=1948789

      **Explanation:** the total number of lines the map tasks read from HDFS

    • Reduce input groups=31046

      **Explanation:** the number of input groups (distinct keys) on the reduce side, e.g. <hello,{1,1}> <me,1> <you,1> makes three groups. With a Combiner, the reduce-side input record count equals the number of records left after the map-side Combiner ran; without one, it equals the number of map output records.

    • Combine input records=0

      **Explanation:** Combiner input = map output (0 here because no Combiner was configured)

    • Spilled Records=2302042

      **Explanation:** spilling happens on both the map and the reduce side; this counts how many records were spilled from memory to disk in total

  4. Shuffle Errors:

  5. File Input Format Counters: input-format counters

    • Bytes Read=218157941

      **Explanation:** in the map phase, the total number of bytes of all values passed into the map methods of all map tasks

  6. File Output Format Counters: output-format counters

    • Bytes Written=624014

      **Explanation:** the total number of bytes of MR output, including the words, the spaces, the word counts and the newline of each line

2. Custom counters

// A custom counter is identified by a <group, name> pair
Counter counter = context.getCounter("find hello", "hello");
if (string.contains("hello")) {
    counter.increment(1L);  // +1 for each occurrence
}

3. What runs before and after map

One method runs before the map method and one runs after it:

  • before: the setup(context) method
  • after: the cleanup(context) method

So when writing a counter we do not need a reduce at all; we simply override cleanup and write the output there.
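
A minimal sketch of this lifecycle (the LifecycleMapper class is illustrative, not one of the classes below): setup() runs once before the first map() call, cleanup() runs once after the last one, so an aggregate collected in map() can be emitted in cleanup().

package com.atSchool.Counter;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LifecycleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private long lines;

    @Override
    protected void setup(Context context) {
        lines = 0;                                   // runs exactly once, before the first map() call
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        lines++;                                     // runs once per input record
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs exactly once, after the last map() call: write the aggregate here
        context.write(new Text("lines"), new IntWritable((int) lines));
    }
}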

4. Case: counting the lines and words in a file

WordCounterMapper.java
package com.atSchool.Counter;import java.io.IOException;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
import org.apache.zookeeper.txn.Txn;/*** 计数总的行数和单词数量*/
public class WordCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {enum COUNTERS {ALLLINES, ALLWORDS}Counter counter = null;Counter counter2 = null;/*** 对输入拆分中的每个键/值对调用一次。大多数应用程序都应该覆盖这一点,但默认的是标识函数。*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {/*** 获取一个计数器* * 此处用了动态代理*/counter = context.getCounter(COUNTERS.ALLLINES);counter2 = context.getCounter(COUNTERS.ALLWORDS);/*** void increment(long incr) 按给定值递增此计数器 参数: incr-增加该计数器的价值*/counter.increment(1L); // 出现一次 +1String[] split = value.toString().split(" +");for (String string : split) {/*** public static boolean isNotBlank(java.lang.String str)* 检查字符串是否为空(“”),不为空,是否仅为空白。* 示例:*      StringUtils.isNotBlank(null) = false*      StringUtils.isNotBlank("") = false*      StringUtils.isNotBlank(" ") = false*         StringUtils.isNotBlank("bob") = true*        StringUtils.isNotBlank(" bob ") = true*/if (StringUtils.isNotBlank(string)) {counter2.increment(1L); // 出现一次 +1}}}/*** 在map方法运行的前后各会有一个方法执行* 前:setup(context);方法* 后:cleanup(context);方法* * 所以这里我们不需要reduce,直接重写cleanup进行输出即可。*/@Overrideprotected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {// 去除计数器String name = counter.getName();long value = counter.getValue();String name2 = counter2.getName();long value2 = counter2.getValue();context.write(new Text(name), new IntWritable((int) value));context.write(new Text(name2), new IntWritable((int) value2));}
}
WordCounterJob.java
package com.atSchool.Counter;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class WordCounterJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new WordCounterJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(WordCounterJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(WordCounterMapper.class);job.setNumReduceTasks(0); // !!!!!!这里用为没有用到reduce所以设置为0!!!!!!!!// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/a.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5. Case: counting speeding vehicles

Data to analyse:

  • Link: https://pan.baidu.com/s/1XAVzBBEnZkBW_8bvVPqX8w   access code: 5gd0

Data description:

date         camera ID       address        speed    latitude        longitude               location
07/01/2014,CHI003,4124 W FOSTER AVE,123,41.9756053,-87.7316698,"(41.9756053, -87.7316698)"
07/01/2014,CHI004,5120 N PULASKI,68,41.9743327,-87.728347,"(41.9743327, -87.728347)"
07/01/2014,CHI005,2080 W PERSHING,68,41.8231888,-87.6773488,"(41.8231888, -87.6773488)"
07/01/2014,CHI007,3843 S WESTERN,75,41.823564,-87.6847211,"(41.823564, -87.6847211)"
07/01/2014,CHI008,3655 W JACKSON,15,41.8770708,-87.7181683,"(41.8770708, -87.7181683)"
07/01/2014,CHI009,3646 W MADISON,50,41.8809382,-87.7178984,"(41.8809382, -87.7178984)"
07/01/2014,CHI010,1111 N HUMBOLDT,77,,,

Requirement: count the vehicles whose speed exceeds 120

SpeedMaperr.java
package com.atSchool.Counter.overspeed;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;public class SpeedMaperr extends Mapper<LongWritable, Text, Text, LongWritable> {enum speed {OVERSPPD_CARS}Counter counter = null;@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws IOException, InterruptedException {// 获取计数器counter = context.getCounter(speed.OVERSPPD_CARS);// 处理数据String[] split = value.toString().split(",");// 由于第一行是表头,所以要稍微的处理一下、if (Character.isDigit(split[3].charAt(0))) {// 统计if (Integer.parseInt(split[3].trim()) >= 120) {counter.increment(1L); // 只要车速超过120就算超速+1}}}@Overrideprotected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws IOException, InterruptedException {String name = counter.getName();long value = counter.getValue();context.write(new Text(name), new LongWritable(value));}
}
SpeedJob.java
package com.atSchool.Counter.overspeed;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class SpeedJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new SpeedJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(SpeedJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(SpeedMaperr.class);job.setNumReduceTasks(0); // 这里用为没有用到reduce所以设置为0// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/overspeed_cars.csv"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5.11_cleanup and setup in MapReduce

We know that map and reduce each have a limitation: map runs once per line and reduce runs once per key group. But what if we want to collect all of the data first, filter it according to the requirements, and only then write the output (the classic topN problem, for example)? Map and reduce alone clearly cannot achieve that. So what can?

This is where the special property of setup and cleanup comes in: they each run exactly once. We can therefore put the final filtering and the output into cleanup. That way the data is not written out group by group; instead we collect everything, filter it at the end, and only then write it out.

The topN case

Analysis:

  • We know MapReduce can split and aggregate, so the first step is to read each word in map and aggregate in reduce to obtain the count of every word

  • But how do we output only the top three? Map runs once per line and reduce once per group, so map and reduce alone cannot control how many times output is written.

    However, both map and reduce also have setup and cleanup, and those two methods run only once.

    So in the reduce phase we can treat each word as the key and its count as the value, and put each group into a Map collection (only storing, not writing out). Then, in reduce's cleanup, we sort the map and output the top three.

maperrDemo.java
package com.atSchool.topN;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class maperrDemo extends Mapper<LongWritable, Text, Text, IntWritable> {private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {String[] split = value.toString().split(" +");for (String string : split) {outKey.set(string);context.write(outKey, outValue);}}
}
reduceDemo.java
package com.atSchool.topN;import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class reduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {// HashMap的key不能重复Map<String, Integer> hashMap = new HashMap<>();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable intWritable : values) {count += intWritable.get();}// 拿到每个单词和总的数量后hashMap.put(key.toString(), count);}@Overrideprotected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {// 进行排序后输出前三名/*** Set<Map.Entry<K,V>> entrySet():获取 键值对对象 的集合 * 说明: Map.Entry<K,V>:描述的是键值对的对象实体* 这个对象的方法有: a*        K getKey() 返回与此条目相对应的键。 *       V getValue() 返回与此条目相对应的值。*/LinkedList<Entry<String, Integer>> linkedList = new LinkedList<>(hashMap.entrySet());// 排序Collections.sort(linkedList, new Comparator<Map.Entry<String, Integer>>() {@Overridepublic int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {return o2.getValue() - o1.getValue();}});for (int i = 0; i < linkedList.size(); i++) {if (i <= 2) {context.write(new Text(linkedList.get(i).getKey()), new IntWritable(linkedList.get(i).getValue()));}}}
}
jobDemo.java
package com.atSchool.topN;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(maperrDemo.class);job.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/a.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5.12_The Combiner in MapReduce

The Combiner in MapReduce exists to cut down the data transferred between the map tasks and the reduce tasks. Hadoop allows the user to specify a combine function that runs on the output of each MapTask, i.e. it reduces the amount of data sent to Reduce. Its main purpose is to shrink the Mapper output and thereby reduce network bandwidth usage and the load on the Reducer.

1. The difference between Combiner and Reducer

  1. The difference lies in where they run: the Combiner runs on the node of each MapTask, whereas the Reducer receives the output of all Mappers globally
  2. The Combiner's input key-value types are exactly the Mapper's output key-value types, and the Combiner's output key-value types must match the Reducer's input key-value types
  3. Use the Combiner with great care: it is an optional component in the MapReduce pipeline and may be called zero, one or several times. The rule is therefore: with or without the Combiner, neither the business logic nor the final result may change
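
A quick worked example of rule 3: summing is safe to combine, averaging is not. Take map outputs 1 and 2 on one node and 3 on another. Sum: (1+2) + 3 = 6, the same with or without a Combiner. Average: a Combiner that averaged locally would give avg(avg(1,2), avg(3)) = avg(1.5, 3) = 2.25, while the true average is (1+2+3)/3 = 2. That is why the weather-average job above keeps the division in the Reducer and only ships the raw values out of the map side.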

2. Word-count case

1. Without a Combiner
21/04/04 14:14:27 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=5975774
        FILE: Number of bytes written=9544692
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3562268
        HDFS: Number of bytes written=39
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Map-Reduce Framework
        Map input records=172368
        Map output records=229824
        Map output bytes=2528064
        Map output materialized bytes=2987718
        Input split bytes=98
        Combine input records=0            // shows 0 here
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=2987718
        Reduce input records=229824
        Reduce output records=3
        Spilled Records=459648
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=28
        Total committed heap usage (bytes)=466616320
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1781134
    File Output Format Counters
        Bytes Written=39
执行成功
2. With a Combiner
21/04/04 14:25:39 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=426
        FILE: Number of bytes written=582500
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3562268
        HDFS: Number of bytes written=39
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Map-Reduce Framework
        Map input records=172368
        Map output records=229824
        Map output bytes=2528064
        Map output materialized bytes=44
        Input split bytes=98
        Combine input records=229824     // shows that the Combiner was used
        Combine output records=3
        Reduce input groups=3            // the Reducer's workload is clearly much smaller
        Reduce shuffle bytes=44
        Reduce input records=3
        Reduce output records=3
        Spilled Records=6
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=25
        Total committed heap usage (bytes)=464519168
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1781134
    File Output Format Counters
        Bytes Written=39
执行成功
MapDemo.java (same as before)
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {private Text keyword = new Text();private IntWritable keyvalue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {// 1、获取一行String line = value.toString();// 2、截取String[] split = line.split(" +");// 3、输出到上下文对象中for (String string : split) {keyword.set(string);context.write(keyword, keyvalue);}}
}
ReduceDemo.java (same as before)
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable val = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 累加int sum = 0;for (IntWritable intWritable : values) {sum += intWritable.get();}// 输出val.set(sum);context.write(key, val);}
}
WordCombiner.java
package com.atSchool.MapReduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable val = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 累加int sum = 0;for (IntWritable intWritable : values) {sum += intWritable.get();}// 输出val.set(sum);context.write(key, val);}
}
WordCountJob.java
package com.atSchool.MapReduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class WordCountJob extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new WordCountJob(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(WordCountJob.class);// 告诉job Map和Reduce在哪job.setMapperClass(MapDemo.class);job.setCombinerClass(WordCombiner.class);  // 此处指定要使用的Combiner即可job.setReducerClass(ReduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/a.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/out");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5.13_Inverted index

1. What an index is

  • In a relational database, an index is a separate, physical storage structure that sorts the values of one or more columns of a table; it is a collection of the values of those columns together with a list of logical pointers to the data pages that physically identify the rows containing those values.

  • An index plays the same role as the table of contents of a book: you can jump to the content you need by its page number. An index provides pointers to the data values stored in the indexed columns and orders those pointers according to the sort order you specify. The database uses the index to find a specific value, then follows the pointer to the row containing it. This makes SQL statements against the table run faster and gives quick access to specific information in the table.

  • When a table has a large number of rows, the first way to answer a query is a full table scan: pull out every record, compare it with the query condition one by one, and return the matching rows. This consumes a lot of database time and causes a lot of disk I/O.

    The second way is to build an index on the table, find the index entries that match the query condition, and use the ROWID stored in the index (comparable to a page number) to quickly locate the matching rows in the table.

2. Inverted index

Structure of a forward index:
ID of "document 1" > word 1: occurrence count, list of positions; word 2: occurrence count, list of positions; ...
ID of "document 2" > list of keywords appearing in this document.

A forward index is queried by key to find the value. When a user searches for the keyword "Huawei phone" on the home page and only a forward index exists, the engine would have to scan every document in the index library, pick out all documents containing "Huawei phone", score them with a ranking model and present the ranking to the user. Because the number of documents a search engine has collected from the internet is astronomical, such an index structure simply cannot return ranked results in real time.

So search engines rebuild the forward index into an inverted index: the mapping from file IDs to keywords is converted into a mapping from keywords to file IDs. Every keyword maps to a series of files, each of which contains that keyword.

The resulting inverted index has the following structure:
"keyword 1": ID of "document 1", ID of "document 2".
"keyword 2": list of IDs of documents containing this keyword.

That is, we go from a keyword to the documents that contain it.

3. Case: building an inverted index

Data to analyse:
  • Link: https://pan.baidu.com/s/1Vng4GW0J1qa9jC_-7pGE7Q
    access code: y3sf
Requirement:

Build an inverted index of the keywords in the files day01 to day03

Concrete steps:
-------------Step 1: the Mapper output looks like this:--------------------
context.write("love:day01.txt", "1")
context.write("beijing:day01.txt", "1")
context.write("love:day02.txt", "1")
context.write("beijing:day01.txt", "1")

-------------Step 2: the input received by the Combiner looks like this:-------------
<"love:day01.txt", {"1"}>
<"beijing:day01.txt", {"1","1"}>
<"love:day02.txt", {"1"}>

-------------Step 2: the Combiner output looks like this---------------------
context.write("love", "day01.txt:1")
context.write("beijing", "day01.txt:2")
context.write("love", "day02.txt:1")

-------------Step 3: the input received by the Reducer looks like this:-----------------
<"love", {"day01.txt:1", "day02.txt:1"}>
<"beijing", {"day01.txt:2"}>

-------------Step 3: the Reducer output looks like this:-----------------
context.write("love", "day01.txt:1 day02.txt:1 ")
context.write("beijing", "day01.txt:2 ")

The final result:
love    day01.txt:1 day02.txt:1
beijing day01.txt:2
mapperDemo.java
package com.atSchool.index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 输出的形式:
 * context.write("love:day01.txt", "1")
 * context.write("beijing:day01.txt", "1")
 * context.write("love:day02.txt", "1")
 * context.write("beijing:day01.txt", "1")
 */
public class mapperDemo extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // 分割
        String[] split = value.toString().split(" +");

        // 获取文件名
        // 1、获取切片
        InputSplit inputSplit = context.getInputSplit();
        // 2、强转
        FileSplit fSplit = (FileSplit) inputSplit;
        // 3、得到文件路径再获取文件名
        String name = fSplit.getPath().getName();

        for (String string : split) {
            context.write(new Text(string + ":" + name), new Text("1"));
            System.out.println("mapper:" + "<" + string + ":" + name + "," + "1" + ">");
        }
    }
}
combinerDemo.java
package com.atSchool.index;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 输入的形式:
 * <"love:day01.txt", {"1"}>
 * <"beijing:day01.txt", {"1","1"}>
 * <"love:day02.txt", {"1"}>
 *
 * 输出的形式:
 * context.write("love", "day01.txt:1")
 * context.write("beijing", "day01.txt:2")
 * context.write("love", "day02.txt:1")
 */
public class combinerDemo extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // 处理value
        int sum = 0;
        for (Text text : value) {
            int parseInt = Integer.parseInt(text.toString().trim());
            sum += parseInt;
        }

        // 处理key
        String[] split = key.toString().split(":");

        // 输出
        outKey.set(split[0].trim());
        outValue.set(split[1].trim() + ":" + String.valueOf(sum));
        context.write(outKey, outValue);
        System.out.println("combiner:" + "<" + split[0].trim() + "," + split[1].trim() + ":" + String.valueOf(sum) + ">");
    }
}
reduceDemo.java
package com.atSchool.index;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 输入格式:
 * <"love", {"day01.txt:1", "day02.txt:1"}>
 * <"beijing", {"day01.txt:2"}>
 *
 * 输出格式:
 * context.write("love", "day01.txt:1 day02.txt:1")
 * context.write("beijing", "day01.txt:2")
 */
public class reduceDemo extends Reducer<Text, Text, Text, Text> {
    // StringBuilder stringBuilder = new StringBuilder();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        /**
         * 这里为什么不能将stringBuilder定义在外面?
         * 因为MR程序在运行的时候,只会反复调用reduce方法,并不是每次都将这整个类走一遍。
         * 这里stringBuilder的append方法会一直将数据在末尾追加,不会覆盖之前的数据。
         * 而使用Text中的set方法时会将之前的数据覆盖。
         */
        StringBuilder stringBuilder = new StringBuilder();

        /**
         * 迭代器为什么不能多次遍历?
         * 当调用next()时,返回当前索引(cursor)指向的元素,然后当前索引值(cursor)会+1,
         * 当所有元素遍历完,cursor == Collection.size(),
         * 此时再使用while(Iterator.hasNext())做循环条件时,返回的是false,无法进行下次遍历,
         * 如果需要多次使用Iterator进行遍历,当一次遍历完成,需要重新获取Collection的iterator()。
         */
        // for (Text text : value) {
        // System.out.println("reduce输入的value:" + text.toString());
        // }
        for (Text text : value) {
            System.out.println("reduce输入的value:" + text.toString());
            stringBuilder.append(" " + text.toString());
        }

        // 输出
        outValue.set(stringBuilder.toString());
        context.write(key, outValue);
        System.out.println("reduce:" + "<" + key.toString() + "," + stringBuilder.toString() + ">");
    }
}
jobDemo.java
package com.atSchool.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.atSchool.utils.HDFSUtils;

public class jobDemo extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        new ToolRunner().run(new jobDemo(), null);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 获取Job
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
        Job job = Job.getInstance(configuration);

        // 设置需要运行的任务
        job.setJarByClass(jobDemo.class);

        // 告诉job Map和Reduce在哪
        job.setMapperClass(mapperDemo.class);
        job.setCombinerClass(combinerDemo.class);
        job.setReducerClass(reduceDemo.class);

        // 告诉job Map输出的key和value的数据类型是什么
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 告诉job Reduce输出的key和value的数据类型是什么
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 告诉job输入和输出的路径
        FileInputFormat.addInputPath(job, new Path("/index_source"));

        /**
         * 因为输出的文件不允许存在,所以需要处理一下
         */
        FileSystem fileSystem = HDFSUtils.getFileSystem();
        Path path = new Path("/MapReduceOut");
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
            System.out.println("删除成功");
        }
        FileOutputFormat.setOutputPath(job, path);

        // 提交任务
        boolean waitForCompletion = job.waitForCompletion(true);
        System.out.println(waitForCompletion ? "执行成功" : "执行失败");
        return 0;
    }
}
运行结果
and   day01.txt:1
beijing  day01.txt:1 day03.txt:1
capital  day03.txt:1
china    day03.txt:1 day02.txt:1
i    day02.txt:1 day01.txt:2
is   day03.txt:1
love     day01.txt:2 day02.txt:1
of   day03.txt:1
shanghai     day01.txt:1
the  day03.txt:1

4. 统计辩论词频案例

分析资料:

  • 链接:https://pan.baidu.com/s/1XYZpOWRvxEyi1Chlv8CQRg
    提取码:lvpm

需求:

  • 统计资料中的speaker所说的话中单词出现的频率,以 单词 文件名:speaker:出现次数 的形式输出。
mapperDemo.java
package com.atSchool.speak;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** 统计辩论赛的词频* * 文件内容* "Line","Speaker","Text","Date"* 1,"Holt","Good evening from Hofstra University in Hempstead, New York. I'm Lester Holt, anchor of ""NBC Nightly News."" I want to welcome you to the first presidential debate. The participants tonight are Donald Trump and Hillary Clinton. This debate is sponsored by the Commission on Presidential Debates, a nonpartisan, nonprofit organization. The commission drafted tonight's format, and the rules have been agreed to by the campaigns. The 90-minute debate is divided into six segments, each 15 minutes long. We'll explore three topic areas tonight: Achieving prosperity; America's direction; and securing America. At the start of each segment, I will ask the same lead-off question to both candidates, and they will each have up to two minutes to respond. From that point until the end of the segment, we'll have an open discussion. The questions are mine and have not been shared with the commission or the campaigns. The audience here in the room has agreed to remain silent so that we can focus on what the candidates are saying. I will invite you to applaud, however, at this moment, as we welcome the candidates: Democratic nominee for president of the United States, Hillary Clinton, and Republican nominee for president of the United States, Donald J. Trump.","9/26/16"* 2,"Audience","(APPLAUSE)","9/26/16"* 3,"Clinton","How are you, Donald?","9/26/16"* 4,"Audience","(APPLAUSE)","9/26/16"* 5,"Holt","Good luck to you.","9/26/16"*/
public class mapperDemo extends Mapper<LongWritable, Text, Text, Text> {private Text outKey = new Text();private Text outValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// 获取按指定规则分割后的一行String[] split = value.toString().split(",");if (split.length == 4 && Character.isDigit(split[0].charAt(0))) {String speaker = split[1]; // 辩论者String text = split[2]; // 辩论内容// 对辩论内容再进行分割/*** 字符串StringTokenizer类允许应用程序将字符串拆分成令牌。 * StringTokenizer方法不区分标识符,数字和引用的字符串,也不识别和跳过注释。 * 可以在创建时或每个令牌的基础上指定一组分隔符(分隔标记的字符)。 */StringTokenizer stringTokenizer = new StringTokenizer(text, " (),.?!--\"\"\n#");// 获取文件名String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();// 遍历每一个单词while (stringTokenizer.hasMoreElements()) {String nextToken = stringTokenizer.nextToken();outKey.set(nextToken + ":" + fileName + ":" + speaker);outValue.set("1");context.write(outKey, outValue);}}}
}
combinerDemo.java
package com.atSchool.speak;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class combinerDemo extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // 合并
        int sum = 0;
        for (Text text : value) {
            Integer valueOf = Integer.valueOf(text.toString());
            sum += valueOf;
        }

        // 改变kv重新输出
        String[] split = key.toString().split(":");
        String outKey = split[0]; // 单词
        String outValue = split[1] + ":" + split[2] + ":" + String.valueOf(sum); // 文件名:辩论者:出现次数
        context.write(new Text(outKey), new Text(outValue));
    }
}
reduceDemo.java
package com.atSchool.speak;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduceDemo extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        StringBuffer stringBuffer = new StringBuffer();
        for (Text text : value) {
            stringBuffer.append(text.toString() + "\t");
        }
        context.write(key, new Text(stringBuffer.toString()));
    }
}
jobDemo.java
package com.atSchool.speak;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo.class);job.setCombinerClass(combinerDemo.class);job.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/美国两党辩论关键词"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}}

5.14_计算共同好友案例

分析资料:

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

mapperDemo.java

package com.atSchool.friend;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 统计两个人的共同好友
 *
 * 原数据:
 * A:B,C,D,F,E,O
 * B:A,C,E,K
 * C:F,A,D,I
 * D:A,E,F,L
 *
 * 性质:好友双方应当都有对方名字,不会存在单一的情况
 * 所以A中有B,即B中有A
 */
public class mapperDemo extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String[] split = value.toString().trim().split(":");

        // 获取好友列表
        String[] split2 = split[1].split(",");
        outValue.set(split[0]); // 当前用户

        // 遍历好友列表
        for (String string : split2) {
            /**
             * 输出:
             * B A
             * C A
             * D A
             * F A
             * E A
             * O A
             * A B
             * C B
             * E B
             * K B
             */
            outKey.set(string);
            context.write(outKey, outValue);
            System.out.println("mapper:" + string + "\t" + split[0]);
        }
    }
}

reduceDemo.java

package com.atSchool.friend;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduceDemo extends Reducer<Text, Text, Text, Text> {
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        StringBuilder stringBuilder = new StringBuilder();
        for (Text text : value) {
            stringBuilder.append(text.toString() + ",");
        }
        outValue.set(stringBuilder.toString());
        context.write(key, outValue);
        System.out.println("reduce-in-out:" + key.toString() + "\t" + stringBuilder.toString());
    }
}

jobDemo.java

package com.atSchool.friend;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo.class);job.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/friend.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}}

mapperDemo2.java

package com.atSchool.friend;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class mapperDemo2 extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // 输出的value
        String[] split = value.toString().split("\t");
        outValue.set(split[0]);

        // 输出的key
        String[] split2 = split[1].split(",");
        Arrays.sort(split2);
        for (int i = 0; i < split2.length; i++) {
            for (int j = i + 1; j < split2.length; j++) {
                outKey.set(split2[i] + split2[j]);
                /**
                 * 输出:
                 * BC A
                 * BD A
                 * BF A
                 * BG A
                 * BH A
                 * BI A
                 * BK A
                 * BO A
                 * CD A
                 * CF A
                 * CG A
                 * CH A
                 * CI A
                 * CK A
                 * CO A
                 */
                context.write(outKey, outValue);
                System.out.println("mapper-out:" + outKey.toString() + "\t" + outValue.toString());
            }
        }
    }
}

jobDemo2.java

package com.atSchool.friend;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo2 extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo2(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo2.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo2.class);job.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/MapReduceOut/part-r-00000"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/out");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}}

5.15_自定义分区

1. 自定义Partitioner步骤

(1)自定义类继承Partitioner,重写getPartition()方法

(2)在job驱动中,设置自定义partitioner:

job.setPartitionerClass(CustomPartitioner.class);

(3)自定义partition后,要根据自定义Partitioner的逻辑设置相应数量的reducetask

job.setNumReduceTasks(5);

注意:

reduceTask的个数决定了有几个文件!!

如果reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

如果1 < reduceTask的数量 < getPartition的结果数,则有一部分 分区数据无处安放,会报Exception;

如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件part-r-00000;
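把上面(1)~(3)三步串起来,大致的代码骨架如下(仅为示意,CustomPartitioner 这个类名与其中的散列逻辑均为假设,完整写法可参考下面手机流量案例中的 partitionerDemo):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// (1) 自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // 返回值必须落在 0 ~ numPartitions-1 之间,这里示意按key散列
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// (2) 在job驱动中:job.setPartitionerClass(CustomPartitioner.class);
// (3) 同时:job.setNumReduceTasks(5); // reduceTask数量应不小于getPartition()可能返回的最大值+1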

2. 统计手机流量并按手机号划分案例

分析资料:

1363157985066    13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   84138413    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13560436666 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560436666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
// 说明:(注意不一定每一条数据都有如下项,所以得注意)
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全      20             20        3156     2936        200
记录报告时间戳     手机号码        AP mac                  AC mac         访问的网址      网址种类  上行数据包个数   下行数据包个数  上行流量  下行流量 HTTP Response的状态
mapperDemo.java
package com.atSchool.partitioner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class mapperDemo extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Text outKey = new Text();
    private LongWritable outValue = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // 读取一行
        String line = value.toString();
        // 分割
        String[] split = line.split("\t+");

        // 输出到上下文对象中
        String tel = split[1].trim();
        String upFlow = split[split.length - 3].trim();
        String downFlow = split[split.length - 2].trim();
        outKey.set(tel);
        outValue.set(Long.parseLong(upFlow) + Long.parseLong(downFlow));
        context.write(outKey, outValue);
    }
}
partitionerDemo.java
package com.atSchool.partitioner;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class partitionerDemo extends Partitioner<Text, LongWritable> {
    // 初始化分区
    private static Map<String, Integer> maps = new HashMap<>();
    static {
        maps.put("135", 0);
        maps.put("137", 1);
        maps.put("138", 2);
        maps.put("139", 3);
    }

    // 如果不是上面分区的任意一种,则放到第5个分区
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String substring = key.toString().substring(0, 3);
        if (maps.get(substring) == null) {
            return 4;
        } else {
            return maps.get(substring);
        }
    }
}
reduceDemo.java
package com.atSchool.partitioner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduceDemo extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> value,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable longWritable : value) {
            sum += longWritable.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
jobDemo.java
package com.atSchool.partitioner;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo.class);job.setPartitionerClass(partitionerDemo.class); // 设置自定义partitionerjob.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);job.setNumReduceTasks(5); // 根据自定义Partitioner的逻辑设置相应数量的reducetask// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/phone_traffic.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5.16_MapReduce综合应用

统计分析学生课程与成绩案例

分析资料:

computer,huangxilaoming,78,90,80,76,87
computer,huangbo,65,60,0,75,77
computer,xuzheng,50,60,60,40,80,90,100
computer,wangbaoqiang,57,87,98,87,54,65,32,21
java,liuyifei,50,40,90,50,80,70,60,50,40
java,xuezhiqian,50,40,60,70,80,90,90,50
java,wanghan,88,99,88, 99,77,55
java,huangzitao,90,50,80,70,60,50,40
java,huangbo,70,80,90,90,50
java,xuzheng,99,88,99,77,55
scala,wangzulan,50,60,70,80,90,40,50,60
scala,dengchao,60,50,90,60,40,50,60,70,50
scala,zhouqi,60,50,40,90,80,70,80,90
scala,liuqian,80,90,40,50,60
scala,liutao,60,40,50,60,70,50
scala,zhourunfa,40,90,80,70,80,90
mysql,liutao,50,60,70,80,90,100,65,60
mysql,liuqian,50,60,50,70,80,90,50,60,40
mysql,zhourunfa,80,50,60,90,70,50,60,40,50

数据说明:

数据解释:数据字段个数不固定。第一个字段是课程名称,总共四个课程:computer、java、scala、mysql;第二个字段是学生姓名;后面是每次考试的分数,但是每个学生在某门课程中的考试次数不固定。

1.统计每门课程的参考人数和课程平均分

mapperDemo.java
package com.atSchool.partitioner2;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 统计分析学生课程与成绩案例
 */
public class mapperDemo extends Mapper<LongWritable, Text, Text, FloatWritable> {
    private Text outKey = new Text();
    private FloatWritable outValue = new FloatWritable();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FloatWritable>.Context context)
            throws IOException, InterruptedException {
        // 读取一行+分割
        String[] split = value.toString().trim().split(",");

        // 输出到上下文对象中
        String courseName = split[0].trim(); // 课程名
        float sum = 0;
        float index = 0;
        for (int i = 2; i < split.length; i++) {
            sum += Float.parseFloat(split[i].trim());
            index++;
        }
        float averageScore = sum / index; // 学生平均分
        outKey.set(courseName);
        outValue.set(averageScore);
        context.write(outKey, outValue);
    }
}
reduceDemo.java
package com.atSchool.partitioner2;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduceDemo extends Reducer<Text, FloatWritable, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<FloatWritable> value,
            Reducer<Text, FloatWritable, Text, Text>.Context context) throws IOException, InterruptedException {
        float sum = 0;
        float index = 0;
        for (FloatWritable floatWritable : value) {
            sum += floatWritable.get();
            index++;
        }
        float averageScore = sum / index;
        context.write(key, new Text("考试人数:" + index + "\t" + "课程考试平均成绩:" + averageScore));
    }
}
jobDemo.java
package com.atSchool.partitioner2;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo.class);job.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FloatWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/student.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

输出结果:

computer 考试人数:4.0 课程考试平均成绩:67.19911
java    考试人数:6.0 课程考试平均成绩:71.98823
mysql   考试人数:3.0 课程考试平均成绩:64.69907
scala   考试人数:6.0 课程考试平均成绩:64.23148

2.统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数

beanDemo.java
package com.atSchool.studentDemo2;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class beanDemo implements WritableComparable<beanDemo> {private String courseName; // 课程名称private String stuName; // 学生姓名private float avgScore; // 平均成绩// 构造方法public beanDemo() {super();}public beanDemo(String courseName, String stuName, float avgScore) {super();this.courseName = courseName;this.stuName = stuName;this.avgScore = avgScore;}// getter/setter方法public String getCourseName() {return courseName;}public void setCourseName(String courseName) {this.courseName = courseName;}public String getStuName() {return stuName;}public void setStuName(String stuName) {this.stuName = stuName;}public float getAvgScore() {return avgScore;}public void setAvgScore(float avgScore) {this.avgScore = avgScore;}// 重写方法@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(courseName);out.writeUTF(stuName);out.writeFloat(avgScore);}@Overridepublic void readFields(DataInput in) throws IOException {courseName = in.readUTF();stuName = in.readUTF();avgScore = in.readFloat();}/*** 返回一个负整数、零或正整数,因为此对象小于、等于或大于指定对象。* * o1.compareTo(o2);* 返回正数,比较对象(compareTo传参对象o2)放在 当前对象(调用compareTo方法的对象o1)的前面* 返回负数,放在后面*/@Overridepublic int compareTo(beanDemo o) {// 判断是不是同一个课程int compareTo = o.courseName.compareTo(this.courseName);// 如果是同一个课程if (compareTo == 0) {// 如果比较的对象比当前对象小,就返回正数,比较对象放在当前对象的后面return avgScore > o.avgScore ? -1 : 1;}return compareTo > 0 ? 1 : -1;}@Overridepublic String toString() {return "courseName=" + courseName + ", stuName=" + stuName + ", avgScore=" + avgScore;}
}
partitionerDemo.java
package com.atSchool.studentDemo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class partitionerDemo extends Partitioner<beanDemo, NullWritable> {
    @Override
    public int getPartition(beanDemo key, NullWritable value, int numPartitions) {
        String courseName = key.getCourseName();
        if (courseName.equals("java")) {
            return 0;
        } else if (courseName.equals("computer")) {
            return 1;
        } else if (courseName.equals("scala")) {
            return 2;
        } else if (courseName.equals("mysql")) {
            return 3;
        } else {
            return 4;
        }
    }
}
mapperDemo.java
package com.atSchool.studentDemo2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,
 * 并且按平均分从高到低排序,分数保留一位小数
 */
public class mapperDemo extends Mapper<LongWritable, Text, beanDemo, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, beanDemo, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // 读取一行+分割
        String[] split = value.toString().trim().split(",");

        // 输出到上下文对象中
        String courseName = split[0].trim(); // 课程名
        String stuName = split[1].trim(); // 学生姓名
        float sum = 0;
        float index = 0;
        for (int i = 2; i < split.length; i++) {
            sum += Float.parseFloat(split[i].trim());
            index++;
        }
        String format = String.format("%.1f", sum / index); // 学生平均分,保留一位小数
        beanDemo beanDemo = new beanDemo(courseName, stuName, Float.parseFloat(format));
        context.write(beanDemo, NullWritable.get());
    }
}
jobDemo.java
package com.atSchool.studentDemo2;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo.class);job.setPartitionerClass(partitionerDemo.class); // 设置自定义partitionerjob.setNumReduceTasks(5);   // 根据自定义Partitioner的逻辑设置相应数量的reducetask// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(beanDemo.class);job.setMapOutputValueClass(NullWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(beanDemo.class);job.setOutputValueClass(NullWritable.class);// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/student.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

3.求出每门课程参考学生成绩排名前3的学生的信息:课程,姓名和平均分(分组)

groupDemo.java
package com.atSchool.studentDemo3;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 分组
 */
public class groupDemo extends WritableComparator {
    public groupDemo() {
        super(beanDemo.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        beanDemo s1 = (beanDemo) a;
        beanDemo s2 = (beanDemo) b;
        return s1.getCourseName().compareTo(s2.getCourseName());
    }
}
mapperDemo.java
package com.atSchool.studentDemo3;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 求出每门课程参考学生成绩排名前3的学生的信息:课程,姓名和平均分
 */
public class mapperDemo extends Mapper<LongWritable, Text, beanDemo, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, beanDemo, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // 读取一行+分割
        String[] split = value.toString().trim().split(",");

        // 输出到上下文对象中
        String courseName = split[0].trim(); // 课程名
        String stuName = split[1].trim(); // 学生姓名
        float sum = 0;
        float index = 0;
        for (int i = 2; i < split.length; i++) {
            sum += Float.parseFloat(split[i].trim());
            index++;
        }
        float averageScore = sum / index; // 学生平均分
        beanDemo beanDemo = new beanDemo(courseName, stuName, averageScore);
        context.write(beanDemo, NullWritable.get());
    }
}
reduceDemo.java
package com.atSchool.studentDemo3;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class reduceDemo extends Reducer<beanDemo, NullWritable, beanDemo, NullWritable> {
    @Override
    protected void reduce(beanDemo key, Iterable<NullWritable> value,
            Reducer<beanDemo, NullWritable, beanDemo, NullWritable>.Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (NullWritable nullWritable : value) {
            if (count < 3) {
                context.write(key, nullWritable);
                count++;
            } else {
                break;
            }
        }
    }
}
jobDemo.java
package com.atSchool.studentDemo3;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class jobDemo extends Configured implements Tool {public static void main(String[] args) throws Exception {new ToolRunner().run(new jobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {// 获取JobConfiguration configuration = new Configuration();configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");Job job = Job.getInstance(configuration);// 设置需要运行的任务job.setJarByClass(jobDemo.class);// 告诉job Map和Reduce在哪job.setMapperClass(mapperDemo.class);job.setReducerClass(reduceDemo.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(beanDemo.class);job.setMapOutputValueClass(NullWritable.class);// 告诉job Reduce输出的key和value的数据类型的是什么job.setOutputKeyClass(beanDemo.class);job.setOutputValueClass(NullWritable.class);job.setGroupingComparatorClass(groupDemo.class);  // 指定分组的规则// 告诉job输入和输出的路径FileInputFormat.addInputPath(job, new Path("/student.txt"));/*** 因为输出的文件不允许存在,所以需要处理一下*/FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}FileOutputFormat.setOutputPath(job, path);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

5.17_MapReduce+MySQL

1. Java与MySql数据类型对照表

2. MapReduce读取MySQL中的数据案例

分析数据库:

  • 链接:https://pan.baidu.com/s/1td4NDtn3xaENAhQrF-F5Ow
    提取码:lc6c
Shopping.java
package com.atSchool.db.MR;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;import org.apache.hadoop.mapreduce.lib.db.DBWritable;public class Shopping implements DBWritable {private int id; // 商品idprivate String name; // 商品名称private String subtitle; // 商品副标题private float price; // 商品价格private int stock; // 库存数量public Shopping() {}public Shopping(int id, String name, String subtitle, float price, int stock) {super();this.id = id;this.name = name;this.subtitle = subtitle;this.price = price;this.stock = stock;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getSubtitle() {return subtitle;}public void setSubtitle(String subtitle) {this.subtitle = subtitle;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}public int getStock() {return stock;}public void setStock(int stock) {this.stock = stock;}// PreparedStatement在JDBC中用来存储已经预编译好了的sql语句@Overridepublic void write(PreparedStatement statement) throws SQLException {statement.setInt(1, id);statement.setString(2, name);statement.setString(3, subtitle);statement.setFloat(4, price);statement.setInt(5, stock);}// ResultSet在JDBC中是存储结果集的对象@Overridepublic void readFields(ResultSet resultSet) throws SQLException {this.id = resultSet.getInt(1);this.name = resultSet.getString(2);this.subtitle = resultSet.getString(3);this.price = resultSet.getFloat(4);this.stock = resultSet.getInt(5);}@Overridepublic String toString() {return "id=" + id + ", name=" + name + ", subtitle=" + subtitle + ", price=" + price + ", stock=" + stock;}
}
MapperDemo.java
package com.atSchool.db.MR;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperDemo extends Mapper<LongWritable, Shopping, NullWritable, Shopping> {
    @Override
    protected void map(LongWritable key, Shopping value,
            Mapper<LongWritable, Shopping, NullWritable, Shopping>.Context context)
            throws IOException, InterruptedException {
        context.write(NullWritable.get(), value);
    }
}
JobDemo.java
package com.atSchool.db.MR;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import com.atSchool.utils.HDFSUtils;public class JobDemo extends Configured implements Tool {private String className = "com.mysql.cj.jdbc.Driver";private String url = "jdbc:mysql:// 127.0.0.1:3306/shopping?&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true";private String user = "root";private String password = "password";public static void main(String[] args) throws Exception {new ToolRunner().run(new JobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {/*** 获取job:一个工作对象*/// 创建一个 配置 对象Configuration configuration = new Configuration();// 设置 name属性 的值。// 如果名称已弃用或有一个弃用的名称与之关联,它会将值设置为两个名称。// 名称将在配置前进行修剪。configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");// 在configuration中设置数据库访问相关字段。DBConfiguration.configureDB(configuration, className, url, user, password);// 根据配置文件创建一个jobJob job = Job.getInstance(configuration);/*** 设置job*/// sql语句String sql_1 = "SELECT id,name,subtitle,price,stock FROM neuedu_product WHERE price>1999";String sql_2 = "SELECT COUNT(id) FROM neuedu_product WHERE price>1999";/*** public static void setInput(Job job, Class<? extends DBWritable> inputClass, String inputQuery, String inputCountQuery)* 使用适当的输入设置初始化作业的映射-部分。* 参数:*      job-The map-reduce job*         inputClass-实现DBWritable的类对象,它是保存元组字段的Java对象。*        inputQuery-选择字段的输入查询。示例:"SELECT f1, f2, f3 FROM Mytable ORDER BY f1"*      inputCountQuery-返回表中记录数的输入查询。示例:"SELECT COUNT(f1) FROM Mytable"*/DBInputFormat.setInput(job, Shopping.class, sql_1, sql_2);// 通过查找给定类的来源来设置Jar。job.setJarByClass(JobDemo.class);// 给 job 设置 Map和Reducejob.setMapperClass(MapperDemo.class);job.setNumReduceTasks(0); // 这里用为没有用到reduce所以设置为0// 给 job 设置InputFormat// InputFormat:描述 Map-Reduce job 的输入规范// DBInputFormat:从一个SQL表中读取输入数据的输入格式。job.setInputFormatClass(DBInputFormat.class);// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(NullWritable.class);job.setMapOutputValueClass(Shopping.class);/*** 设置输出路径*/// 因为输出的文件不允许存在,所以需要处理一下FileSystem fileSystem = HDFSUtils.getFileSystem();Path path = new Path("/MapReduceOut");if (fileSystem.exists(path)) {fileSystem.delete(path, true);System.out.println("删除成功");}// 设置map-reduce job的输出目录FileOutputFormat.setOutputPath(job, path);// 将job提交到集群并等待它完成。boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}

3. MapReduce写出数据到MySQL

注意:写出的时候,写出的表在数据库中要事先建好
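例如,要把后面 User 对象中的 id、name、password 三个字段写入 user 表,可以先用一段普通的 JDBC 代码(或直接在 MySQL 客户端执行同样的 SQL)把表建好。下面是一个简单示意,表结构、字段类型均为假设,按实际数据调整即可:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// 仅为示意:提前在shopping库中建好user表,供DBOutputFormat写出数据
public class CreateUserTable {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://127.0.0.1:3306/shopping?useSSL=false&serverTimezone=UTC";
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
                Statement stmt = conn.createStatement()) {
            // 字段顺序与 DBOutputFormat.setOutput(job, "user", "id", "name", "password") 保持一致
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS user ("
                    + "id INT PRIMARY KEY,"
                    + "name VARCHAR(50),"
                    + "password VARCHAR(50))");
        }
    }
}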

User.java
package com.atSchool.db.outmysql;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;import org.apache.hadoop.mapreduce.lib.db.DBWritable;public class User implements DBWritable {private int id;private String name;private String password;public User() {}public User(int id, String name, String password) {super();this.id = id;this.name = name;this.password = password;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic void write(PreparedStatement statement) throws SQLException {statement.setInt(1, id);statement.setString(2, name);statement.setString(3, password);}@Overridepublic void readFields(ResultSet resultSet) throws SQLException {this.id = resultSet.getInt(1);this.name = resultSet.getString(2);this.password = resultSet.getString(3);}@Overridepublic String toString() {return "id=" + id + ", name=" + name + ", password=" + password;}
}
MapperDemo.java
package com.atSchool.db.outmysql;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperDemo extends Mapper<LongWritable, Text, User, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, User, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        User user = new User(Integer.parseInt(split[0]), split[1], split[2]);
        context.write(user, NullWritable.get());
        System.out.println("mapper-out:" + user.toString());
    }
}
JobDemo.java
package com.atSchool.db.outmysql;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class JobDemo extends Configured implements Tool {private String className = "com.mysql.cj.jdbc.Driver";/*** rewriteBatchedStatements=true* 说明:*         MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。*      而把rewriteBatchedStatements参数置为true, 驱动就会批量执行SQL* 注意:*        在这里就不能将该属性设置成true了,这样会和mapreduce中形成冲突*/private String url = "jdbc:mysql://127.0.0.1:3306/shopping?&charactercEncoding=utf-8&useSSL=false&serverTimezone=UTC";private String user = "root";private String password = "password";public static void main(String[] args) throws Exception {new ToolRunner().run(new JobDemo(), null);}@Overridepublic int run(String[] args) throws Exception {/*** 获取job:一个工作对象*/// 创建一个 配置 对象Configuration configuration = new Configuration();// 设置 name属性 的值。// 如果名称已弃用或有一个弃用的名称与之关联,它会将值设置为两个名称。// 名称将在配置前进行修剪。configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");// 在configuration中设置数据库访问相关字段。DBConfiguration.configureDB(configuration, className, url, user, password);// 根据配置文件创建一个jobJob job = Job.getInstance(configuration);/*** 设置job*//*** setOutput(Job job, String tableName, String... fieldNames) throws IOException* 用适当的输出设置初始化作业的缩减部分* 参数:* job:The job* tableName:要插入数据的表* fieldNames:表中的字段名。*/DBOutputFormat.setOutput(job, "user", new String[] { "id", "name", "password" });// 通过查找给定类的来源来设置Jar。job.setJarByClass(JobDemo.class);// 给 job 设置 Map和Reducejob.setMapperClass(MapperDemo.class);job.setNumReduceTasks(0); // 这里用为没有用到reduce所以设置为0// 告诉job Map输出的key和value的数据类型的是什么job.setMapOutputKeyClass(User.class);job.setMapOutputValueClass(NullWritable.class);// 给 job 设置InputFormat// InputFormat:描述 Map-Reduce job 的输入规范// DBInputFormat:从一个SQL表中读取输入数据的输入格式。job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(DBOutputFormat.class);/*** 设置输入路径*/FileInputFormat.setInputPaths(job, "/user_for_mysql.txt");// 将job提交到集群并等待它完成。boolean waitForCompletion = job.waitForCompletion(true);System.out.println(waitForCompletion ? "执行成功" : "执行失败");return 0;}
}
