转载:http://blog.csdn.net/hellozpc/article/details/45771933 https://my.oschina.net/leejun2005/blog/133424

1. 概述

我们在设置MapReduce输入格式的时候,会调用这样一条语句:

job.setInputFormatClass(KeyValueTextInputFormat.class);

这条语句保证了输入文件会按照我们预设的格式被读取。KeyValueTextInputFormat即为我们设定的数据读取格式。

所有的输入格式类都继承自InputFormat,这是一个抽象类。其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等。相关类图简单画出如下:

2. InputFormat

从InputFormat类图看,InputFormat抽象类仅有两个抽象方法:

 List<InputSplit> getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。RecordReader<K,V> createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

在后面说到InputSplits的时候,会介绍在getSplits()时需要验证输入文件是否可分割、文件存储时分块的大小和文件大小等因素,所以总体来说,通过InputFormat,Mapreduce框架可以做到:

  验证作业输入的正确性将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用

InputFormat抽象类源码也很简单,如下供参考(文章格式考虑,删除了部分注释,添加了部分中文注释):

public abstract class InputFormat<K, V> {/*** 仅仅是逻辑分片,并没有物理分片,所以每一个分片类似于这样一个元组 <input-file-path, start, offset>*/public abstract List<InputSplit> getSplits(JobContext context)throws IOException, InterruptedException;/*** Create a record reader for a given split.*/public abstract RecordReader<K, V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;}

不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片会被单独的map task作为数据源。下面详细介绍输入分片(inputSplit)是什么。

3.InputSplit

Mappers的输入是一个一个的输入分片,称InputSplit。看源码可知,InputSplit也是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

public abstract class InputSplit {/*** 获取Split的大小,支持根据size对InputSplit排序.*/public abstract long getLength() throws IOException, InterruptedException;/*** 获取存储该分片的数据所在的节点位置.*/public abstract String[] getLocations() throws IOException, InterruptedException;
}

下面深入看一个InputSplit的子类:FileSplit类

public class FileSplit extends InputSplit implements Writable {private Path file;private long start;private long length;private String[] hosts;/*** Constructs a split with host information* * @param file*            the file name* @param start*            the position of the first byte in the file to process* @param length*            the number of bytes in the file to process* @param hosts*            the list of hosts containing the block, possibly null*/public FileSplit(Path file, long start, long length, String[] hosts) {this.file = file;this.start = start;this.length = length;this.hosts = hosts;}/** The number of bytes in the file to process. */@Overridepublic long getLength() {return length;}@Overridepublic String[] getLocations() throws IOException {if (this.hosts == null) {return new String[] {};} else {return this.hosts;}}// 略掉部分方法
}

从源码中可以看出,FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。

再看一个InputSplit的子类:CombineFileSplit。源码如下:

public class CombineFileSplit extends InputSplit implements Writable {private Path[] paths;private long[] startoffset;private long[] lengths;private String[] locations;private long totLength;public CombineFileSplit(Path[] files, long[] start, long[] lengths,String[] locations) {initSplit(files, start, lengths, locations);}private void initSplit(Path[] files, long[] start, long[] lengths,String[] locations) {this.startoffset = start;this.lengths = lengths;this.paths = files;this.totLength = 0;this.locations = locations;for (long length : lengths) {totLength += length;}}public long getLength() {return totLength;}/** Returns all the Paths where this input-split resides */public String[] getLocations() throws IOException {return locations;}//省略了部分构造函数和方法,深入学习请阅读源文件
}

为什么介绍该类呢,因为接下来要学习《Hadoop学习(五) – 小文件处理》,深入理解该类,将有助于该节学习。

上面我们介绍的FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!

CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。

需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。

现在,我们已深入的了解了InputSplit的概念,看了其源码,知道了其属性。我们知道数据分片是在InputFormat中实现的,接下来,我们就深入InputFormat的一个子类,FileInputFormat看看分片是如何进行的。
4. FileInputFormat

FileInputFormat中,分片方法代码及详细注释如下,就不再详细解释该方法:

