文章目录

  • Partition分区
    • 1、默认partitioner分区
    • 2、自定义Partitioner步骤
    • 3、注意事项
    • 4、案例

Partition分区

需求:按照条件输出到不同文件中。

案例:按照手机归属地输出到不同文件中。

1、默认partitioner分区

默认分区根据key的hashCode对ReduceTasks个数取模得到。

public class HashPartitioner<K, V> extends Partitioner<K, V> {public int getPartition(K key, V value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}

2、自定义Partitioner步骤

  1. 自定义类继承Partitioner类,重写getPartition()方法
  2. 在Job驱动中,设置自定义Partitioner;
  3. 自定义partition后,根据自定义Partitioner的逻辑设置相应数量的ReduceTask。

3、注意事项

  1. 如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx ;
  2. 如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
  3. 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000 ;
  4. 分区号必须从零开始,逐一累加。

4、案例

FlowBean

package com.hpu.hadoop.partitioner;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class FlowBean implements Writable {private Integer upFlow;private Integer downFlow;private Integer sumFlow;public FlowBean(){}public Integer getUpFlow() {return upFlow;}public void setUpFlow(Integer upFlow) {this.upFlow = upFlow;}public Integer getDownFlow() {return downFlow;}public void setDownFlow(Integer downFlow) {this.downFlow = downFlow;}public Integer getSumFlow() {return sumFlow;}public void setSumFlow(Integer sunFlow) {this.sumFlow = sunFlow;}public void setSumFlow() {this.sumFlow = this.upFlow+this.downFlow;}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(this.upFlow);out.writeInt(this.downFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readInt();this.downFlow = in.readInt();}@Overridepublic String toString() {return this.upFlow+"\t"+this.downFlow+"\t"+this.sumFlow;}
}

Mapper:

package com.hpu.hadoop.partitioner;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text,Text, FlowBean> {private Text phone;private FlowBean flowBean;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {flowBean = new FlowBean();phone = new Text();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] split = line.split("\t");flowBean.setUpFlow(Integer.parseInt(split[split.length-3]));flowBean.setDownFlow(Integer.parseInt(split[split.length-2]));flowBean.setSumFlow();phone.set(split[1]);context.write(phone,flowBean);}
}

自定义Partitioner:

package com.hpu.hadoop.partitioner;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class CustomPartitioner extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {String Phone = text.toString();String P = Phone.substring(0, 3);if ("136".equals(P)){return 0;} else if ("137".equals(P)){return 1;} else if ("138".equals(P)){return 2;} else if ("139".equals(P)){return 3;} else {return 4;}}
}

Reducer:

package com.hpu.hadoop.partitioner;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {private FlowBean flowBean;private int sumUp;private int sumDown;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {flowBean = new FlowBean();}@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {sumUp = 0;sumDown = 0;for (FlowBean value : values) {sumUp += value.getUpFlow();sumDown += value.getDownFlow();}flowBean.setUpFlow(sumUp);flowBean.setDownFlow(sumDown);flowBean.setSumFlow();context.write(key,flowBean);}
}

Driver:

package com.hpu.hadoop.partitioner;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1.配置Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2.Driverjob.setJarByClass(FlowDriver.class);//3.Mapperjob.setMapperClass(FlowMapper.class);//4.Reducerjob.setReducerClass(FlowReducer.class);//5.KVjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//6.OKVjob.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 指定自定义分区器job.setPartitionerClass(CustomPartitioner.class);// 同时指定相应数量的ReduceTaskjob.setNumReduceTasks(5);//7.InPathFileInputFormat.setInputPaths(job,new Path("F:\\input\\inputflow\\phone_data.txt"));FileOutputFormat.setOutputPath(job,new Path("E:\\Test\\f4"));//8.提交job.waitForCompletion(true);}
}

