2019独角兽企业重金招聘Python工程师标准>>>

在关系型数据库中,要实现join操作是非常方便的,通过sql定义的join原语就可以实现。在hdfs存储的海量数据中,要实现join操作,可以通过HiveQL很方便地实现。不过HiveQL也是转化成MapReduce来完成操作,本文首先研究如何通过编写MapReduce程序来完成join操作。

一、Map-Join:在Reduce端完成的join操作

   假设存在用户数据文件users.txt和用户登录日志数据文件login_logs.txt,数据内容分别如下所示:

   用户数据文件user.txt,列:userid、name:

1    LiXiaolong
2    JetLi
3    Zhangsan
4    Lisi
5    Wangwu

   用户登录日志数据文件login_logs.txt,列:userid、login_time、login_ip:

1    2015-06-07 15:10:18    192.168.137.101
3    2015-06-07 15:12:18    192.168.137.102
3    2015-06-07 15:18:36    192.168.137.102
1    2015-06-07 15:22:38    192.168.137.101
1    2015-06-07 15:26:11    192.168.137.103

   期望计算结果:

1    LiXiaolong    2015-06-07 15:10:18    192.168.137.101
1    LiXiaolong    2015-06-07 15:22:38    192.168.137.101
1    LiXiaolong    2015-06-07 15:26:11    192.168.137.103
3    Zhangsan    2015-06-07 15:12:18    192.168.137.102
3    Zhangsan    2015-06-07 15:18:36    192.168.137.102

   计算思路:

   1) 在map阶段可以通过文件路径判断来自users.txt还是login_logs.txt,来自users.txt的数据输出<userid, 'u#'+name>,来自login_logs.txt的数据输出<userid,'l#'+login_time+'\t'+login_ip>;

   2) 在reduce阶段将来自不同表的数据区分开,然后做笛卡尔乘积,输出结果;

   实现代码:

package com.hicoor.hadoop.mapreduce;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class ReduceJoinDemo {public static final String DELIMITER = "\t"; // 字段分隔符static class MyMappper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {FileSplit split = (FileSplit) context.getInputSplit();String filePath = split.getPath().toString();// 获取记录字符串String line = value.toString();// 抛弃空记录if (line == null || line.trim().equals("")) return;String[] values = line.split(DELIMITER);// 处理user.txt数据if (filePath.contains("users.txt")) {if (values.length < 2) return;context.write(new Text(values[0]), new Text("u#" + values[1]));}// 处理login_logs.txt数据else if (filePath.contains("login_logs.txt")) {if (values.length < 3) return;context.write(new Text(values[0]), new Text("l#" + values[1] + DELIMITER + values[2]));}}}static class MyReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {LinkedList<String> linkU = new LinkedList<String>();  //users值LinkedList<String> linkL = new LinkedList<String>();  //login_logs值for (Text tval : values) {String val = tval.toString();  if(val.startsWith("u#")) {linkU.add(val.substring(2));} else if(val.startsWith("l#")) {linkL.add(val.substring(2));}}for (String u : linkU) {for (String l : linkL) {context.write(key, new Text(u + DELIMITER + l));}}}}private final static String FILE_IN_PATH = "hdfs://cluster1/join/in";private final static String FILE_OUT_PATH = "hdfs://cluster1/join/out";public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {System.setProperty("hadoop.home.dir", "D:\\desktop\\hadoop-2.6.0");Configuration conf = getHAContiguration();// 删除已存在的输出目录FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);if (fileSystem.exists(new Path(FILE_OUT_PATH))) {fileSystem.delete(new Path(FILE_OUT_PATH), true);}Job job = Job.getInstance(conf, "Reduce Join Demo");job.setMapperClass(MyMappper.class);job.setJarByClass(ReduceJoinDemo.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));job.waitForCompletion(true);}private static Configuration getHAContiguration() {Configuration conf = new Configuration();conf.setStrings("dfs.nameservices", "cluster1");conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop1,hadoop2");conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "172.19.7.31:9000");conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop2", "172.19.7.32:9000");// 必须配置,可以通过该类获取当前处于active状态的namenodeconf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");return conf;}}

二、Reduce-join:在Reduce端完成的join操作

   当join的两个表中有一个表数据量不大,可以轻松加载到各节点内存中时,可以使用DistributedCache将小表的数据加载到分布式缓存,然后MapReduce框架会缓存数据分发到需要执行map任务的节点上,在map节点上直接调用本地的缓存文件参与计算。在Map端完成join操作,可以降低网络传输到Reduce端的数据流量,有利于提高整个作业的执行效率。

   计算思路:

   假设users.txt用户表数据量较小,则将users.txt数据添加到DistributedCache分布式缓存中,在map计算中读取本地缓存的users.txt数据并将login_logs.txt中的userid数据翻译成用户名,本例无需Reduce参与。

   实现代码:

package com.hicoor.hadoop.mapreduce;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Scanner;
import java.util.StringTokenizer;import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SecondarySort.Reduce;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.webapp.example.MyApp.MyController;public class DistributedCacheDemo {public static final String DELIMITER = "\t"; // 字段分隔符static class MyMappper extends Mapper<LongWritable, Text, Text, Text> {private Map<String, String> userMaps = new HashedMap();@Overrideprotected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException ,InterruptedException {//可以通过localCacheFiles获取本地缓存文件的路径//Configuration conf = context.getConfiguration();//Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);//此处使用快捷方式users.txt访问FileReader fr = new FileReader("users.txt");BufferedReader br = new BufferedReader(fr);String line;while((line = br.readLine()) != null) {//map端加载缓存数据String[] splits = line.split(DELIMITER);if(splits.length < 2) continue;userMaps.put(splits[0], splits[1]);}};@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {// 获取记录字符串String line = value.toString();// 抛弃空记录if (line == null || line.trim().equals("")) return;String[] values = line.split(DELIMITER);if(values.length < 3) return;String name = userMaps.get(values[0]);Text t_key = new Text(values[0]);Text t_value = new Text(name + DELIMITER + values[1] + DELIMITER + values[2]);context.write(t_key, t_value);}}private final static String FILE_IN_PATH = "hdfs://cluster1/join/in/login_logs.txt";private final static String FILE_OUT_PATH = "hdfs://cluster1/join/out";public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {System.setProperty("hadoop.home.dir", "D:\\desktop\\hadoop-2.6.0");Configuration conf = getHAConfiguration();// 删除已存在的输出目录FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);if (fileSystem.exists(new Path(FILE_OUT_PATH))) {fileSystem.delete(new Path(FILE_OUT_PATH), true);}//添加分布式缓存文件 可以在map或reduce中直接通过users.txt链接访问对应缓存文件DistributedCache.addCacheFile(new URI("hdfs://cluster1/join/in/users.txt#users.txt"), conf);Job job = Job.getInstance(conf, "Map Distributed Cache Demo");job.setMapperClass(MyMappper.class);job.setJarByClass(DistributedCacheDemo.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));job.waitForCompletion(true);}private static Configuration getHAConfiguration() {Configuration conf = new Configuration();conf.setStrings("dfs.nameservices", "cluster1");conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop1,hadoop2");conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "172.19.7.31:9000");conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop2", "172.19.7.32:9000");//必须配置,可以通过该类获取当前处于active状态的namenodeconf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");return conf;}}

三、使用HiveQL来完成join

   使用HiveQL可以轻松完成该任务,只需使用表连接语句,hive会自动生成并优化mapreduce程序来执行查询操作。

   实现步骤:

   1) 在/join/in/目录下创建users目录和login_logs目录,分别将users.txt和login_logs.txt移动到对应目录中;

   2) 创建users外部表:create external table users(userid int, name string) row format delimited fields terminated by '\t' location '/join/in/users';

   3) 创建login_logs外部表:create external table login_logs(userid string,login_time string,login_ip string) row format delimited fields terminated by '\t' location '/join/in/login_logs';

   4)执行连接查询并保存结果:create table user_login_logs as select A.*,B.login_time,B.login_ip from users A,login_logs B where A.userid=B.userid;