public List<InputSplit> getSplits(JobContext job) throws IOException {// 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小。// 由源码可知,这两个值可以通过mapred.min.split.size和mapred.max.split.size来设置long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// splits链表用来存储计算得到的输入分片结果List<InputSplit> splits = new ArrayList<InputSplit>();// files链表存储由listStatus()获取的输入文件列表,listStatus比较特殊,我们在下面详细研究List<FileStatus> files = listStatus(job);for (FileStatus file : files) {Path path = file.getPath();FileSystem fs = path.getFileSystem(job.getConfiguration());long length = file.getLen();// 获取该文件所有的block信息列表[hostname, offset, length]BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,length);// 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割// 是否分割可以自行重写FileInputFormat的isSplitable来控制if ((length != 0) && isSplitable(job, path)) {long blockSize = file.getBlockSize();// 计算分片大小// 即 Math.max(minSize, Math.min(maxSize, blockSize));// 也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSizelong splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;// 循环分片。// 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length- bytesRemaining);splits.add(new FileSplit(path, length - bytesRemaining,splitSize, blkLocations[blkIndex].getHosts()));bytesRemaining -= splitSize;}// 处理余下的数据if (bytesRemaining != 0) {splits.add(new FileSplit(path, length - bytesRemaining,bytesRemaining,blkLocations[blkLocations.length - 1].getHosts()));}} else if (length != 0) {// 不可split,整块返回splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));} else {// 对于长度为0的文件,创建空Hosts列表,返回splits.add(new FileSplit(path, 0, length, new String[0]));}}// 设置输入文件数量job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());return splits;
}

在getSplits()方法中,我们提到了一个方法,listStatus(),我们先来看一下这个方法:

protected List<FileStatus> listStatus(JobContext job) throws IOException {// 省略部分代码...List<PathFilter> filters = new ArrayList<PathFilter>();filters.add(hiddenFileFilter);PathFilter jobFilter = getInputPathFilter(job);if (jobFilter != null) {filters.add(jobFilter);}// 创建了一个MultiPathFilter,其内部包含了两个PathFilter// 一个为过滤隐藏文件的Filter,一个为用户自定义Filter(如果制定了)PathFilter inputFilter = new MultiPathFilter(filters);for (int i = 0; i < dirs.length; ++i) {Path p = dirs[i];FileSystem fs = p.getFileSystem(job.getConfiguration());FileStatus[] matches = fs.globStatus(p, inputFilter);if (matches == null) {errors.add(new IOException("Input path does not exist: " + p));} else if (matches.length == 0) {errors.add(new IOException("Input Pattern " + p+ " matches 0 files"));} else {for (FileStatus globStat : matches) {if (globStat.isDir()) {for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {result.add(stat);}} else {result.add(globStat);}}}}// 省略部分代码
}

NLineInputFormat是一个很有意思的FileInputFormat的子类,有时间可以了解一下。

5. PathFilter

PathFilter文件筛选器接口,使用它我们可以控制哪些文件要作为输入,哪些不作为输入。PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来设置用户自定义的PathFilter。

public interface PathFilter {/*** Tests whether or not the specified abstract pathname should be* included in a pathname list.** @param  path  The abstract pathname to be tested* @return  <code>true</code> if and only if <code>pathname</code>*          should be included*/boolean accept(Path path);
}FileInputFormat类有hiddenFileFilter属性:private static final PathFilter hiddenFileFilter = new PathFilter() {public boolean accept(Path p) {String name = p.getName();return !name.startsWith("_") && !name.startsWith(".");}
};hiddenFileFilter过滤掉隐藏文件。FileInputFormat类还有一个内部类:private static class MultiPathFilter implements PathFilter {private List<PathFilter> filters;public MultiPathFilter(List<PathFilter> filters) {this.filters = filters;}public boolean accept(Path path) {for (PathFilter filter : filters) {if (!filter.accept(path)) {return false;}}return true;}
}

MultiPathFilter类类似于一个PathFilter代理,其内部有一个PathFilter list属性,只有符合其内部所有filter的路径,才被作为输入。在FileInputFormat类中,它被listStatus()方法调用,而listStatus()又被getSplits()方法调用来获取输入文件,也即实现了在获取输入分片前进行文件过滤。

至此,我们已经利用PathFilter过滤了文件,利用FileInputFormat 的getSplits方法,计算出了Mapreduce的Map的InputSplit。作业的输入分片有了,而这些分片,是怎么被Map读取的呢?

这由InputFormat中的另一个方法createRecordReader()来负责。FileInputFormat没有对于这个方法的实现,而是交给子类自行去实现它。

6. RecordReader

RecordReader将读入到Map的数据拆分成

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {/*** 由一个InputSplit初始化*/public abstract void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException;/*** 顾名思义,读取分片下一个<key, value>对*/public abstract boolean nextKeyValue() throws IOException,InterruptedException;/*** Get the current key*/public abstract KEYIN getCurrentKey() throws IOException,InterruptedException;/*** Get the current value.*/public abstract VALUEIN getCurrentValue() throws IOException,InterruptedException;/*** 跟踪读取分片的进度*/public abstract float getProgress() throws IOException,InterruptedException;/*** Close the record reader.*/public abstract void close() throws IOException;
}

