MapReduce实现TopN算法,以获取某路网上某段时间内车速最快的前N辆车的记录为例

  • 一、文章目的:
    • 1.增加读者对MapReduce的setup(),cleanup()的理解
    • 2.使读者熟悉Java.unit.TreeMap的使用
    • 3.提供了基于TreeMap的TopN问题的MapReduce解决方案
  • 二、预备知识:
    • 1.TopN类问题的常见描述:
    • 2.基于Java.unit.TreeMap解决TopN问题的单机方法:
  • 三、博客正文
    • 1.问题描述:
    • 2.解决思路:
      • 2.1 输入输出文件分析:
      • 2.2 主函数参数参数设计:
    • 2.3 MapReduce程序设计的总体思路:
    • 2.4 映射器Mapper的具体设计:
    • 2.5 规约器Reducer的具体设计:
    • 2.6 驱动器TopN(即MR程序入口)的具体设计:
    • 3.整合上述代码并运行:

一、文章目的:

1.增加读者对MapReduce的setup(),cleanup()的理解

本博客通过讨论如何使用MapReduce来实现 “在某路网上,检索出某段时间内车速最快的前N辆车对应的记录”的例子程序,以使读者增加对MapReduce编程模型的理解,包括如何规划MapReduce程序的setup()函数、cleanup()函数,以及编写map()函数和ruduce()函数的设计思路

2.使读者熟悉Java.unit.TreeMap的使用

TreeMap是java.unit.Collection的一个实现类,其实现了SortedMap接口,其存储的元素类型是一个键值对,在向TreeMap的对象添加元素时,其按键进行排序和去重。在进行复杂数据处理时,可以自定义数据类型,按业务逻辑重写其compareTo(),来实现对自定义类型的排序和去重;

3.提供了基于TreeMap的TopN问题的MapReduce解决方案

以 “在某路网上,检索出某段时间内车速最快的前N辆车对应的最高时速时的轨迹点记录”的例子程序讲解了如何规划映射器Mapper和规约器Reducer需要实现的任务,并给出了详细的代码;本博客的参考了《数据算法-Hadoop/Spark大数据处理技巧》TopN算法的思路,需要扩展的朋友可以自行参考。1

二、预备知识:

1.TopN类问题的常见描述:

TopN是一种现实生活环境中很常见的问题,其通常描述为“寻找出所有数据集中,最大/小的前N项记录”,举例如下:

  1. 问题例举1:如何在10亿个整数中找出前1000个最大的数;
  2. 问题举例2:如何在全校所有班级中找出《数据结构》成绩最高的前5位同学;

问题延伸:已知某时间段内经过某路网的多辆车辆,求速度最快的前n辆车; 即:

  • 已知:【时间段】、【经过某路网的车辆数据】、【n】;
  • 求:速度最快的前n辆车达到最高速度时对应的记录

注:【经过某路网的车辆数据】存储在某一文件中,其一条文本记录的格式为

车牌号,经度,纬度,时间,速度

具体实际记录如下:

云A111,120.666,23.666,2020-01-04 12:00:00,90

2.基于Java.unit.TreeMap解决TopN问题的单机方法:

因为本文讲解的内容是通过MapReduce的方法来实现TopN,而编写MapReduce的时,常用的编程语言是Java,所以我们先通过一个简单实现TopN的Java程序实例来入手,大致理解如何去解决TopN问题的思路:

【Java实例】使用单机版的java程序,解决问题举例2中提出的问题:

  • 假设某记录了多名学生多门功课考试成绩的文本文件grad.txt的内容如下(格式为"学号,姓名,科目,成绩"):

S000,张三,数据结构,94
S001,李四,数据结构,66
S010,王五,数据结构,77
S011,燕小六,数据结构,89
S100,洪老七,计算机网络,90
S101,黑背老八,数据科学,60
S110,熊大,数据结构,100
S111,熊二,数据结构,78

  • 求文件中所包含的《数据结构》科目考试分数最高的前3个同学对应的记录。

