关系代数运算

MapReduce可以在关系代数的运算上发挥重要的作用,因为关系代数运算具有数据相关性低的特性,这使得其便于进行MapReduce的并行化算法设计。

常见的关系代数运算包括选择、投影、并、交、差以及自然连接操作,都可以十分容易利用MapReduce来进行并行化。

选择操作

对于关系R应用条件C,例如:查询分数大于90分的学生。我们只需要在Map阶段对于每个输入的记录判断是否满足条件,将满足条件的记录输出即可。Reduce阶段无需做额外的工作。

代码:
package cn.zzuli.zcs0;/*** Created by 张超帅 on 2018/8/16.*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** 表示一个关系的属性构成* @author KING**/
public class RelationA implements WritableComparable<RelationA>{private int id;private String name;private int age;private double weight;public RelationA(){}public RelationA(int id, String name, int age, double weight){this.setId(id);this.setName(name);this.setAge(age);this.setWeight(weight);}public RelationA(String line){String[] value = line.split(",");this.setId(Integer.parseInt(value[0]));this.setName(value[1]);this.setAge(Integer.parseInt(value[2]));this.setWeight(Double.parseDouble(value[3]));}public boolean isCondition(int col, String value){if(col == 0 && Integer.parseInt(value) == this.id)return true;else if(col == 1 && name.equals(value))return true;else if(col ==2 && Integer.parseInt(value) == this.age)return true;else if(col ==3 && Double.parseDouble(value) == this.weight)return true;elsereturn false;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public double getWeight() {return weight;}public void setWeight(double weight) {this.weight = weight;}public String getCol(int col){switch(col){case 0: return String.valueOf(id);case 1: return name;case 2: return String.valueOf(age);case 3: return String.valueOf(weight);default: return null;}}public String getValueExcept(int col){switch(col){case 0: return name + "," + String.valueOf(age) + "," + String.valueOf(weight);case 1: return String.valueOf(id) + "," + String.valueOf(age) + "," + String.valueOf(weight);case 2: return String.valueOf(id) + "," + name + "," + String.valueOf(weight);case 3: return String.valueOf(id) + "," + name + "," + String.valueOf(age);default: return null;}}@Overridepublic String toString(){return id + "," + name + "," + age + "," + weight;}public void write(DataOutput out) throws IOException {// TODO Auto-generated method 
stubout.writeInt(id);out.writeUTF(name);out.writeInt(age);out.writeDouble(weight);}public void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubid = in.readInt();name = in.readUTF();age = in.readInt();weight = in.readDouble();}public int compareTo(RelationA o) {if(id == o.getId() && name.equals(o.getName())&& age == o.getAge() && weight == o.getWeight())return 0;else if(id < o.getId())return -1;elsereturn 1;}
}
package cn.zzuli.zcs0;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;/*** Created by 张超帅 on 2018/8/16.*/
public class Selection {public static  class SelectionMap extends Mapper<LongWritable, Text, RelationA, NullWritable>{private int id;private String value;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {id = context.getConfiguration().getInt("col", 0);value = context.getConfiguration().get("value");}@Overrideprotected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {RelationA record = new RelationA(line.toString());if(record.isCondition(id, value))context.write(record, NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();//selectionJob.setJobName("selectionJob");conf.setInt("col", Integer.parseInt(args[2]));conf.set("value", args[3]);Job selectionJob = Job.getInstance(conf, "selectionJob");selectionJob.setJarByClass(Selection.class);selectionJob.setMapperClass(SelectionMap.class);selectionJob.setMapOutputKeyClass(RelationA.class);selectionJob.setMapOutputKeyClass(NullWritable.class);selectionJob.setNumReduceTasks(0);selectionJob.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(selectionJob, new Path(args[0]));FileOutputFormat.setOutputPath(selectionJob, new Path(args[1]));selectionJob.waitForCompletion(true);System.out.println("Finished!");}
}

投影操作

例如在关系R上应用投影操作获得属性AGE的所有值。我们只需要在Map阶段将每条记录在该属性上的值作为键输出即可,此时对应该键的值为Hadoop自带类型NullWritable的一个对象。而在Reduce端我们仅仅将Map端输入的键输出即可。注意,此时投影操作具有去重的功能。

package cn.zzuli.zcs0;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/** Created by 张超帅 on 2018/8/16. */
public class Projection {public static class ProjectionMap extends Mapper<LongWritable, Text, Text, NullWritable> {private int col;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {col = context.getConfiguration().getInt("col", 0);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {RelationA record = new RelationA(value.toString());context.write(new Text(record.getCol(col)), NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();conf.setInt("col", Integer.parseInt(args[2]));Job projectionJob = Job.getInstance(conf, "ProgectionJob");projectionJob.setJarByClass(Projection.class);projectionJob.setMapperClass(ProjectionMap.class);projectionJob.setMapOutputKeyClass(Text.class);projectionJob.setMapOutputValueClass(NullWritable.class);projectionJob.setNumReduceTasks(0);projectionJob.setInputFormatClass(TextInputFormat.class);projectionJob.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(projectionJob, new Path(args[0]));FileOutputFormat.setOutputPath(projectionJob, new Path(args[1]));projectionJob.waitForCompletion(true);System.out.println("finished!");}
}

交运算

获得两张表交集的主要思想如下:如果关系T和关系R为同一个模式,我们希望获得R和T的交集,那么在Map阶段我们对于R和T中的每一条数据记录r输出键值对(r,1)。
在Reduce阶段汇总计数,如果计数为2,我们才将该记录输出。这里有一个需要额外注意的地方:只有将R和T表中相同的记录都发送到同一个Reduce节点,它们才会被正确地判断为交集中的一条记录而输出,因此我们必须保证相同的记录会被发送到相同的Reduce节点。由于实现时使用了RelationA对象作为主键,而MapReduce默认会通过对象的hashCode值来划分Map的中间结果并输出到不同的Reduce节点,因此这里我们需要重写自定义类的hashCode方法,使得值相同的对象的hashCode值也一定相同。例如,这里对应关系R的类的定义如下,我们需要根据四个域的值来重写hashCode()方法,使得具有相同域值的记录具有相同的哈希值。

package cn.zzuli.zcs0;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;/*** Created by 张超帅 on 2018/8/16.*/
public class Intersection {public static class IntersectionMap extends Mapper<LongWritable, Text, RelationA, IntWritable> {private IntWritable one = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {RelationA record = new RelationA(value.toString());context.write(record, one);}}public static class IntersectionReduce extends Reducer<RelationA, IntWritable, RelationA, NullWritable>{@Overrideprotected void reduce(RelationA key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable val : values) {sum += val.get();}if(sum == 2)context.write(key, NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job Intersectionjob = Job.getInstance(conf, "Intersection");Intersectionjob.setJarByClass(Intersection.class);Intersectionjob.setMapperClass(IntersectionMap.class);Intersectionjob.setReducerClass(IntersectionReduce.class);Intersectionjob.setMapOutputKeyClass(RelationA.class);Intersectionjob.setMapOutputValueClass(IntWritable.class);Intersectionjob.setOutputValueClass(NullWritable.class);Intersectionjob.setOutputKeyClass(RelationA.class);Intersectionjob.setInputFormatClass(TextInputFormat.class);Intersectionjob.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(Intersectionjob, new Path(args[0]));FileOutputFormat.setOutputPath(Intersectionjob, new Path(args[1]));Intersectionjob.waitForCompletion(true);System.out.println("OVER");}}

差运算

例如,计算R-T,也即希望找出在R中存在而在T中不存在的记录,则对于R和T中的每一条记录r在Map阶段分别输出键值对(r,R)和(r,T)。在Reduce阶段检查一条记录r的所有对应值列表:如果只有R而没有T,则将该条记录输出。这里与上面的交运算相似,都需要注意相同的记录应该被发送到相同的Reduce节点。

package cn.zzuli.zcs0;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;
import java.lang.*;
/*** Created by 张超帅 on 2018/8/16.*/
public class Difference {public static class DifferenceMap extends Mapper<Object, Text, RelationA, Text > {@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {FileSplit split = (FileSplit)context.getInputSplit();String filename = split.getPath().getName();System.out.println(filename);RelationA relation = new RelationA(value.toString());context.write(relation, new Text(filename));}}public static class DifferenceReduce extends Reducer<RelationA, Text, RelationA, NullWritable> {String setR;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {setR = context.getConfiguration().get("setR");}@Overrideprotected void reduce(RelationA key, Iterable<Text> values, Context context) throws IOException, InterruptedException {StringBuffer str = new StringBuffer();for(Text val : values) {str.append(val.toString()).append(",");}if(str.toString().contains(setR))context.write(key, NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();conf.set("setR", args[2]);Job differenceJob = Job.getInstance(conf, "differenceJob");differenceJob.setJarByClass(Difference.class);differenceJob.setMapperClass(DifferenceMap.class);differenceJob.setReducerClass(DifferenceReduce.class);differenceJob.setMapOutputKeyClass(RelationA.class);differenceJob.setMapOutputValueClass(Text.class);differenceJob.setOutputKeyClass(RelationA.class);differenceJob.setOutputValueClass(NullWritable.class);differenceJob.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(differenceJob, new Path(args[0]));FileOutputFormat.setOutputPath(differenceJob, new Path(args[1]));differenceJob.waitForCompletion(true);System.out.println("Over");}}

自然连接

例如,我们需要在属性ID上作关系R和关系S的自然连接。在Map阶段对于每一条R和S中的记录r,我们把它的ID的值作为键,其余属性的值以及R(S中的记录为S的名称)的名称作为值输出出去。在Reduce阶段我们则将同一键中的所有值根据他们的来源(R和S)分为两组做笛卡尔乘积然后将得到的结果输出出去。

MapReduce 基础算法【关系代数运算】相关推荐

