1、倒序索引实现(本地)

单词-文件名 作为key, 1作为value

1、编写InverIndexStepOne

 package cn.itcast.bigdata.mr.inverindex;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class InverIndexStepOne {static class InverIndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] words = line.split(" ");FileSplit inputSplit = (FileSplit) context.getInputSplit();String fileName = inputSplit.getPath().getName();for (String word : words) {k.set(word + "--" + fileName);context.write(k, v);}}}static class InverIndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value : values) {count += value.get();}context.write(key, new IntWritable(count));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(InverIndexStepOne.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job, new Path("D:/srcdata/inverindexinput"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));// FileInputFormat.setInputPaths(job, new Path(args[0]));// FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(InverIndexStepOneMapper.class);job.setReducerClass(InverIndexStepOneReducer.class);job.waitForCompletion(true);}}

2、编写IndexStepTwo

package cn.itcast.bigdata.mr.inverindex;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class IndexStepTwo {public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] files = line.split("--");context.write(new Text(files[0]), new Text(files[1]));}}public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text text : values) {sb.append(text.toString().replace("\t", "-->") + "\t");}context.write(key, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {if (args.length < 1 || args == null) {args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};}Configuration config = new Configuration();Job job = Job.getInstance(config);job.setMapperClass(IndexStepTwoMapper.class);job.setReducerClass(IndexStepTwoReducer.class);//      job.setMapOutputKeyClass(Text.class);//     job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 1:0);}}

2、找到两两人的共同好友

1、编写SharedFriendsStepOne

package cn.itcast.bigdata.mr.fensi;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepOne {static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// A:B,C,D,F,E,OString line = value.toString();String[] person_friends = line.split(":");String person = person_friends[0];String friends = person_friends[1];for (String friend : friends.split(",")) {// 输出<好友,人>context.write(new Text(friend), new Text(person));}}}static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text person : persons) {sb.append(person).append(",");}context.write(friend, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepOne.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepOneMapper.class);job.setReducerClass(SharedFriendsStepOneReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));job.waitForCompletion(true);}

}

2、编写SharedFriendsStepTwo

package cn.itcast.bigdata.mr.fensi;import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepTwo {static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {// 拿到的数据是上一个步骤的输出结果// A I,K,C,B,G,F,H,O,D,// 友 人,人,人@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] friend_persons = line.split("\t");String friend = friend_persons[0];String[] persons = friend_persons[1].split(",");Arrays.sort(persons);for (int i = 0; i < persons.length - 1; i++) {for (int j = i + 1; j < persons.length; j++) {// 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));}}}}static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text friend : friends) {sb.append(friend).append(" ");}context.write(person_person, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepTwo.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepTwoMapper.class);job.setReducerClass(SharedFriendsStepTwoReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));job.waitForCompletion(true);}}

3、web日志预处理

1、创建WebLogBean类

package cn.itcast.bigdata.mr.weblogwash;public class WebLogBean {private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息private boolean valid = true;// 判断数据是否合法public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {this.remote_user = remote_user;}public String getTime_local() {return time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void setValid(boolean valid) {this.valid = valid;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("\001").append(this.remote_addr);sb.append("\001").append(this.remote_user);sb.append("\001").append(this.time_local);sb.append("\001").append(this.request);sb.append("\001").append(this.status);sb.append("\001").append(this.body_bytes_sent);sb.append("\001").append(this.http_referer);sb.append("\001").append(this.http_user_agent);return sb.toString();
}
}

2、编写WebLogParser

package cn.itcast.bigdata.mr.weblogwash;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;public class WebLogParser {static SimpleDateFormat sd1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);static SimpleDateFormat sd2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);webLogBean.setTime_local(parseTime(arr[3].substring(1)));webLogBean.setRequest(arr[6]);webLogBean.setStatus(arr[8]);webLogBean.setBody_bytes_sent(arr[9]);webLogBean.setHttp_referer(arr[10]);if (arr.length > 12) {webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);} else {webLogBean.setHttp_user_agent(arr[11]);}if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);}} else {webLogBean.setValid(false);}return webLogBean;}public static String parseTime(String dt) {String timeString = "";try {Date parse = sd1.parse(dt);timeString = sd2.format(parse);} catch (ParseException e) {e.printStackTrace();}return timeString;}public static void main(String[] args) {WebLogParser wp = new WebLogParser();String parseTime = wp.parseTime("18/Sep/2013:06:49:48");System.out.println(parseTime);}}

3、编写WeblogPreProcess