则使用Java实现时,我们考虑实现流程如下:

  1. 遍历每一行文本记录,将每一行文本记录分割为多个字段
  2. 判断第3个字段是否为"数据结构,如果第3个字段为数据结构,将当前记录添加到某有序容器
  3. 判断此时容器中的元素个数是否>N,如果是则删除排序后最小/大的元素
  4. 遍历结束后,有序容器中剩下的元素就是符合要求的前N个记录,可以保存或输出

实现代码的思路如下(建议看完后参照后续代码阅读):

  • 定义了public static String [] JavaTopN.getTopN(String filePath,int N):通过传入符合 “学号,姓名,科目,分数” 格式的文件和一个整数N,得到前N个《数据结构》科目分数最高的同学对应的文本记录内容;
  • 于主函数调用getTopN(),分别传入参数【grad.txt的文件路径】和【N】,得到前N条满足条件的记录数组,遍历数组并输出当前对象的字符串。
  • 实现具体的代码时我们使用了java.io包下的File、FileInputStream、InputStreamReader、BufferedReader对象,读取文件输入流,具体操作,详见代码。
  • 非常重要!!! 本代码实现流程中第2步的有序容器使用的抽象类型是SortedMap类,其具体实现使用的是TreeMap<Double,String> 类型,其具备如下特点:
    • 存储元素类型为一个键值对<key,value>,容器内的元素按key值类型的compareTo()方法排序,也可以通过指定key的比较器对象Comparator <keyTpey>来排序;
    • TreeMap可以进行高效的首尾操作,对第一个元素(排序后最小的元素)和最后一个元素(排序后最大的元素)进行操作。
    • 其常用API为:
      • 增加元素:put(key,value)
      • 按键索取元素:get(key)
      • 按键删除元素:remove(key)
      • 取最小值的键:firstKey()
      • 取最大值的键:endKey()

完整的用于实现TopN的单机版Java代码:

