因为项目需求,须要通过Java程序提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务须要有点小变动。详见下面代码。

下面为MapReduce主程序,有几点须要提一下:

1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不正确文件进行切分。

2、为了控制reduce的处理过程。map的输出键的格式为组合键格式。

与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。

3、为了适应组合键,又一次设定了分组函数。即GroupComparator。分组规则为,仅仅要TextPair中的key1同样(不要求key2同样),则数据被分配到一个reduce容器中。这样,当同样key1的数据进入reduce容器后,key2起到了一个数据标识的作用。

package web.hadoop;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;import util.Utils;public class GEMIMain {public GEMIMain(){job = null;}public Job job;public static class NamePartitioner extendsPartitioner<TextPair, BytesWritable> {@Overridepublic int getPartition(TextPair key, BytesWritable value,int numPartitions) {return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;}}/*** 分组设置类。仅仅要两个TextPair的第一个key同样。他们就属于同一组。

他们的Value就放到一个Value迭代器中, * 然后进入Reducer的reduce方法中。 * * @author hduser * */ public static class GroupComparator extends WritableComparator { public GroupComparator() { super(TextPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { TextPair t1 = (TextPair) a; TextPair t2 = (TextPair) b; // 比較同样则返回0,比較不同则返回-1 return t1.getFirst().compareTo(t2.getFirst()); // 仅仅要是第一个字段同样的就分成为同一组 } } public boolean runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 在conf中设置outputath变量,以在reduce函数中能够获取到该參数的值 conf.set("outputPath", args[args.length - 1].toString()); //设置HDFS中,每次任务生成产品的质量文件所在目录。args数组的倒数第二个原数为质量文件所在目录 conf.set("qualityFolder", args[args.length - 2].toString()); //假设在Server中执行。则须要获取web项目的根路径;假设以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件 //MapReduceProgress mprogress = new MapReduceProgress(); //String rootPath= mprogress.rootPath; String rootPath="/opt/hadoop-2.5.0/etc/hadoop/"; conf.addResource(new Path(rootPath+"yarn-site.xml")); conf.addResource(new Path(rootPath+"core-site.xml")); conf.addResource(new Path(rootPath+"hdfs-site.xml")); conf.addResource(new Path(rootPath+"mapred-site.xml")); this.job = new Job(conf); job.setJobName("Job name:" + args[0]); job.setJarByClass(GEMIMain.class); job.setMapperClass(GEMIMapper.class); job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(BytesWritable.class); // 设置partition job.setPartitionerClass(NamePartitioner.class); // 在分区之后依照指定的条件分组 job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(GEMIReducer.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(NullOutputFormat.class); // job.setOutputKeyClass(NullWritable.class); // job.setOutputValueClass(Text.class); job.setNumReduceTasks(8); // 设置计算输入数据的路径 for (int i = 1; i < args.length - 2; i++) { FileInputFormat.addInputPath(job, new Path(args[i])); } // args数组的最后一个元素为输出路径 FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); boolean flag = job.waitForCompletion(true); return flag; } @SuppressWarnings("static-access") public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { String[] inputPaths = new String[] { "normalizeJob", "hdfs://192.168.168.101:9000/user/hduser/red1/", "hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111", "hdfs://192.168.168.101:9000/user/hduser/test" }; GEMIMain test = new GEMIMain(); boolean result = test.runJob(inputPaths); } }

下面为TextPair类

public class TextPair implements WritableComparable<TextPair> {private Text first;private Text second;public TextPair() {set(new Text(), new Text());}public TextPair(String first, String second) {set(new Text(first), new Text(second));}public TextPair(Text first, Text second) {set(first, second);}public void set(Text first, Text second) {this.first = first;this.second = second;}public Text getFirst() {return first;}public Text getSecond() {return second;}@Overridepublic void write(DataOutput out) throws IOException {first.write(out);second.write(out);}@Overridepublic void readFields(DataInput in) throws IOException {first.readFields(in);second.readFields(in);}@Overridepublic int hashCode() {return first.hashCode() * 163 + second.hashCode();}@Overridepublic boolean equals(Object o) {if (o instanceof TextPair) {TextPair tp = (TextPair) o;return first.equals(tp.first) && second.equals(tp.second);}return false;}@Overridepublic String toString() {return first + "\t" + second;}@Override/**A.compareTo(B)* 假设比較同样,则比較结果为0* 假设A大于B,则比較结果为1* 假设A小于B。则比較结果为-1* */public int compareTo(TextPair tp) {int cmp = first.compareTo(tp.first);if (cmp != 0) {return cmp;}//此时实现的是升序排列return second.compareTo(tp.second);}
}

下面为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分

package web.hadoop;import java.io.IOException;  import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {  @Override  public RecordReader<Text, BytesWritable> createRecordReader(  InputSplit arg0, TaskAttemptContext arg1) throws IOException,  InterruptedException {  // TODO Auto-generated method stub  return new WholeFileRecordReader();  }  @Override  protected boolean isSplitable(JobContext context, Path filename) {  // TODO Auto-generated method stub  return false;  }
}  

下面为WholeFileRecordReader类

package web.hadoop;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit fileSplit;private FSDataInputStream fis;private Text key = null;private BytesWritable value = null;private boolean processed = false;@Overridepublic void close() throws IOException {// TODO Auto-generated method stub// fis.close();}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn this.key;}@Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {// TODO Auto-generated method stubreturn this.value;}@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)throws IOException, InterruptedException {fileSplit = (FileSplit) inputSplit;Configuration job = tacontext.getConfiguration();Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(job);fis = fs.open(file);}@Overridepublic boolean nextKeyValue() {if (key == null) {key = new Text();}if (value == null) {value = new BytesWritable();}if (!processed) {byte[] content = new byte[(int) fileSplit.getLength()];Path file = fileSplit.getPath();System.out.println(file.getName());key.set(file.getName());try {IOUtils.readFully(fis, content, 0, content.length);// value.set(content, 0, content.length);value.set(new BytesWritable(content));} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {IOUtils.closeStream(fis);}processed = true;return true;}return false;}@Overridepublic float getProgress() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn processed ? fileSplit.getLength() : 0;}}

转载于:https://www.cnblogs.com/blfshiye/p/5279694.html

怎样通过Java程序提交yarn的mapreduce计算任务相关推荐

  1. Linux环境下 java程序提交spark任务到Yarn报错

    文章目录 摘要 情况1:JSON解析异常 情况2:java.lang.InstantiationException spark.sql.driver 情况3: 中kafka:java.lang.NoC ...

  2. java平均分排序_编写java程序,输入10个成绩,计算最高分,最低分,平均分,并按从小到大排序,最后统计高于平均分的人有多少?...

    展开全部 代码如下:import java.util.ArrayList; import java.util.List; import java.util.Scanner; public class  ...

  3. Java程序员用代码,计算最大公约数和最小公倍数小傅哥

    作者:小傅哥 博客:bugstack.cn 源码:github.com/fuzhengwei/- 沉淀.分享.成长,让自己和他人都能有所收获!

  4. Java程序员用代码,计算最大公约数和最小公倍数

    作者:小傅哥 博客:https://bugstack.cn 源码:https://github.com/fuzhengwei/java-algorithms 沉淀.分享.成长,让自己和他人都能有所收获 ...

  5. 简述java程序的工作过程_简述 Java 程序的开发过程。_学小易找答案

    [填空题]两个啮合齿轮在啮合区内,一个齿轮的轮齿用()绘制,另一个齿轮的轮齿被遮挡的部分用()绘制,被遮挡的部分也可以省略不画. [单选题]只有建立了统一的学校集体,才能在儿童的意识中唤起舆论的强大力 ...

  6. 从 WordCount 到 MapReduce 计算模型

    概述 虽然现在都在说大内存时代,不过内存的发展怎么也跟不上数据的步伐吧.所以,我们就要想办法减小数据量.这里说的减小可不是真的减小数据量,而是让数据分散开来.分开存储.分开计算.这就是 MapRedu ...

  7. 【Hadoop】伪分布式安装---MapReduce程序运行到YARN上,编写MapReduce程序---HDFS yarn

    在我的虚拟机(Linux)上安装的Hadoop是2.7.3版本的,安装过程可以参考这个网站,也可以下载. http://hadoop.apache.org/docs/r2.7.6/hadoop-pro ...

  8. 平台搭建---Spark提交应用程序---Spark Submit提交应用程序及yarn

    本部分来源,也可以到spark官网查看英文版. spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如***.py脚本):对于spark支 ...

  9. 经典MapReduce作业和Yarn上MapReduce作业运行机制

    一.经典MapReduce的作业运行机制 如下图是经典MapReduce作业的工作原理: 1.1 经典MapReduce作业的实体 经典MapReduce作业运行过程包含的实体: 客户端,提交MapR ...

最新文章

  1. N32-马哥Linux第一周学习
  2. codeforces 126B Password(哈希)
  3. flume taildir 启动异常LifecycleSupervisor$MonitorRunnable.run
  4. CSS中让一个div的高度随着另外个一个统计的div的高度变化而变化的代码
  5. java gsoap_gsoap c与java web之间传输字符串中文乱码问题 | 学步园
  6. 数据结构排序法之希尔排序法(Shell Sort)
  7. JavaWeb笔记04-解决GET与POST乱码问题
  8. [置顶] J2EE (八) 策略模式+装饰模式+反射(java)
  9. 11月份写什么,由你来决定——学堂需求调查
  10. WinForm开发(36)——WinForm开发微软官方最全综合详解指南(包括bug出现的可能原因分析及解决方法)
  11. 彻底弄懂typedef struct和struct定义结构体的区别
  12. 怎么测试ftp服务器上传文件,ftp服务器文件上传测试
  13. 【技巧总结】理解XXE从基础到盲打
  14. 文科如何晋级计算机职称,职称的档次是怎么确定来的?
  15. java 获取ipv4的地址_java 获取ip地址和网络接口
  16. catia制作物料明细_CATIA导出装配文件的部件列表BOM清单到Excel文件 | 坐倚北风
  17. 洛谷 P2141 珠心算测验
  18. Linkerd 2.10(Step by Step)—设置服务配置文件
  19. 小说阅读大全(安卓)最后一个绿色版。
  20. 开发一个电商网站大概多少钱

热门文章

  1. 3650m5服务器内存选择 ibm_硬盘回收中心 服务器硬盘回收 监控硬盘回收
  2. 如何上传文件及文件夹到IPFS
  3. 异步tcp通信——APM.Core 服务端概述
  4. 开机自启动redis
  5. android RadioGroup中设置selector后出现多个别选中的RadioButton的解决办法
  6. Hive partition prune Failed
  7. Enterprise Library +Caliburn.Micro+WPF CM框架下使用企业库验证,验证某一个属性,整个页面的文本框都变红的原因...
  8. javascript Array对象基础
  9. ASP.NET 页面对象模型
  10. 系统相机裁剪比例_从照相到摄影你只差这5个技巧!人像裁剪这4大原则你一定要了解...