package cn.itcast.bigdata.mr.weblogwash;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WeblogPreProcess {static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Text k = new Text();NullWritable v = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();WebLogBean webLogBean = WebLogParser.parser(line);//可以插入一个静态资源过滤(.....)/*WebLogParser.filterStaticResource(webLogBean);*/if (!webLogBean.isValid())return;k.set(webLogBean.toString());context.write(k, v);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WeblogPreProcess.class);job.setMapperClass(WeblogPreProcessMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("C:/wordcount/weblog/input"));FileOutputFormat.setOutputPath(job, new Path("C:/wordcount/weblog/output"));job.waitForCompletion(true);}
}

4、同一个订单中最大金额的订单(使用groupingcomparator分组)

pationer:

1、编写OrderBean类

 package cn.itcastcat.bigdata.secondarysort;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;/*** @author duanhaitao@itcast.cn**/public class OrderBean implements WritableComparable<OrderBean>{private Text itemid;private DoubleWritable amount;public OrderBean() {}public OrderBean(Text itemid, DoubleWritable amount) {set(itemid, amount);}public void set(Text itemid, DoubleWritable amount) {this.itemid = itemid;this.amount = amount;}public Text getItemid() {return itemid;}public DoubleWritable getAmount() {return amount;}@Overridepublic int compareTo(OrderBean o) {int cmp = this.itemid.compareTo(o.getItemid());if (cmp == 0) {cmp = -this.amount.compareTo(o.getAmount());}return cmp;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemid.toString());out.writeDouble(amount.get());}@Overridepublic void readFields(DataInput in) throws IOException {String readUTF = in.readUTF();double readDouble = in.readDouble();this.itemid = new Text(readUTF);this.amount= new DoubleWritable(readDouble);}@Overridepublic String toString() {return itemid.toString() + "\t" + amount.get();}}

2、编写ItemidGroupingComparator

