补充:hive 读取数据的机制:

1、首先用 InputFormat(默认是 org.apache.hadoop.mapred.TextInputFormat)的一个具体实现类读入文件数据,返回一条一条的记录(可以是行,或者是你逻辑中的“行”)。

2、然后利用 SerDe(默认是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe)的一个具体实现类,对上面返回的一条一条的记录进行字段切割。
Hive 对文件中字段的分隔符默认情况下只支持单字节分隔符。如果数据文件中的分隔符是多字节的(例如每行形如 `字段1||字段2`,以 `||` 分隔字段),可以用下面两种方法解决:

1、使用 RegexSerDe 通过正则表达式抽取字段





hive>create table t_bi_reg(id string,name string) row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' with serdeproperties('input.regex'='(.*)\\|\\|(.*)','output.format.string'='%1$s %2$s') stored as textfile;
hive>load data local inpath '/home/hadoop/hivedata/bi.dat' into table t_bi_reg;

hive>select * from t_bi_reg;

2、通过自定义 InputFormat 解决特殊分隔符问题

其原理是在 InputFormat 读取行的时候,将数据中的“多字节分隔符”替换为 Hive 默认的分隔符(Ctrl+A,即 \001)或另一个用于替代的单字节分隔符,以便 Hive 在 SerDe 操作时按照单字节分隔符进行字段抽取。下面的实现把每行中的 `||` 替换为 `|`。


package com.ghgj.hive.delimit2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat; public class BiDelimiterInputFormat extends TextInputFormat { @Override public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)throws IOException { reporter.setStatus(genericSplit.toString());
BiRecordReader reader = new BiRecordReader(job,(FileSplit)genericSplit);
// MyRecordReader reader = new MyRecordReader(job,(FileSplit)genericSplit);
return reader; }


package com.ghgj.hive.delimit2;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader; public class BiRecordReader implements RecordReader<LongWritable, Text> {  private static final Log LOG = LogFactory.getLog(LineRecordReader.class    .getName()); private CompressionCodecFactory compressionCodecs = null;  private long start;  private long pos;  private long end;  private LineReader in;  int maxLineLength;  private Seekable filePosition;  private CompressionCodec codec;  private Decompressor decompressor; /**   * A class that provides a line reader from an input stream.   * @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.   */  @Deprecated  public static class LineReader extends org.apache.hadoop.util.LineReader {               LineReader(InputStream in) {    super(in);   } LineReader(InputStream in, int bufferSize) {    super(in, bufferSize);   } public LineReader(InputStream in, Configuration conf)     throws IOException { super(in, conf);   }  } public BiRecordReader(Configuration job, FileSplit split) throws IOException {           this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",             Integer.MAX_VALUE);   start = split.getStart();       end = start + split.getLength();   final Path file = split.getPath();   compressionCodecs = new CompressionCodecFactory(job);   codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split   FileSystem fs = file.getFileSystem(job);   FSDataInputStream fileIn = fs.open(split.getPath()); if (isCompressedInput()) { decompressor = CodecPool.getDecompressor(codec);    if (codec instanceof SplittableCompressionCodec) {    final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)               .createInputStream(fileIn, decompressor, start, end,                 SplittableCompressionCodec.READ_MODE.BYBLOCK);     in = new LineReader(cIn, job);     start = cIn.getAdjustedStart();     end = cIn.getAdjustedEnd();     filePosition = cIn; // take pos from compressed stream    } else {  
   in = new LineReader(codec.createInputStream(fileIn,       decompressor), job);             filePosition = fileIn;    }   } else {    fileIn.seek(start);    in = new LineReader(fileIn, job);    filePosition = fileIn;   }   // If this is not the first split, we always throw away first record   // because we always (except the last split) read one extra line in   // next() method.   if (start != 0) {    start += in.readLine(new Text(), 0, maxBytesToConsume(start));   }   this.pos = start; } private boolean isCompressedInput() {   return (codec != null);  } private int maxBytesToConsume(long pos) {   return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(             Integer.MAX_VALUE, end - pos);  } private long getFilePosition() throws IOException {   long retVal;   if (isCompressedInput() && null != filePosition) {    retVal = filePosition.getPos();   } else {    retVal = pos;   }   return retVal;  } public BiRecordReader(InputStream in, long offset, long endOffset,    int maxLineLength) {   this.maxLineLength = maxLineLength;   this.in = new LineReader(in);   this.start = offset;   this.pos = offset;   this.end = endOffset;   this.filePosition = null;  } public BiRecordReader(InputStream in, long offset, long endOffset,               Configuration job) throws IOException {   this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",             Integer.MAX_VALUE);   this.in = new LineReader(in, job);   this.start = offset;   this.pos = offset;   this.end = endOffset;   this.filePosition = null;  } public LongWritable createKey() {   return new LongWritable();
} public Text createValue() {   return new Text();  } /** Read a line. */  public synchronized boolean next(LongWritable key, Text value)    throws IOException { // We always read one extra line, which lies outside the upper   // split limit i.e. (end - 1)   while (getFilePosition() <= end) {    key.set(pos); int newSize = in.readLine(value,             maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));    String str = value.toString().replaceAll("\\|\\|", "\\|");    value.set(str);    pos += newSize;        if (newSize == 0) {     return false;    }    if (newSize < maxLineLength) {     return true;    } // line too long. try again    LOG.info("Skipped line of size " + newSize + " at pos "      + (pos -             newSize));   } return false;  } /**   * Get the progress within the split   */  public float getProgress() throws IOException {   if (start == end) {    return 0.0f;   } else {    return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));   }  } public synchronized long getPos() throws IOException {   return pos;  } public synchronized void close() throws IOException {   try {    if (in != null) {     in.close();    }   } finally {    if (decompressor != null) {     CodecPool.returnDecompressor(decompressor);    }   }  } } 

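注意:上述代码中的 API 全部使用 Hadoop 的老 API 接口 org.apache.hadoop.mapred.*。然后将工程打包,拷贝至 Hive 安装目录的 lib 文件夹中,并重启 Hive,使用以下语句建表即可(BiRecordReader 已把 `||` 替换为 `|`,所以字段分隔符指定为 `|`):

hive>create table t_bi(id string,name string)
row format delimited fields terminated by '|'
stored as inputformat 'com.ghgj.hive.delimit2.BiDelimiterInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';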

注:还需要在 Hive 中执行 add jar,才能在执行 HQL 查询该表时把自定义 jar 包分发给 MapTask:
hive>add jar /home/hadoop/apps/hive/lib/myinput.jar


  1. Hive 的分隔符、order by、sort by、distribute by 的优化

