背景:现在有两张表:customer、order,他们有个共同的cid,需要通过cid实现两张表的连接,并且通过cid进行分组排序

思路:首先通过mapper从context里面获取到文件切片,从文件切片中得到路径,从而判断是customer.txt,还是order.txt

然后依次读取每行内容,并且为每行数据打上一个标签,0表示customer,1表示order,最后组成一个新的组合key

然后调用排序对比器的方法,按照cid进行升序排列,同一个客户的order进行升序排列

然后进行reducer,将客户信息和订单信息进行拼接,最后进行cid分组,同一个客户的信息为一组,并按照cid升序排列

最后根据cid进行分区

comkey

package com.cr.reduceJoin;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class comkey_1 implements WritableComparable<comkey_1>{//0表示客户信息 1表示订单信息private int type;private int cid;private int oid;private String customerInfo = "";private String orderInfo = "";public int getType() {return type;}public void setType(int type) {this.type = type;}public int getCid() {return cid;}public void setCid(int cid) {this.cid = cid;}public int getOid() {return oid;}public void setOid(int oid) {this.oid = oid;}public String getCustomerInfo() {return customerInfo;}public void setCustomerInfo(String customerInfo) {this.customerInfo = customerInfo;}public String getOrderInfo() {return orderInfo;}public void setOrderInfo(String orderInfo) {this.orderInfo = orderInfo;}@Overridepublic int compareTo(comkey_1 o) {int type1 = o.type;int cid1 = o.cid;int oid1 = o.oid;String customerInfo1 = o.customerInfo;String orderInfo1 = o.orderInfo;//首先判断是否是同一个客户的数据if(cid1 == cid){//判断是否是同一个客户的两个订单if(type1 == type){//升序排列return oid - oid1;}else {//如果一个是客户一个是订单,让客户信息在前if(type == 0){return -1;}elsereturn  1;}}else {//如果是不同客户,升序排列return cid - cid1;}}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(type);out.writeInt(cid);out.writeInt(oid);out.writeUTF(customerInfo);out.writeUTF(orderInfo);}@Overridepublic void readFields(DataInput in) throws IOException {this.type = in.readInt();this.cid = in.readInt();this.oid = in.readInt();this.customerInfo = in.readUTF();this.orderInfo = in.readUTF();}
}

mapper

package com.cr.reduceJoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable,Text,comkey_1,NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();//从环境变量中获取文件切片,从文件切片中获取路径InputSplit inputSplit = context.getInputSplit();FileSplit fileSplit = (FileSplit)inputSplit;String path = fileSplit.getPath().toString();comkey_1 comkey1 = new comkey_1();//整理文本信息成为自己定义的comkey的格式if(path.contains("customer")){String cid = line.substring(0, line.indexOf(","));String cusInfo = line;comkey1.setType(0);comkey1.setCid(Integer.parseInt(cid));comkey1.setCustomerInfo(cusInfo);}else {String cid = line.substring(line.lastIndexOf(",") + 1);String oInfo = line.substring(0, line.lastIndexOf(","));String oid = line.substring(0, line.indexOf(","));comkey1.setType(1);comkey1.setCid(Integer.parseInt(cid));comkey1.setOid(Integer.parseInt(oid));comkey1.setOrderInfo(oInfo);}//写入上下文context.write(comkey1,NullWritable.get());}
}

组合key排序对比器