  1. 关系数据库基础:关系代数运算知识笔记

    1.关系代数运算符 集合运算符:并(U).差(-).交(∩).笛卡尔积(×) 专门的关系运算符:选择(∂).投影(π).连接(∞).除(÷) 算术比较符:大于(>).大于等于(≥).小于(< ...

  2. mysql将sql转为关系代数_关系数据库基础:关系代数运算知识笔记

    1.关系代数运算符 集合运算符:并(U).差(-).交(∩).笛卡尔积(×) 专门的关系运算符:选择(∂).投影(π).连接(∞).除(÷) 算术比较符:大于(>).大于等于(≥).小于(< ...

  3. MapReduce的关系代数运算

    关系代数 概念 R(A1,A2,...,An)表示关系的名称是R,其属性是A1,A2,...,A ...

  4. MapReduce关系代数运算

    常见关系代数运算包括:选择.投影.并.交.差以及自然连接操作等,都可以十分容易利用MapReduce框架进行并行化计算 关系R NAME SEX AGE 小明 男 25 小红 女 18 小张 男 22 ...

  5. MapReduce关系代数运算——差

    MapReduce关系代数运算--差 关系沿用上一个选择运算的关系R和S,StudentR类也是一致的,本博文中就不赘述了. MapReduce程序设计 DifferenceMap import or ...

  6. MapReduce关系代数运算——投影

    MapReduce关系代数运算--投影 关系沿用上一个选择运算的关系R,StudentR类也是一致的,本博文中就不赘述了. MapReduce程序设计 Projection import org.ap ...

  7. 【数据库基础】 几种基本的关系代数运算方法

    关系代数是一种抽象的查询语言,用对关系的运算来表达查询,作为研究关系数据语言的数学工具.1 目录 基本的关系代数算法 传统的集合运算 并 ∪\cup∪ 交 ∩\cap∩ 差 −-− 笛卡尔积(广义) ...

  8. MapReduce关系代数运算——选择

    MapReduce关系代数运算 常见的MapReduce关系代数运算有:选择.投影.并.交.差以及自然连接操作等,本文将介绍选择运算.后续博文介绍其他运算. 关系R 关系R id name age g ...

  9. MapReduce关系代数运算——自然连接

    MapReduce关系代数运算--自然连接 关系沿用之前的R. 创建两个文件 表1 student id name sex age 1 Amy female 18 2 Tom male 19 3 Sa ...

最新文章

  1. Android官方命令深入分析之Device Monitor
  2. 第4章javascript变量、作用域和内存回收
  3. 如何将frm文件导入MySql数据库
  4. 图解Java 开发教程
  5. 未来的数据中心(二)
  6. 【SRH】------常见的HTTP状态码
  7. android系统action大全
  8. python导入random模块_Python内置模块之random
  9. python3捕获异常_python中异常捕获方法详解
  10. 有关性能测试结果的几点分析原则
  11. 如何完整卸载wxpython_TextCtrl的WXPython C++部分被删除
  12. Python大数据分析(一):认识大数据
  13. python计算程序运行时间毫秒_Python获取秒级时间戳与毫秒级时间戳方法
  14. MATLAB 爬取配色css数据及渐变图
  15. 51单片机外部中断实例
  16. unity编辑器拓展九——删除场景中丢失的脚本
  17. android 连接本地设备,从USB连接的Android移动设备访问PC的本地主机
  18. 证明HITTING SET 是NP完全
  19. sumo快速运行简单仿真实例详细教程
  20. vue——数字加逗号分隔

热门文章

  1. 微信小程序网络请求异常怎么办_监控微信小程序wx.request请求失败
  2. MySQL5.6数据库多种安装方法及企业级安装实践
  3. 苹果软件版测试周期,苹果推出iOS测试更新周期的第四个beta版本
  4. 来看看Google的未来工作环境设计,有你喜欢的元素吗?
  5. Docker php安装扩展步骤详解
  6. Kinect for Windows培训视频
  7. 微信小程序详解(1 小程序账号注册和安装小程序开发软件)
  8. 输入法中的全角与半角
  9. 搜狗输入法显示全角符号,切换全角操作
  10. hackme.inndy.tw的一些Writeup(5月30更新)