package topN;import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.SortedMap;
import java.util.TreeMap;public class JavaTopN {public static void main(String[] args) {    //主函数:程序入口//调用getTopN()方法,传入学生成绩文件和N的取值,得到前N个符合要求的元素构成的数组Object[] records=getTopN("data\\data\\grad.txt",3); //此处的文件路径为相对路径//遍历getTopN()输出的数组,输出每个元素for(Object r:records) {System.out.println(r.toString());}}/*** 实现功能:读取传入文件,计算并返回成绩最高的前N个同学的记录* @param filePath 输入文件的路径* @param N TopN的N* @return 前N个符合要求的元素对应的记录*/public static Object[] getTopN(String filePath, int N) {String line = ""; //初始化lineSortedMap<Double, String> topN = new TreeMap<Double, String>(); //初始化TreMaptry // 防止文件建立或读取失败,用catch捕捉错误并打印,也可以throw{String pathname = filePath; File filename = new File(pathname); // 要读取以上路径的input.txt文件InputStreamReader reader = new InputStreamReader(new FileInputStream(filename), "GBK"); // 建立一个输入流对象readerBufferedReader br = new BufferedReader(reader); // 建立一个对象,它把文件内容转成计算机能读懂的语言while ((line = br.readLine()) != null) // 遍历这个文件的所有行{String[] data = line.split(","); // 按逗号将每行拆分成4个部分:学号,姓名,科目,成绩if (data[2].equals("数据结构")) { //判断科目是否为《数据结构》/*PS:TreeMap在使用put加入对象时,会按key排序以及去重*/topN.put(Double.valueOf(data[3]), line); //是,则添加进TreeMap对象topN}if (topN.size() > N) {  //如果TreeMap中的元素个数大于N了topN.remove(topN.firstKey());  //删除最小的那个元素}}br.close();    reader.close();} catch (Exception e) {e.printStackTrace();}return topN.values().toArray(); //返回TreeMap中的所有values转换出来的数组}}

三、博客正文

相信经过前面的铺垫,大家对TopN类的问题和解决思路已经有了大体的了解,所以接下来博客将主要围绕某个例子程序来阐述如何联系实际的应用场景来编写一个TopN程序;

1.问题描述:

  • 已知:某段时间内经过了某路段上的车辆信息,其存储在某文件内,内容格式如下:

    • 格式:车牌号,经度,纬度,时间,速度
    • 具体内容样式举例:(为了保护车主隐私,对车牌号和日期做了修改,不影响程序的编写)

    牛P6BN16,113.808701,22.607718,2020-10-24 11:09:20,0
    牛P6BJ13,113.809486,22.618282,2020-10-24 11:05:09,69
    牛P6BJ20,113.81015,22.617617,2020-10-24 14:05:20,75
    牛P6BJ16,113.81263,22.620733,2020-10-24 10:59:49,75

  • 求:某时段(可自行给出)内,经过该路段的车速最快的车牌号,及其车速最快时对应的轨迹点的记录信息。

2.解决思路:

2.1 输入输出文件分析:

  • 输入文件分析: 由问题描述中输入文件的具体内容样式例举,我们不难发现每条记录为【一辆车经过该路网时留下的一个轨迹点】,各轨迹点间的车牌号,时间是乱序的。但我们求解的是车速最快的前N辆车所对应的最高时速的记录信息,所以无需关心文件记录是否有序

牛P6BN16,113.808701,22.607718,2020-10-24 11:09:20,0
牛P6BJ13,113.809486,22.618282,2020-10-24 11:05:09,69
牛P6BJ20,113.81015,22.617617,2020-10-24 14:05:20,75
牛P6BJ16,113.81263,22.620733,2020-10-24 10:59:49,75

  • 输出文件分析:输出文件的内容格式应该与输入文件内容格式一致,即:“车牌号,经度,纬度,时间,速度”,但文本记录数只能为N条,即前N条该路段上车速最快的车辆在最高时速的轨迹点记录。

2.2 主函数参数参数设计:

  • [inputPath]:输入文件的路径,作为args[0]
  • [outputPath]:输出文件的路径,作为args1
  • [int N]:TopN算法要找出的前N条记录的N,作为args[2]
  • [String startTime]:检索时间范围的起始时间,作为args[3]
  • [String endTime]:检索时间范围的起始时间,作为args[4]

前两个参数作为必要参数,没有默认值;对于后3个参数:

  • N:默认值为10
  • start:默认为"1970-01-01 00:00:00"
  • end:默认为Date.toString()

与Mapper和Reducer的关系:

  • Mapper:内置成员变量[Date start=new Date(0)]和[Date end=new Date()],如果主函数有将[String startTime]和[String startTime]写入conf,则在setup()中获取startTime和endTime,初始化start和end。于map()中通过start和end过滤符合时间段要求的轨迹,写入上下文
  • Reducer:内置了成员变量[TreeMap topN]来存储前N个车速最快的车辆最高时速的轨迹点,内置了成员变量[int N]用来确定N具体的值

2.3 MapReduce程序设计的总体思路:

  • 一般来说,我们希望通过Mapper的map()函数来实现对数据的过滤和聚集,因此我们让Mapper完成按起始日期和截止日期对记录进行过滤、以及按车牌号聚集的功能。待Mapper把数据按车牌号聚集后,于Reducer中再取每一辆车的最高速度对应的记录加入Reducer定义的成员变量topN中,当topN的元素个数大于N时,删除车速最小的那条记录,经过Reducer调用完最后一次reduce()时,topN中的结果就是我们需要的结果,此时使用cleanup()函数遍历topN的值,并写入上下文。
  • 为了更好的描述一条记录信息,提高程序的逻辑性,同时加深读者对MapReduce程序的理解,本博客化简为繁,考虑自定义了一个数据结构Point作为一条轨迹点记录的抽象,将其作为map()写入下文时的value,以及TreeMap<key,value>排序的依据的key。(若不如此,map()可以直接写入<车牌号,文本记录>,然后取TreeMap<Double,String>,存当前记录的速度和记录本身,实现起来要简单很多)
    • 则Point应该有如下成员变量,并完成Point的构造方法和成员变量的get()和set()方法:
 /*0.成员变量*/private String id;private double lon;private double lat;private double speed;private String time;/*1.构造方法*/public Point() {// 无参构造方法}public Point(String id, String time, double lon, double lat, double speed) {this.id = id;this.lat = lat;this.lon = lon;this.time = time;this.speed = speed;}/*2.一系列set方法*/public void setId(String id) {this.id = id;}public void setLat(double lat) {this.lat = lat;}public void setLon(double lon) {this.lon = lon;}public void setTime(String time) {this.time = time;}public void setSpeed(double speed) {this.speed = speed;}/*3.一系列get方法*/public String getId() {return id;}public double getLat() {return lat;}public double getLon() {return lon;}public String getTime() {return time;}public double getSpeed() {return speed;}
  • 因为Point要能写入上下文,所以需实现接口Writable,重写readFields()和readFields()方法,使Point对象能序列化以便写入上下文;因为Point要能作为TreeMap<key,value>的key用来排序,所以要实现Comparable<Point>接口,重写compareTo()方法作为排序的依据;此外为了避免因为同一对象的hash码引起的引起的TreeMap去重异常,需要重写clone()方法;同时为了将抽象的Point转换为文本记录写入上下文还需要实现toString()方法,把Point转换为字符串对象:

实现接口WritableComparable的声明:

//实现接口WritableComparable(等于实现Writable接口和Comparable<Point>接口)
public class Point implements WritableComparable<Point>

重写readFields()方法:

@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readUTF();this.lat = in.readDouble();this.lon = in.readDouble();this.speed = in.readDouble();this.time = in.readUTF();}

重写write()方法

@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeDouble(lat);out.writeDouble(lon);out.writeDouble(speed);out.writeUTF(time);}

重写compareTo()方法:

 public int compareTo(Point o) {Point p = (Point) o;int value =Double.valueOf(this.speed).compareTo(p.getSpeed());//System.out.println(this.speed+"\t"+p.speed+"\t"+value);return value;}

重写clone()方法:

 @Overrideprotected Point clone() throws CloneNotSupportedException {return new Point(id, time, lon, lat, speed);}

重写toString()方法

@Overridepublic String toString() {return this.id + "," + this.time + "," + this.lat + "," + this.lon + "," + this.speed;}

2.4 映射器Mapper的具体设计:

  • Mapper泛型 <k1,v1,k2,v2>

    • <k1,v1>为<文件偏移量,文本记录>,故类型为<LongWritable,Text>
    • <k2,v2>为<车牌号,轨迹记录>,故类型为<Text,Point> //此处的Point为对轨迹记录的抽象
  • 成员变量
    • Date start:检索时间范围的起始时间,默认值为new Date(0)
    • Date end: 检索时间范围的截止时间,默认值为new Date( )
    • 代码片段:
public class MapTopN extends Mapper<LongWritable, Text, Text, Point> {private Date start = new Date(0);private Date end = new Date();private Point p = new Point();
}
  • setup():

    • 执行时间:map任务第一次执行map()前,会执行一次setup()
    • 功能:常用来获取MR程序的全局参数和分布式缓存的内容,对Mapper的成语变量做初始化
    • 具体任务:初始化start和end
    • 代码片段:
 /*** setup是在map任务第一次调用map()函数前,用来对映射器Mapper的内容做初始化的; * map()函数会被map任务反复调用,而setup()函数只会被map任务调用一次*/protected void setup(Mapper<LongWritable, Text, Text, Point>.Context context)throws IOException, InterruptedException {// 1.获取传入的起始时间和终止时间:Configuration conf = context.getConfiguration();String startTime = conf.get("start");String endTime = conf.get("end");// 2.初始化要排查轨迹记录的时间范围SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");if (!(startTime==null)) {  //如果传入起始时间不为空try {start = ft.parse(startTime); //令start为传入的起始时间} catch (ParseException e) {e.printStackTrace();}}if (!(endTime==null)) { //如果传入终止时间不为空try {end = ft.parse(endTime); //令end为传入的结束时间} catch (ParseException e) {e.printStackTrace();}}}
  • map():

    • 执行时间:map任务调用一次setup()后,调用cleanup()之前,会被多次执行
    • 功能:常用来对数据记录进行过滤和聚集
    • 具体任务:按Mapper.start和Mapper.end,过滤出时间在两者间的记录,并把文本记录按","分割,分割后取车牌号字段,序列化为Text类型,作为键写入上下文,用于聚集同一车牌号对应的轨迹记录,map写入的值为Point类型的对象,经聚集后,每个reduce()的value为一个Point类型的可迭代对象。
    • 代码片段:
 /*** map()函数,输入是一个<key,value>的键值对,其中key是文件偏移量,value为输入文件的一条文本记录;* map()函数会被map任务多次反复调用;*/protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Point>.Context context)throws IOException, InterruptedException {// 0.读取文本记录,构建Point作为排序依据:// 0.1 文本记录格式:id,lat,lon,time,speedString[] data = value.toString().split(","); // 将文本记录按","分割// 0.2 构造Point对象pp.setId(data[0]); // 车牌号p.setLat(Double.valueOf(data[1])); // 经度p.setLon(Double.valueOf(data[2])); // 纬度p.setTime(data[3]); // 时间p.setSpeed(Double.valueOf(data[4]));// 速度// 1.将符合时间范围的轨迹点p作为value,其对应的车牌号作为key,写入上下文try {SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");Date ptime = ft.parse(p.getTime());if (ptime.before(end) && ptime.after(start)) {context.write(new Text(data[0]), p);}} catch (ParseException e) {e.printStackTrace();}}

至此映射器Mapper的设计完毕,其任务就是过滤出符合检索时间段的记录,将<车牌号,轨迹点记录>写入上下文,使轨迹点记录按车牌号聚合,供Reducer使用。

2.5 规约器Reducer的具体设计:

  • Reducer泛型 <k2,v2,k3,v3>

    • <k2,v2>为<车牌号,轨迹点记录>,故类型为<Text,Point>
    • <k3,v3>为<null,前N轨迹点记录之一>故类型为<NullWritable,Text>
  • 成员变量
    • int N:TopN算法的N,默认为10
    • TreeMap <Point,Stirng> topN: 用于存储点(Point对象)和其对应的轨迹(Point.toString)
    • 代码片段:
public class ReduceTopN extends Reducer<Text, Point, NullWritable, Text> {/*** N: topN算法要选取的前几个记录,N就是几,默认为10*/private int N = 10;/*** <p>topN: 用来存储前N个符合要求元素的TreeMap的对象<p>* <p>键为Point的对象,值为该对象调用toString()返回的字符串,toString()方法按需要重写即可<p>* <p>TreeMap按键进行排序,因此需要在Point类内重写compareTo()方法,自定义排序的规则<p>* <p>排序后TreeMap内的元素有序,当元素个数大于N时,删除最小的元素(第一个键对应的元素);<p>* <p>直到最后一次reduce()执行完毕后,保证最终留下的N个元素是按要求排序后最大的前N个;<p>*/private SortedMap<Point, String> topN = new TreeMap<Point, String>();}
  • setup():

    • 执行时间:reduce任务第一次执行reduce()前,会执行一次setup()
    • 功能:常用来获取MR程序的常量和分布式缓存的内容,对Reducer的成语变量做初始化
    • 具体任务:初始化N和topN
    • 代码片段:
 /*** <p>Reducer的setup()方法于reduce任务第一次调用reduce()函数之前执行;<p>* <p>作用是初始化Reducer里的成员变量,此处初始化对象就是N<p>*/protected void setup(Context context) throws IOException, InterruptedException {Configuration conf = context.getConfiguration();    //获取Reducer的上下文N = conf.getInt("N", 10); //获取主函数传入的N值}
  • reduce():

    • 执行时间:reduce任务调用一次setup()后,调用cleanup()之前,会被一次或多次执行,一个Reducer执行reduce()函数的次数取决于k2的数量。

    • 功能:根据map()生成的键完成规约、分组和总结

    • 具体任务:经过map任务对k2(车牌号)聚集后,每个reduce()的values为当前key(车牌号)所对应的一个Point类型的可迭代对象。我们遍历values,取出每一个Point对象,比较得出最大值后加入Reducer的成员变量topN【TreeMap<Carpoint,String> topN】(这个过程可以在reduce()内定义一个新的TreeMap对象maxSpeed【TreeMap<Carpoint,String> maxSpeed】,当其元素个数超过1时,删除最小值实现,相当于一个N=1的TopN特例),当topN的元素个数超过N时,删除最小值,则当最后一次reduce()执行完成后,reduce()对上下文进行过写入操作,但Reducer.topN存储了我们需要的记录结果,因此后续写入结果的操作需要cleanup()来完成。

    • 代码片段:

/*** @任务: 每次执行reduce()方法时,取出当前车牌号对应的速度最快的轨迹点,加入到topN中;* 如果topN包含的元素个数大于N则去掉最小的那个元素;* * @param key 车牌号* @param values 按车牌号聚集的轨迹点Point的可迭代对象* @param context Reducer的上下文*/protected void reduce(Text key, Iterable<Point> values, Context context) throws IOException, InterruptedException {SortedMap<Point, String> maxSpeed = new TreeMap<Point, String>(); //用来存储某车对应的最高速度的轨迹点try {for (Point value : values) {maxSpeed.put(value.clone(), null);    //使用clone方法是为了避免同一对象的hash码一致而引起的键值重复的错误if (maxSpeed.size() > 1) {maxSpeed.remove(maxSpeed.firstKey());}}//遍历某辆车的轨迹点,借TreeMap存储速度最大的点,相当于一个topN,N=1的操作;topN.put(maxSpeed.firstKey().clone(),null);   //使用clone方法是为了避免同一对象的hash码一致而引起的键值重复的错误if(topN.size()>N) {topN.remove(topN.firstKey());}//将速度最大的点加入到Reducer的成员变量topN中排序存储,当其元素个数大于N时,删除最小值} catch (CloneNotSupportedException e) {e.printStackTrace();}}
  • cleanup():

    • 执行时间:reduce任务执行完最后一次reduce()后,会执行一次cleanup()
    • 具体任务:将reduce()结束后构造出来的Reducer.topN里存储的所有值写入上下文;因为topN的泛型为<轨迹点,轨迹点对应的记录>,而轨迹点Point只是我们用来索引符合要求的记录的(即倒排索引),是故可以省略,只将轨迹点对应的记录的字符串写入输出文件即可。
    • 代码片段:
 /*** <p>Reducer的cleanup()方法于reduce任务最后一次调用reduce()函数之后执行;<p>* <p>作用是将通过reduce()构建好的Reducer成员变量topN里所包含的元素写入上下文中<p>*/protected void cleanup(Reducer<Text, Point, NullWritable, Text>.Context context)throws IOException, InterruptedException {for(Point k:topN.keySet()) {context.write(NullWritable.get(), new Text(k.toString()));}}

2.6 驱动器TopN(即MR程序入口)的具体设计:

设计思路:

  • 新建Configuration对象conf,用于设置全局参数;(使用conf.set(key,value)设置)
  • 检查参数,当参数args.length不为2~5时报错,否则按参数个数设置全局常量
    • args.length=2;不设置全局常量
    • args.length=3;设置N
    • args.length=4;设置N和start
    • args.length=5;设置N、start和end
  • 新建Job对象job,用于设置job相关的类
    • job的入口类:TopN.class
    • job的Mapper类:MapTopN.class
    • job的Reducer类:ReduceTopN.class
    • k2类型:Text
    • v2类型:Point
    • k3类型:NullWritable
    • v3类型:Text
  • 使用FileInputFormat.addInputPath()添加输入文件路径
  • 使用FileOutputFormat.setOutputPath()添加输出文件路径
  • 使用job.waitForCompletion(true) 设置Job运行结果提示

程序代码如下:


public class TopN {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 0.新建Configuration对象,并设置全局参数;Configuration conf = new Configuration();// 0.1 设置参数时需要注意传入参数的个数,然后使用switch-case结构来针对不同的参数个数设计switch (args.length) {case 5: conf.set("end", args[4]);    //设置字符串型全局变量:终止日期endcase 4: conf.set("start", args[3]);    //设置字符串型全局变量:起始日期startcase 3: conf.setInt("N", Integer.valueOf(args[2])); // 设置整形全局变量:topN.Ncase 2: break;default:System.err.println("参数错误!参数的个数为2~5个");System.err.println("顺序为:<输入路径>\t<输出路径>\t<topN的N>\t<起始日期>\t<结束日期>");System.err.println("最后3个参数可以省略,省略时默认值分别为:N:10;起始日期:1970-01-01 00:00:00;终止日期:now");}// 1.新建jobJob job = Job.getInstance(conf, "topN");job.setJarByClass(TopN.class);// 2.设置输入输出路径FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 3.设置map相关的类job.setMapperClass(MapTopN.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Point.class);// 4.设置reduce相关的类job.setReducerClass(ReduceTopN.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(Text.class);// 5.设置Job运行提示System.exit(job.waitForCompletion(true) ? 0 : 1);}}

3.整合上述代码并运行:

输入文件InTopN.txt,文件共有168533条轨迹点的数据,但是只对应41个车牌号,其路径为于TopN项目根目录的data\data\目录下(于Windows端本地运行)

项目结构:
src中的topN包为按本博客正文部分思路实现的MapReduce程序;
src中的javaTopN为单机版TopN的实现代码
至于src中的另一个包不管就好,是博主做另一个实验留下来的(通过MapReduce利用轨迹点的车牌号和时间戳做二次排序生成轨迹的实验),有机会会在另一篇博客介绍。

我们打开TopN.java文件,修改运行参数如下图,测试传入3个参数的运行效果:

运行结果:

查看part-r-00000文件:

可见,我们筛选除了全部时间范围内(1970-01-01 00:00:00 到 now)的经过该路网的,速度最快的前3辆车位于最高时速的轨迹点信息,程序运行正常。

我们再打开TopN.java文件,修改运行参数如下图,测试传入5个参数的运行效果:


运行结果:

查看part-r-00000文件:

比较传入参数分别为【data\data\InTopN.csv data\output\topN1 3】和【data\data\ data\output\topN2 4 “2020-10-24 08:00:00” “2020-010-24 20:00:00”】时程序输出的结果:

  • 传入3个参数时的结果

牛P6BK96,2020-10-24 23:08:06,113.882553,22.607134,115.0
牛P6BK20,2020-10-24 00:06:50,113.951164,22.564068,117.0
牛P6BS55,2020-10-24 14:40:48,114.011467,22.537701,120.0

  • 传入5个参数时的结果

牛P6BK20,2020-10-24 09:39:27,113.970985,22.559868,111.0
牛P6BS17,2020-10-24 09:10:55,113.957184,22.563116,113.0
牛P6BS71,2020-10-24 09:48:04,114.034401,22.517151,114.0
牛P6BS55,2020-10-24 14:36:38,113.951385,22.564068,120.0

可以发现第二次运算的结果中不包含"牛P6BK96"和"牛P6BK20"的车辆信息,因为它们两时速最快的轨迹点没有位于我们规定的时间范围内,即"2020-10-24 08:00:00" ~“2020-010-24 20:00:00”,由此我们验证了该程序的通用型和逻辑性。

本篇博客至此就结束了,限于作者编程水平和表达能力的原因,行文和代码若有不足之处,还请评论或者私信博主反馈您的意见和建议,若对文中的内容有疑惑之处的朋友,欢迎评论或私信;需要程序源代码的朋友可以于评论区留邮箱,博主看到后会私发,以上就是全部内容,感谢观看。


  1. 《数据算法-Hadoop/Spark大数据处理技巧》 ↩︎

MapReduce实现TopN相关推荐

  1. 用Hadoop中MapReduce进行TopN排序

    数据格式: 10,3333,10,100 11,9321,1000,293 12,3881,701,20 13,6974,910,30 14,8888,11,39 订单ID 用户ID 资费 业务ID ...

  2. 大数据常见面试题 Hadoop篇(1)

    目录 1.描述一下hdfs的写流程 读流程? 2.详细讲解一下hdfs的体系结构 3.如果一个datanode出现宕机,恢复流程是什么样的? 4.通常你是如何解决Haddop的NameNode宕机的, ...

  3. MapReduce案例(数据中获取最大值TopN)

    案例: 案列: data.txt 10 9 8 7 6 5 1 2 3 4 11 12 13 14 15 20 19 18 17 16 package squencefile;import org.a ...

  4. Hadoop实战——MapReduce实现主播的播放量等数据的统计及TopN排序(第一篇)

    本次实战项目一共分三篇教学(二三篇后续更新) 第一篇:对主播文本数据的清洗,从大量数据中获取我们所需要的数据(如播放量,时长等) 第二篇:对清洗后的数据进行统计求和处理操作,按照主播id号依次整齐显示 ...

  5. MapReduce经典案例—TopN

    目录 一.问题介绍 (一)案例分析 1. TopN分析法介绍 2. 案例需求及分析 (二)案例实现 1. Map阶段实现 2. Reduce阶段实现 3. Driver程序主类实现 4. 效果测试 二 ...

  6. MapReduce数据分析(8)TopN

    八.MapReduce第八讲TopN 本次教程主要讲TreeMap方法: 在搜索引擎领域中,常常需要统计最近最热门的K个查询词,这就是典型的"TopN"问题,也就是从海量查询中统计 ...

  7. 2021年大数据Hadoop(二十二):MapReduce的自定义分组

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 MapReduce的自定义分组 需求 分析 实现 第一步: ...

  8. 10、Mapreduce的一些场景

    2019独角兽企业重金招聘Python工程师标准>>> 1.排序并且求 TOPOne 和TOPN 1.在map端的输出中,将需要排序的字段作为key.那么到达reduce时,相同的k ...

  9. 如何在10亿个整数中找出前1000个最大的数(TopN算法)

    面试题目:如何在10亿个整数中找出前1000个最大的数. 我们知道排序算法有很多: 冒泡算法:通过两层for循环,外层第一次循环找到数组中最大的元素放置在倒数第一个位置,第二次循环找到第二大的元素放置 ...

最新文章

  1. ORA-14452的出现原因解析及解决方法
  2. JAVA代码编写的30条建议
  3. img title属性值利用#13换行
  4. Android M App休眠 (adb shell dumpsys usagestats)
  5. vc中怎么使用SendMessage自定义消息函数
  6. python数据结构与算法
  7. python3类的继承详解_基于python3 类的属性、方法、封装、继承详解
  8. vagrant up 慢的解决方案
  9. 四川华为EC6108V9C悦me和CA高安版_卡刷固件包
  10. 混合线性模型不同模型拟合的可视化
  11. 【魔兽世界】WLK版本的常规宏教程
  12. androidx指纹验证
  13. Node.js基础入门
  14. 竟有比双十一更令人发指的福利……
  15. Episode II 计算机病毒概论
  16. 集成电路:芯片时代的到来
  17. Lenovo T460 Fn功能键切换
  18. 【Docker】win7安装docker及镜像加速
  19. 【解决】长虹电视连接不上WiFi
  20. 第一次用计算机证明的数学定理是,勾股定理是一个基本几何定理,是人类早期发现并证明的重要数学定理之一,用代数思想解决几何...

热门文章

  1. 干细胞培育实验室设计要求
  2. ARM、DSP、FPGA的特点和区别
  3. Spring框架 -- 开篇搭建脚手架
  4. [设计模式] - 中介者模式
  5. 【python】15行代码实现猫脸检测(opencv)
  6. web黑盒邮箱临界值验证
  7. JAVA: 初级项目之基于Swing界面的仿QQ(三)
  8. Python Learning Day8
  9. python3.10.5安装时openssl安装失败的问题
  10. 学校办公室主任述职报告