package com.cr.reduceJoin;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 组合key排序对比器*/
public class Comkey2Comparator extends WritableComparator {protected Comkey2Comparator(){super(comkey_1.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {comkey_1 a1 = (comkey_1) a;comkey_1 b1 = (comkey_1) b;return a1.compareTo(b1);}
}

reducer

package com.cr.reduceJoin;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.Iterator;public class JoinReducer  extends Reducer<comkey_1,NullWritable,Text,NullWritable>{@Overrideprotected void reduce(comkey_1 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {//获取迭代器Iterator<NullWritable> iterator = values.iterator();//第一条一定是客户信息iterator.next();int type = key.getType();int cid = key.getCid();String customerInfo = key.getCustomerInfo();//从第二条开始就是订单信息while (iterator.hasNext()){iterator.next();String orderInfo = key.getOrderInfo();//按照客户+订单的方式进行拼接context.write(new Text(customerInfo + "," + orderInfo),NullWritable.get());}}
}

cid分组对比器

package com.cr.reduceJoin;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*
CID分组对比器*/
public class CIDgroupComparator extends WritableComparator {protected CIDgroupComparator(){super(comkey_1.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {comkey_1 a1 = (comkey_1) a;comkey_1 b1 = (comkey_1) b;return a1.getCid() - b1.getCid();}
}

根据cid分区

package com.cr.reduceJoin;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class CIDpartition extends Partitioner<comkey_1,NullWritable>{//自定义分区类,按照cid分区@Overridepublic int getPartition(comkey_1 comkey_1, NullWritable nullWritable, int i) {return comkey_1.getCid() % i ;}
}

app

package com.cr.reduceJoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinApp {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();conf.set("fs.defaultFS","file:///");Job job = Job.getInstance(conf);job.setJobName("ReduceJoinApp");job.setJarByClass(ReduceJoinApp.class);FileInputFormat.addInputPath(job,new Path(args[0]));Path out = new Path(args[1]);FileSystem fs = FileSystem.get(conf);if (fs.exists(out)) {fs.delete(out, true);}FileOutputFormat.setOutputPath(job,new Path(args[1]));job.setMapperClass(ReduceJoinMapper.class);job.setReducerClass(JoinReducer.class);job.setMapOutputKeyClass(comkey_1.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setPartitionerClass(CIDpartition.class);job.setGroupingComparatorClass(CIDgroupComparator.class);job.setSortComparatorClass(Comkey2Comparator.class);job.setNumReduceTasks(2);job.waitForCompletion(true);}
}

part-r-000                                   part-r-001

转载于:https://www.cnblogs.com/flyingcr/p/10326875.html

reduce端的连接实现相关推荐

  1. 深入理解Spark 2.1 Core (十一):Shuffle Reduce 端的原理与源码分析

    我们曾经在<深入理解Spark 2.1 Core (一):RDD的原理与源码分析 >讲解过: 为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RD ...

  2. 32查运行内存的map文件_Spark Shuffle调优之调节map端内存缓冲与reduce端内存占比

    本文首先介绍Spark中的两个配置参数: spark.shuffle.file.buffer map端内存缓冲 spark.shuffle.memoryFraction reduce端内存占比 很多博 ...

  3. 83998 连接服务器出错_服务端 TCP 连接的 TIME_WAIT 问题分析与解决

    民工哥技术之路 写在开头,大概 4 年前,听到运维同学提到 TIME_WAIT 状态的 TCP 连接过多的问题,但是当时没有去细琢磨:最近又听人说起,是一个新手进行压测过程中,遇到的问题,因此,花点时 ...

  4. 一云多端,连接万物:智能接入网关2.0升级技术揭秘

    一云多端,连接万物:全新发布SAG-APP和SAG-vCPE 阿里云提供了多种产品形态支持各种场景上云,SAG2.0发布了SAG-APP支持各种操作系统的终端接入上云,同时发布了SAG-vCPE支持3 ...

  5. BZip2Codec压缩、Map端压缩控制、Reduce端压缩控制……都在这份Hadoop整合压缩知识点里了!...

    作者 | Tai_Park 责编 | Carol 来源 | CSDN 博客 封图 | CSDN付费下载于东方 IC 今天来聊聊 Hadoop 的压缩. 压缩:原始数据通过压缩手段产生目标数据,要求输入 ...

  6. 基础原理系列:服务端 TCP 连接的 TIME_WAIT 问题

    几个方面: 问题描述:什么现象?什么影响? 问题分析 解决方案 底层原理 1.问题描述 模拟高并发的场景,会出现批量的 TIME_WAIT 的 TCP 连接: 短时间后,所有的 TIME_WAIT 全 ...

  7. FTP 编写 2:客户端与服务端的连接

    FTP 编写 2:客户端与服务端的连接     首先编写客户端与服务端能进行简单的连接,这个较为容易:     服务端的编写流程是:启动 Winsock.建立套接字.绑定套接字.监听.接收连接.关闭连 ...

  8. 第2节 mapreduce深入学习:15、reduce端的join算法的实现

    reduce端的join算法: 例子: 商品表数据 product:  pid p0001,小米5,1000,2000 p0002,锤子T1,1000,3000 订单表数据 order:        ...

  9. 镭速快答:网页端无法连接镭速,提示未启动怎么办?

    大文件传输软件镭速传输,拥有自主研发的Raysync超高速传输协议,轻松满足TB级别大文件和海量小文件极速传输需求,正在为IT.金融.影视.生物基因.制造业等众多领域的2W+企业提供了高性能.稳定安全 ...

最新文章

  1. CSP 2019-09-1 小明种苹果 Python实现+详解
  2. Groovy基本句法
  3. 东京奥运会的官网的最上面是广告栏
  4. [导入]ASP.NET生成高质量缩略图通用函数(c#代码)
  5. 原生sql 查询返回一个实体_python连接SQLServer执行给定的查询SQL语句,并返回结果数据...
  6. [转]SQL Server 索引基础知识(2)----聚集索引,非聚集索引
  7. java连接rabbitmq_RabbitMQ教程(工作模式篇)
  8. 《父亲家书》选:如何处理考研与工作机会(下)
  9. OpenCV图像处理(3)——盒维数计算
  10. java jquery用的多吗_[Java教程]如果不用jQuery,Ajax你还能写出多少?
  11. uni-app的生命周期
  12. HttpWatch工具简介及使用技巧(转)
  13. c语言摄氏度字符,c语言摄氏度与华氏温度如何转换
  14. linux怎么打出管道命令这个符号,linux 管道命令 竖线 ‘ | ’
  15. Adobe系列軟件PC + MAC 2021
  16. 无监督学习------分类学习-----朴素贝叶斯(navie bayes)
  17. Dive_into_Deep_Learning
  18. 公路建设过程中路肩石水渠机的处理点
  19. gta5怎么设置画质最好_GTA5:如何让你的游戏画质更好,游戏更顺手,一波设置教给大家!...
  20. 字长16位的计算机 它表示是,计算机应用基础(第2版)在线作业

热门文章

  1. visual studio 2008 intellisense does not work
  2. 傳統經理人的迷思(转)
  3. SwiftUI 打开高德地图
  4. [转]python3_unboundlocalerror报错原因
  5. python word 英语音标_(完整word版)英语音标大全,推荐文档
  6. 如何用绩效考核搞垮一个团队?
  7. 如何将枯燥的大数据呈现为可视化的图?
  8. 光引发剂主要用途_光引发剂分类及用途
  9. python字节码执行函数_做一个字节码追踪器,从内部理解 Python 的执行过程
  10. linux下mysql5.7创建用户_Linux下mysql5.7 创建、删除用户与授权