从源码可以看出,一个RecordReader主要来完成这几项功能。接下来,通过一个具体的RecordReader实现类,来详细了解一下各功能的具体操作。

public class LineRecordReader extends RecordReader<LongWritable, Text> {private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;// initialize函数即对LineRecordReader的一个初始化// 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);// 打开文件,并定位到分片读取的起始位置FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFirstLine = false;if (codec != null) {// 文件是压缩文件的话,直接打开文件in = new LineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {//if (start != 0) {skipFirstLine = true;--start;// 定位到偏移位置,下次读取就会从便宜位置开始fileIn.seek(start);}in = new LineReader(fileIn, job);}if (skipFirstLine) { // skip first line and re-establish "start".start += in.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);// key即为偏移量if (value == null) {value = new Text();}int newSize = 0;while (pos < end) {newSize = in.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));// 读取的数据长度为0,则说明已读完if (newSize == 0) {break;}pos += newSize;// 读取的数据长度小于最大行长度,也说明已读取完毕if (newSize < maxLineLength) {break;}// 执行到此处,说明该行数据没读完,继续读入}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}// 省略了部分方法
}

数据从InputSplit分片中读出已经解决,但是RecordReader是如何被Mapreduce框架利用的呢?我们先看一下Mapper类

7. Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public Context(Configuration conf, TaskAttemptID taskid,RecordReader<KEYIN, VALUEIN> reader,RecordWriter<KEYOUT, VALUEOUT> writer,OutputCommitter committer, StatusReporter reporter,InputSplit split) throws IOException, InterruptedException {super(conf, taskid, reader, writer, committer, reporter, split);}}/*** 预处理,仅在map task启动时运行一次*/protected void setup(Context context) throws IOException,InterruptedException {}/*** 对于InputSplit中的每一对<key, value>都会运行一次*/@SuppressWarnings("unchecked")protected void map(KEYIN key, VALUEIN value, Context context)throws IOException, InterruptedException {context.write((KEYOUT) key, (VALUEOUT) value);}/*** 扫尾工作,比如关闭流等*/protected void cleanup(Context context) throws IOException,InterruptedException {}/*** map task的驱动器*/public void run(Context context) throws IOException, InterruptedException {setup(context);while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}cleanup(context);}
}

重点看一下Mapper.class中的run()方法,它相当于map task的驱动。

run()方法首先调用setup()进行初始操作
然后循环对每个从context.nextKeyValue()获取的“K-V对”调用map()函数进行处理
最后调用cleanup()做最后的处理

事实上,content.nextKeyValue()就是使用了相应的RecordReader来获取“K-V对”。Mapper.class中的Context类,它继承自MapContext类,使用一个RecordReader进行构造。下面我们再看这个MapContext。

public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extendsTaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {private RecordReader<KEYIN, VALUEIN> reader;private InputSplit split;public MapContext(Configuration conf, TaskAttemptID taskid,RecordReader<KEYIN, VALUEIN> reader,RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,StatusReporter reporter, InputSplit split) {super(conf, taskid, writer, committer, reporter);this.reader = reader;this.split = split;}/*** Get the input split for this map.*/public InputSplit getInputSplit() {return split;}@Overridepublic KEYIN getCurrentKey() throws IOException, InterruptedException {return reader.getCurrentKey();}@Overridepublic VALUEIN getCurrentValue() throws IOException, InterruptedException {return reader.getCurrentValue();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {return reader.nextKeyValue();}}

从MapContent类中的方法可见,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其实都是对RecordReader方法的封装,即MapContext是直接使用传入的RecordReader来对InputSplit进行“K-V对”读取的。

至此,我们已经清楚的知道Mapreduce的输入文件是如何被过滤、读取、分片、读出“K-V对”,然后交给Mapper类来处理的。

自定义 hadoop MapReduce InputFormat 切分输入文件

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduce 一次读取一个 cookieId 下的所有记录,然后再按 time 进行切分 session,逻辑伪码如下:

for OneSplit in MyInputFormat.getSplit() // OneSplit 是某个 cookieId 下的所有记录for session in OneSplit // session 是按 time 把 OneSplit 进行了二次分割for line in session // line 是 session 中的每条记录,对应原始日志的某条记录

1、原理:

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?
InputFormat其实是一个接口,包含了两个方法:


public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)  throws IOException;
}

这两个方法有分别完成着以下工作:方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M方法 方法 getRecordReader 将每个 split 解析成records, 再依次将record解析成K,V对
也就是说 InputFormat完成以下工作:
InputFile –> splits –> K,V