四、总结

   通常情况下我们会使用hive来帮助我们完成join操作,map-join和reduce-join用于实现一些复杂的、特殊的需求。此外还有一种实现方式:SemiJoin,这是一种介于map-join和reduce-join之间的方法,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高。

   执行效率:map-join>SemiJoin>reduce-join。

转载于:https://my.oschina.net/xiaoluobutou/blog/814485

使用MapReduce实现join操作相关推荐

  1. MapReduce实现join操作

    前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到 ...

  2. MapReduce之join操作

    一  前言 在很多时候,我们可能需要处理的不是一个单独的文件,而是几个有关联的文件,比如账户信息和订单信息=> 账户信息:customerIdname address telephone 订单信 ...

  3. [MapReduce_add_4] MapReduce 的 join 操作

    0. 说明 Map 端 join && Reduce 端 join 1. Map 端 join Map 端 join:大表+小表 => 将小表加入到内存,迭代大表每一行,与之进行 ...

  4. MapReduce之Map join操作

    MapReduce之Map join操作(分布式缓存) 文章目录 MapReduce之Map join操作(分布式缓存) 案例结合 利用MapReduce中的setup方法与DistributedCa ...

  5. 5、HIVE DML操作、load数据、update、Delete、Merge、where语句、基于分区的查询、HAVING子句、LIMIT子句、Group By语法、Hive 的Join操作等

    目录: 4.2.1 Load文件数据到表中 4.2.2查询的数据插入到表中 4.2.3将Hive查询的结果存到本地Linux的文件系统目录中 4.2.4通过SQL语句的方式插入数据 4.2.5 UPD ...

  6. Hive是如何让MapReduce实现SQL操作的?

    learn from 从0开始学大数据(极客时间) 1. MapReduce 实现 SQL 的原理 SELECT pageid, age, count(1) FROM pv_users GROUP B ...

  7. Flink学习笔记:Operators之CoGroup及Join操作

    本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...

  8. shell中join链接多个域_shell 如何实现两个表的join操作

    shell 如何实现两个表的join操作 今天研究的一个问题是:在Shell 脚本中如何实现两个表的 join 操作,这里说的两个表示的其实是 两个文件,但是文件是列表的形式,有固定的分割符号,即就相 ...

  9. 离线轻量级大数据平台Spark之JavaRDD关联join操作

    对两个RDD进行关联操作,如: 1)文件post_data.txt包含:post_id\title\content 2)文件train.txt包含:dev_id\post_id\praise\time ...

最新文章

  1. Android富文本处理
  2. 链表定义、链表的插入、链表的删除、链表的查找
  3. C#趣味程序---百鸡百钱
  4. Spring基于 XML 的声明式事务控制(配置方式)
  5. 用Tomcat构建一个简单图片服务器
  6. 破坏计算机系统既遂的标准,破坏计算机信息系统罪的量刑标准是什么
  7. python接口测试实战_Python接口测试实战01:七种武器
  8. gsonformat插件_吐血推荐珍藏的IDEA插件
  9. 工程之道 | CPU推理性能提高数十倍,MegEngine计算图、MatMul优化解析
  10. Interrupted Exception异常可能没你想的那么简单!
  11. 安卓Gallery配合ImageSwitcher不显示图片
  12. 第35章 GoogleAuthenticator插件项的定义实现
  13. 各地2022年上半年软考考试疫情防控要求汇总-2022-05更新
  14. HORAE深入思考及长久性论证
  15. 从“中国宙斯盾”亮相想到的
  16. 不必急于吹响裁判哨:“安卓禁令”后华为手机的变数与新机
  17. (小脚本) (python) 批量修改文件后缀名
  18. 东软慧聚助力汽车“芯”节能减排
  19. windows系统删除顽固文件
  20. 今天获取的云蹦迪直播软件源码全开源

热门文章

  1. (转载博文)VC++API速查
  2. JavaMail的体系结构及发送复杂邮件
  3. php查找函数所在文件
  4. 如何配置三层交换机创建VLAN 2
  5. STL之hashtable源代码剖析
  6. 路由跟踪命令.查看DNS、IP、Mac等
  7. 让chrome浏览器支持跨域
  8. python将控制台输出保存至文件
  9. hive导入导出数据案例
  10. T100 section已经修改过