package cn.itcastcat.bigdata.secondarysort;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 利用reduce端的GroupingComparator来实现将一组bean看成相同的key* @author duanhaitao@itcast.cn**/
public class ItemidGroupingComparator extends WritableComparator {//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象protected ItemidGroupingComparator() {super(OrderBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean abean = (OrderBean) a;OrderBean bbean = (OrderBean) b;//比较两个bean时,指定只比较bean中的orderidreturn abean.getItemid().compareTo(bbean.getItemid());}}

3、编写ItemIdPartitioner

package cn.itcastcat.bigdata.secondarysort;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{@Overridepublic int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {//相同id的订单bean,会发往相同的partition//而且,产生的分区数,是会跟用户设置的reduce task数保持一致return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

4、编写SecondarySort

package cn.itcastcat.bigdata.secondarysort;import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.sun.xml.bind.v2.schemagen.xmlschema.List;/*** * @author duanhaitao@itcast.cn**/
public class SecondarySort {static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{OrderBean bean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = StringUtils.split(line, ",");bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));context.write(bean, NullWritable.get());}}static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{//到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个一排在第一位@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SecondarySort.class);job.setMapperClass(SecondarySortMapper.class);job.setReducerClass(SecondarySortReducer.class);job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("c:/wordcount/gpinput"));FileOutputFormat.setOutputPath(job, new Path("c:/wordcount/gpoutput"));//在此设置自定义的Groupingcomparator类 job.setGroupingComparatorClass(ItemidGroupingComparator.class);//在此设置自定义的partitioner类job.setPartitionerClass(ItemIdPartitioner.class);job.setNumReduceTasks(2);job.waitForCompletion(true);}}

5、运营商流量日志增强(自定义outputformat)

根据不同的内容,输出到不同的输出目录里

1、编写DBLoader类

package cn.itcast.bigdata.mr.logenhance;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;public class DBLoader {public static void dbLoader(Map<String, String> ruleMap) throws Exception {Connection conn = null;Statement st = null;ResultSet res = null;try {Class.forName("com.mysql.jdbc.Driver");conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");st = conn.createStatement();res = st.executeQuery("select url,content from url_rule");while (res.next()) {ruleMap.put(res.getString(1), res.getString(2));}} finally {try{if(res!=null){res.close();}if(st!=null){st.close();}if(conn!=null){conn.close();}}catch(Exception e){e.printStackTrace();}}}}

2、编写LogEnhanceOutputFormat

package cn.itcast.bigdata.mr.logenhance;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter* 然后再调用RecordWriter的write(k,v)方法将数据写出* * @author* */
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {FileSystem fs = FileSystem.get(context.getConfiguration());Path enhancePath = new Path("D:/temp/en/log.dat");Path tocrawlPath = new Path("D:/temp/crw/url.dat");FSDataOutputStream enhancedOs = fs.create(enhancePath);FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);return new EnhanceRecordWriter(enhancedOs, tocrawlOs);}/*** 构造一个自己的recordwriter* * @author* */static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {FSDataOutputStream enhancedOs = null;FSDataOutputStream tocrawlOs = null;public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {super();this.enhancedOs = enhancedOs;this.tocrawlOs = tocrawlOs;}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String result = key.toString();// 如果要写出的数据是待爬的url,则写入待爬清单文件 /logenhance/tocrawl/url.datif (result.contains("tocrawl")) {tocrawlOs.write(result.getBytes());} else {// 如果要写出的数据是增强日志,则写入增强日志文件 /logenhance/enhancedlog/log.datenhancedOs.write(result.getBytes());}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if (tocrawlOs != null) {tocrawlOs.close();}if (enhancedOs != null) {enhancedOs.close();}}}}

3、编写LogEnhance

package cn.itcast.bigdata.mr.logenhance;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogEnhance {static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Map<String, String> ruleMap = new HashMap<String, String>();Text k = new Text();NullWritable v = NullWritable.get();// 从数据库中加载规则信息倒ruleMap中@Overrideprotected void setup(Context context) throws IOException, InterruptedException {try {DBLoader.dbLoader(ruleMap);} catch (Exception e) {e.printStackTrace();}}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称Counter counter = context.getCounter("malformed", "malformedline");String line = value.toString();String[] fields = StringUtils.split(line, "\t");try {String url = fields[26];String content_tag = ruleMap.get(url);// 判断内容标签是否为空,如果为空,则只输出url到待爬清单;如果有值,则输出到增强日志if (content_tag == null) {k.set(url + "\t" + "tocrawl" + "\n");context.write(k, v);} else {k.set(line + "\t" + content_tag + "\n");context.write(k, v);}} catch (Exception exception) {counter.increment(1);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogEnhance.class);job.setMapperClass(LogEnhanceMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 要控制不同的内容写往不同的目标路径,可以采用自定义outputformat的方法job.setOutputFormatClass(LogEnhanceOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:/srcdata/webloginput/"));// 尽管我们用的是自定义outputformat,但是它是继承制fileoutputformat// 在fileoutputformat中,必须输出一个_success文件,所以在此还需要设置输出pathFileOutputFormat.setOutputPath(job, new Path("D:/temp/output/"));// 不需要reducerjob.setNumReduceTasks(0);job.waitForCompletion(true);System.exit(0);}}

6、自定义inputformat

小文件的优化有以下几种方式:

(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
(2)在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
(3)在mapreduce处理时,可采用combineInputFormat提高效率

第二种的实现:

1、编写WholeFileRecordReader

package cn.itcast.bigdata.combinefile;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** * RecordReader的核心工作逻辑:* 通过nextKeyValue()方法去读取数据构造将返回的key   value* 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value* * * @author**/
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit fileSplit;private Configuration conf;private BytesWritable value = new BytesWritable();private boolean processed = false;@Overridepublic void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.fileSplit = (FileSplit) split;this.conf = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!processed) {byte[] contents = new byte[(int) fileSplit.getLength()];Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(conf);FSDataInputStream in = null;try {in = fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents, 0, contents.length);} finally {IOUtils.closeStream(in);}processed = true;return true;}return false;}@Overridepublic NullWritable getCurrentKey() throws IOException,InterruptedException {return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {return value;}/*** 返回当前进度*/@Overridepublic float getProgress() throws IOException {return processed ? 1.0f : 0.0f;}@Overridepublic void close() throws IOException {// do nothing}
}

2、编写WholeFileInputFormat

 package cn.itcast.bigdata.combinefile;import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{@Overrideprotected boolean isSplitable(JobContext context, Path file) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {WholeFileRecordReader reader = new WholeFileRecordReader();reader.initialize(split, context);return reader;}}

3、编写SmallFilesToSequenceFileConverter

package cn.itcast.bigdata.combinefile;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class SmallFilesToSequenceFileConverter extends Configured implements Tool {static class SequenceFileMapper extendsMapper<NullWritable, BytesWritable, Text, BytesWritable> {private Text filenameKey;@Overrideprotected void setup(Context context) throws IOException,InterruptedException {InputSplit split = context.getInputSplit();Path path = ((FileSplit) split).getPath();filenameKey = new Text(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value,Context context) throws IOException, InterruptedException {context.write(filenameKey, value);}}@Overridepublic int run(String[] args) throws Exception {Configuration conf = new Configuration();/*System.setProperty("HADOOP_USER_NAME", "hadoop");*/String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: combinefiles <in> <out>");System.exit(2);}Job job = Job.getInstance(conf,"combine small files to sequencefile");job.setJarByClass(SmallFilesToSequenceFileConverter.class);job.setInputFormatClass(WholeFileInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {args=new String[]{"c:/wordcount/smallinput","c:/wordcount/smallout"};int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);System.exit(exitCode);}
}

7、mapreduce参数配置

大数据之Mapreduce(加强)相关推荐

  1. 图解大数据 | 应用Map-Reduce进行大数据统计@实操案例

    作者:韩信子@ShowMeAI 教程地址:http://www.showmeai.tech/tutorials/84 本文地址:http://www.showmeai.tech/article-det ...

  2. 大数据-hadoop MapReduce

    大数据-hadoop MapReduce 大数据-hadoop MapReduce MapReduce 思考:为什么叫MapReduce? 对应关系 实操案例 MR计算框架:计算向数据移动如何实现? ...

  3. mapreduce 丢数据_大数据之MapReduce详解

    1.什么是Map/Reduce,看下面的各种解释: (1)MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框,就是mapre ...

  4. mapreduce编程规范_大数据之MapReduce详解

    今天要讲的是MapReduce 目录 今天先总体说下MapReduce的相关知识,后续将会详细说明对应的shuffle.mr与yarn的联系.以及mr的join操作的等知识.以下内容全是个人学习后的见 ...

  5. 华为的大数据平台—MapReduce服务

    内容: 大数据相关知识,和目前主流的解决方案 MapReduce服务 如何使用 文章整理自:https://edu.huaweicloud.com/courses 大数据的开源解决方案:Hadoop ...

  6. 大数据开发 | MapReduce介绍

    1.  MapReduce 介绍 1.1MapReduce的作用 假设有一个计算文件中单词个数的需求,文件比较多也比较大,在单击运行的时候机器的内存受限,磁盘受限,运算能力受限,而一旦将单机版程序扩展 ...

  7. ibatis 存储过程 结果集 map_大数据之MapReduce shuffle过程

    一.MapReduce计算模型 我们知道MapReduce计算模型主要由三个阶段构成:Map.shuffle.Reduce. Map是映射,负责数据的过滤分法,将原始数据转化为键值对:Reduce是合 ...

  8. 机器学习——大数据与MapReduce

    MapReduce是一个分布式计算框架 优点:可在短时间内完成大量工作 缺点:算法必须经过重写,需要对系统工程有一定的理解 使用数据类型:数值型和标称型数据 MapReduce在大量节点组成的集群上运 ...

  9. 大数据学习——MapReduce学习——字符统计WordCount

    操作背景 jdk的版本为1.8以上 ubuntu12 hadoop2.5伪分布 安装 Hadoop-Eclipse-Plugin 要在 Eclipse 上编译和运行 MapReduce 程序,需要安装 ...

最新文章

  1. 欧拉函数求一个数倒数的循环节长度
  2. 中的 隐藏鼠标菜单_如何在鼠标右键菜单中添加自定义菜单?工效率提升一倍...
  3. Writing for Myself.part2
  4. HDU4389(数位DP)
  5. ORACLE 限制特定IP访问数据库 访问白名单
  6. 程序员面试金典 - 面试题 16.26. 计算器(栈)
  7. 【bzoj2338】[HNOI2011]数矩形 计算几何
  8. Java日期与时间的处理/Date,String,Calendar转换
  9. 深入理解计算机系统(1)--hello world程序的生命周期
  10. Profession
  11. servlet 之forward和sendRedirect跳转
  12. 安装了最新版本的java 用友nc打不开_用友NC系统常见问题解决方法
  13. cam350菜单怎么切换成中文_CAM350菜单中文详解
  14. 在桌面计算机找不到光盘驱动,如何弹出DVD驱动器,没有按钮,我在计算机中找不到DVD驱动器...
  15. TMS570LS1224PWM的生成及捕获
  16. cocos creator快速上手《摘星星》官方教程续|星月爸爸
  17. 1046错误mysql_数据库实例:如何解决mysql 1046错误
  18. 小菜鸟学Python记
  19. 从今天起,做一个闲人,喂马、劈柴,周游世界。。。
  20. macbook pro下安装三系统

热门文章

  1. unity3d游戏开发之我的游戏理念
  2. react 高阶组件
  3. python 视频存储
  4. angularjs 连接mysql_使用AngularJS连接到数据库
  5. 旅行青蛙前期怎么玩 新手入门必看攻略
  6. Deconvolutional Network [deconv] 研究
  7. 学网络工程师好吗?有钱途不?
  8. 使用Clang作为编译器 —— Clang 语言扩展
  9. C#——if 和 Switch 的区别,与在内存中的占有量
  10. 在CSDN的六载时光,2021年博客之星由谁决定