系统常用的 InputFormat 又有哪些呢?

其中Text InputFormat便是最常用的,它的

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
RecordReader (interface), Line RecordReader(class)的关系FileInputFormat implements  InputFormatTextInputFormat extends  FileInputFormatTextInputFormat.get RecordReader calls  Line RecordReaderLine RecordReader  implements  RecordReader

对于InputFormat接口,上面已经有详细的描述
再看看 FileInputFormat,它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现
getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 ” 将每个s plit解析成records, 再依次将record解析成

  public interface RecordReader<K, V> {boolean   next(K key, V value) throws IOException;K   createKey();V   createValue();long   getPos() throws IOException;public void   close() throws IOException;float   getProgress() throws IOException;
}

因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,
定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
2、代码:

package MyInputFormat;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {@SuppressWarnings("deprecation")@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {return new TrackRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path file) {CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);return codec == null;}}
package MyInputFormat;import java.io.IOException;
import java.io.InputStream;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** Treats keys as offset in file and value as line.* * @deprecated Use*             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}*             instead.*/
public class TrackRecordReader extends RecordReader<LongWritable, Text> {private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private NewLineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;// ----------------------// 行分隔符,即一条记录的分隔符private byte[] separator = "END\n".getBytes();// --------------------public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFirstLine = false;if (codec != null) {in = new NewLineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;this.start -= separator.length;//// --start;fileIn.seek(start);}in = new NewLineReader(fileIn, job);}if (skipFirstLine) { // skip first line and re-establish "start".start += in.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;while (pos < end) {newSize = in.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize < maxLineLength) {break;}LOG.info("Skipped line of size " + newSize + " at pos "+ (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}@Overridepublic LongWritable getCurrentKey() {return key;}@Overridepublic Text getCurrentValue() {return value;}/*** Get the progress within the split*/public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized void close() throws IOException {if (in != null) {in.close();}}public class NewLineReader {private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte[] buffer;private int bufferLength = 0;private int bufferPosn = 0;public NewLineReader(InputStream in) {this(in, DEFAULT_BUFFER_SIZE);}public NewLineReader(InputStream in, int bufferSize) {this.in = in;this.bufferSize = bufferSize;this.buffer = new byte[this.bufferSize];}public NewLineReader(InputStream in, Configuration conf)throws IOException {this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));}public void close() throws IOException {in.close();}public int readLine(Text str, int maxLineLength, int maxBytesToConsume)throws IOException {str.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do {// 已经读到buffer的末尾了,读下一个bufferif (this.bufferPosn >= this.bufferLength) {bufferPosn = 0;bufferLength = in.read(buffer);// 读到文件末尾了,则跳出,进行下一个文件的读取if (bufferLength <= 0) {break;}}int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn++) {// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {sepPosn = 0;}// 遇到行分隔符的第一个字符if (buffer[bufferPosn] == separator[sepPosn]) {bufferPosn++;int i = 0;// 判断接下来的字符是否也是行分隔符中的字符for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半if (bufferPosn + i >= bufferLength) {bufferPosn += i - 1;break;}// 一旦其中有一个字符不相同,就判定为不是分隔符if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {sepPosn = 0;break;}}// 的确遇到了行分隔符if (sepPosn == separator.length) {bufferPosn += i;newline = true;sepPosn = 0;break;}}}int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;// 行分隔符不放入块中if (readLength > maxLineLength - txtLength) {readLength = maxLineLength - txtLength;}if (readLength > 0) {record.append(this.buffer, startPosn, readLength);txtLength += readLength;// 去掉记录的分隔符if (newline) {str.set(record.getBytes(), 0, record.getLength()- separator.length);}}} while (!newline && (bytesConsumed < maxBytesToConsume));if (bytesConsumed > (long) Integer.MAX_VALUE) {throw new IOException("Too many bytes before newline: "+ bytesConsumed);}return (int) bytesConsumed;}public int readLine(Text str, int maxLineLength) throws IOException {return readLine(str, maxLineLength, Integer.MAX_VALUE);}public int readLine(Text str) throws IOException {return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);}}
}
package MyInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TestMyInputFormat {public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {System.out.println("key:\t " + key);System.out.println("value:\t " + value);System.out.println("-------------------------");}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Path outPath = new Path("/hive/11");FileSystem.get(conf).delete(outPath, true);Job job = new Job(conf, "TestMyInputFormat");job.setInputFormatClass(TrackInputFormat.class);job.setJarByClass(TestMyInputFormat.class);job.setMapperClass(TestMyInputFormat.MapperClass.class);job.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3、测试数据:
cookieId   time   url    cookieOverFlag
1       a   1_hao123
1       a   1_baidu
1       b   1_google   2END
2       c   2_google
2       c   2_hao123
2       c   2_google   1END
3       a   3_baidu
3       a   3_sougou
3       b   3_soso     2END
4、结果:

key:     0
value:   1  a   1_hao123
1   a    1_baidu
1   b    1_google   2
-------------------------
key:     47
value:   2  c    2_google
2   c    2_hao123
2   c    2_google   1
-------------------------
key:     96
value:   3  a    3_baidu
3   a    3_sougou
3   b    3_soso 2
-------------------------

FileInputFormat详解相关推荐

  1. Mapreduce源码分析(一):FileInputFormat切片机制,源码详解

    FileInputFormat切片机制,源码详解 1.InputFormat:抽象类 只有两个抽象方法 public abstract List<InputSplit> getSplits ...

  2. Hadoop学习之Mapreduce执行过程详解

    一.MapReduce执行过程 MapReduce运行时,首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,最后输出作为Reduce的输入,大体执行流程如下图所示: ...

  3. Hadoop 新 MapReduce 框架 Yarn 详解

    Hadoop MapReduceV2(Yarn) 框架简介 原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储 ...

  4. Hadoop之InputFormat数据输入详解

    Hadoop之InputFormat数据输入详解 Job提交流程和切片源码详解 FileInputFormat切片机制 CombineTextInputFormat切片机制 InputFormat接口 ...

  5. spark 调度模块详解及源码分析

    spark 调度模块详解及源码分析 @(SPARK)[spark] spark 调度模块详解及源码分析 一概述 一三个主要的类 1class DAGScheduler 2trait TaskSched ...

  6. MapReduce:Shuffle过程详解

    MapReduce:Shuffle过程详解 1.Map任务处理 1.1 读取HDFS中的文件.每一行解析成一个<k,v>.每一个键值对调用一次map函数.                & ...

  7. Spark核心编程系列(一)——RDD详解

    目录 Spark核心编程系列--RDD详解(一) RDD概念 RDD与IO之间的关系 RDD的核心属性 RDD执行原理 基础编程 RDD创建 RDD的并行度与分区 参考 Spark核心编程系列--RD ...

  8. hadoop生态圈详解

    1.hadoop架构及作用 (1)Common hadoop核心组件,基本jar包依赖,公共工具程序 (2)HDFS(Hadoop Distributed File System) 分布式文件系统,解 ...

  9. Hadoop第五天--HDFS详解

    文章部分选自:https://blog.csdn.net/gwd1154978352/article/details/81095592 自己的话:层楼终究误少年,自由早晚乱余生 眼泪你别问,joker ...

最新文章

  1. 半导体物理与器件pdf施敏_西湖大学工学院先进固态半导体实验室行政助理招聘,户籍专业不限!...
  2. Activity向Fragment传值
  3. IIS6、7添加反向代理的步骤
  4. HTML基础(part1)--预备知识
  5. --4、实验室设备管理(表)
  6. 请尽快升级到 Windows Vista - 因为你需要
  7. MSN再次出现大面积瘫痪 微软称尚未查明原因
  8. Mybatis一对一和一对多配置
  9. PingingLab传世经典系列《CCNA完全配置宝典》-2.9 OSPF基本配置
  10. ThinkPHP5集成JS-SDK实现微信自定义分享功能
  11. html5 ios 音乐加载失败,Cocos2d-JS HTML5的cc.audioEngine音乐音效在iOS设备上不响应静音键的问题...
  12. 计算机win7音量调节,如何解决Win7电脑声音小的问题?
  13. 2021昆明icpc B 状压+期望dp,一点几何模拟
  14. Excel怎么批量导入图片
  15. @OneToMany---ManyToOne
  16. 网管系统 php,智和网管平台-真正开放源码的网元管理系统(EMS)
  17. python 数据分析 14天_python数据分析简介
  18. 前端 给确定按钮加一个确认提示
  19. BT下载的工作原理及小知识
  20. Retrofit 最简单的快速入门及自己封装

热门文章

  1. 学认地形图坐标网和注记
  2. 十天学会php之第十天
  3. day018 mysql单表多表
  4. win10+python+Flask设置SSL证书,使用https服务
  5. MySQL(学习笔记)——数据库视图、触发器及存储过程
  6. 09 【实操篇-磁盘查看和分区类】
  7. 程序员的专业主义精神——评《程序员的职业素养》
  8. 软件测试:HttpUnit简单介绍和案例应用
  9. 滤波电容,去耦电容,旁路电容的区别和应用
  10. Android开发实用小工具十三——BMI指数计算工具