在Hadoop利用MapReduce(简称MR),实现分布式计算,顾名思义MR包含两个操作:

  1. Map操作:map在编程语言中有映射含义,即对每条数据执行相同的操作,从而实现将原始的输入数据转化为key-value形式。也就是说,通过map操作可以进行数据准备。
  2. Reduce操作:对map操作的结果(即中间结果)进行汇总,如求和、求平均等,从而得到最终结果 —— 一个新的key-value集合。
  3. 其中,map操作的结果将存储在本地磁盘,又叫中间结果;reduce操作的结果将存入本地HDFS,然后按照副本放置策略将副本存储到其他节点。

1. 结合气象数据理解MR的流程

  • 考虑某个国家的气象数据(数据片段)如下,第一个红色框中的数据表示所属年份,第二个红色框中的数据表示当时的温度。其中,+表示温度为零上,-温度为零下

  • 其中,为了存储以为小数的数据,温度是被扩大10倍后的结果。例如,00221表示22.1摄氏度。

  • 针对map函数,输入的是key-value形式的数据,其中key是数据的行号,value是真实数据。其中,使用粗体表示需要处理的年份和温度

  • map函数负责对每天数据进行处理,提取出年份(作为key)和气温(value,取整数),形成新的key-value

    (1950, 0)
    (1950, 22)
    (1950, -11)
    (1949, 111)
    (1949, 78)
    
  • 将map函数的结果传给reduce函数前,Hadoop框架会自动对其进行shuffle处理,使这些数据按照key进行排序和分组。

    (1949, [111, 78])]
    (1950, [0, 22, -11])
    
  • reduce函数收到上面的中间结果后,会按照key遍历其对应的集合,从而获取值最大的value作为最高温度

  • 整个流程示意如下:

总结:

  • MR通过map进行数据预处理,得到中间结果存入本地磁盘;接着,使用shuffle将中间结果按照key进行排序和分组,使到达reduce的数据是有序的数据集合;然后,reduce函数按照指定的方式对数据进行汇总,并汇总后的数据写入HDFS。
  • MR更关注的是用户可编程的map和reduce操作,实际上shuffle操作也至关重要。它可以将map的输出进行归纳归纳整理,减少map与reduce间的数据传输量

2. 编写自己的MR程序

参考博客:编写自己的MapReduce程序 - 基于Hadoop-2.10.1

3. 进一步学习MR的数据流

一些基本术语:

  • MR的Job: 客户端需要指定的一个工作单元,包括输入数据、MR程序和配置信息等
  • 两种任务: Hadoop会将job分成很多个任务来执行,包括map任务和reduce任务。任务调度由yarn负责,如果调度失败,会在另一个节点上重新调度 —— (这也是为啥,我之前MR任务失败,它进行过重试
3.1 split和map任务
  • 对于一个MR作业,输入的数据会被划分成等大的数据块,被称为Split(分片)
  • 每个split对应一个map任务,map任务通过运行用户自定义的map函数来实现对split的处理
  • 关于split大小:
    ① split越小,并行处理split的时间会远小于处理整体数据的时间;同时,节点间的负载均衡效果也会更好
    ② split过分细分,管理split和创建map任务时间将会决定整个作业的执行时间
    ③ split默认是hdfs一个数据块的大小,128MB。这样,一个split不会跨越多个节点,能满足数据本地化原理,可以提高map任务的效率 —— 因为,跨节点时难以保证一个节点存在多个数据块,需要在节点间进行网络传输
  • split与map任务可能的三种情况:
    ① 存储split的节点有空闲slot,map任务可以在节点上运行
    ② 存储split的所有节点均没有空闲slot,map任务优先在同一机架的空闲节点上运行。这时,split会在同一机架的两个节点间传输
    ② 同一机架也没有空闲节点,map任务只能在不同机架的空闲节点上运行。这时split会跨机架传输
3.2 不同类型的数据流
  • 关于任务数: map任务的数据量由split的数量决定,而reduce任务的数量需要独立指定。
  • 关于分区: 如果存在多个reduce任务,map任务会为每个reduce任务创建分区。一般,按照输出数据的key将数据分配到不同的分区。同时,同一key的所有输出均在同一分区。

多个map任务,一个reduce任务

  1. 所有map任务的输出,会先存储到本地磁盘。作业完成后,才会删除中间结果。如果在执行map任务的节点在将中间结果传输给reduce任务之前失败,Hadoop会在其他节点重试map任务
  2. 排序过的map输出,通过网络传递给reduce节点,并在reduce端进行合并。
  3. reduce对合并后的数据进行处理,处理后的结果优先存到本地HDFS,其他副本按照放置策略存储到其他节点。


多个map任务,多个reduce任务

  1. map任务的输出按照一定的策略分配到不同的分区
  2. map输出到reduce输入之间的数据处理,就是shuffle


无reduce任务

  1. 有时,map任务处理后的数据,无需进行shuffle,可能也就无需reduce任务
  2. 此时,map任务会将输出优先写入本地hdfs和其他节点的hjdfs
3.3 combiner函数
  • 根据map任务和reduce任务之间的数据流,map任务和reduce任务之间的数据传输会占用一定的网络带宽

  • 减少map任务和reduce任务之间的数据传输,对提高作业的执行效率是非常有用的

  • Hadoop允许为map任务指定一个combiner函数,但是combiner函数执行任意次或不执行combiner函数,reduce任务的输出结果不会有变化

  • 例如,不使用combiner时,传入reduce任务的数据为:

    (1949, [111, 22, 33, 44, 78])
    # 最后为
    (1949, 111)
    
  • 如果,直接对map任务执行combiner

    # 每个map函数的输出
    (1949, [111, 33])
    (1949, [22, 44, 78])# combiner后的输出
    (1949, 111)
    (1949, 78)# 最后为
    (1949, 111)
    
  • 但是,如果不是求取最大值,而是求mean。最后结果33与整体结果44不符

    # 每个map函数的输出
    (1949, [111, 33])
    (1949, [22, 44, 78])# combiner后的输出
    (1949, 33)
    (1949, 44)# 最后为
    (1949, 33)
    
  • 总结: 是否使用combiner函数需要斟酌

4. Hadoop Streaming —— 允许使用任何语言编写MR程序