Partition分区相关推荐

  1. Partition分区及实例

    Partition分区及实例 Partition分区介绍 概念 自定义分区步骤 分区总结: Partition分区案例 需求 1.需求说明 2.文件 案例分析 1.需求 输入数据 期望输出数据 实现分 ...

  2. mysql的partition分区

    前言:当一个表里面存储的数据特别多的时候,比如单个.myd数据都已经达到10G了的话,必然导致读取的效率很低,这个时候我们可以采用把数据分到几张表里面来解决问题. 方式一:通过业务逻辑根据数据的大小通 ...

  3. mysql in partition_MySQL Partition分区扫盲

    MySQL从5.1.3开始支持Partition,你可以使用如下命令来确认你的版本是否支持Partition: mysql> SHOW VARIABLES LIKE '%partition%'; ...

  4. kafka专题:kafka的Topic主题、Partition分区、消费组偏移量offset等基本概念详解

    文章目录 1. kafka集群整体架构 2. kafka相关元素的基本概念 2.1 主题Topic和分区Partition 2.2 kafka消息存储在哪里? 2.3 分区副本 2.4 消费组和偏移量 ...

  5. maxvalue mysql自动分区_mysql的partition分区

    前言:当一个表里面存储的数据特别多的时候,比如单个.myd数据都已经达到10G了的话,必然导致读取的效率很低,这个时候我们可以采用把数据分到几张表里面来解决问题. 方式一:通过业务逻辑根据数据的大小通 ...

  6. Partition分区的使用案例

    Partition分区的使用案例: 将统计结果按照条件输出到不同文件中(分区) 文章目录 1)需求 2)需求分析 3)编程实现 1.创建Partitioner类 2.创建Bean类 3.创建Mappe ...

  7. mysql partition 语法_MySQL partition分区小结

    MySQL partition分区 分区概念 分区针对不同的数据库,具有不同的特性.在这里专门针对MySQL数据库而言.在MySQL数据库里,分区这个概念是从mysql 5.1才开始提供的.不过目前只 ...

  8. oracle partition 分区

    --范围分区 create table person( id int, name varchar2(20), birth date, sex char(2) ) partition by range ...

  9. mysql 表分区 django_MySQL partition分区I

    Mysql5.1已经发行很久了,本文根据官方文档的翻译和自己的一些测试,对Mysql分区表的局限性做了一些总结,因为个人能力以及测试环境的原因,有可能有错误的地方,还请大家看到能及时指出,当然有兴趣的 ...

最新文章

  1. flash程序员2012最大转变
  2. Linux日常之允许或禁止指定用户或IP进行SSH登录
  3. 设计 Redis Key
  4. Kafka设计原理看了又忘,忘了又看?
  5. gedit搭建c开发环境
  6. .NET Core + Spring Cloud:服务注册与发现
  7. 左右声道测试_小说:少年参加测试,直接挑战十只狗恐兽,众人见了惊呼:SS级...
  8. marmalade android 5.0 JNI 调用失败的解决方案
  9. Silverlight 中的 CoreCLR
  10. java判断integer是否为空_java安全编码指南之:表达式规则
  11. 数论知识(2)-------------欧拉函数
  12. 安卓加密软件_U盘或者文件夹加密
  13. 【复杂网络】当机器学习遇上复杂网络:解析微信朋友圈 Lookalike 算法
  14. 论文浏览(42) Action Genome: Actions as Composition of Spatio-temporal Scene Graphs
  15. 外挂制作--------过NP技术渐渐
  16. 基于javaweb的私人牙科诊所病历管理系统(java+jsp+css+javascript+mysql)
  17. php留言板的实验步骤,PHP实现基本留言板功能原理与步骤详解
  18. ps中usm锐化,智能锐化,像素化,分层云彩,光照效果,镜头光晕,纤维,云彩,减少杂色,蒙尘与划痕,祛斑,添加杂色,中间值
  19. 什么叫 Rss 订阅
  20. 竞价广告每次点击出价多少钱是固定的吗?

热门文章

  1. potplayer播放器
  2. linux下搭建CA认证
  3. android finish 刷新父,网页中CSS的细节处理大集合
  4. Windows10 初体验
  5. Nuke与Natron的区别是什么?
  6. 第一章踏上python之旅_无言军神第一章踏上旅途,无言军神第1章踏上旅途_武侠仙侠_读趣阁...
  7. golang 内存分配
  8. ios模拟器各种路径
  9. 【唐老狮】字符编码(ASCII,Unicode和UTF-8)
  10. Mpp 的一种 java 读写解决方案