Hive特殊分隔符处理
补充:hive 读取数据的机制:
1、 首先用 InputFormat<默认是:org.apache.hadoop.mapred.TextInputFormat >的一个具体实 现类读入文件数据,返回一条一条的记录(可以是行,或者是你逻辑中的“行”)
2、 然后利用 SerDe<默认:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe>的一个具体 实现类,对上面返回的一条一条的记录进行字段切割
Hive 对文件中字段的分隔符默认情况下只支持单字节分隔符,如果数据文件中的分隔符是多 字符的,如下所示:
01||huangbo
02||xuzheng
03||wangbaoqiang
1.使用RegexSerDe通过正则表达式来抽取字段
create table t_bi_reg(id string,name string) row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' with serdeproperties('input.regex'='(.*)\\|\\|(.*)','output.format.string'='%1$s %2$s') stored as textfile; hive>select * from t_bi_reg; |
2、通过自定义 InputFormat 解决特殊分隔符问题
其原理是在 inputformat 读取行的时候将数据中的“多字节分隔符”替换为 hive 默认的分隔 符(ctrl+A 亦即 \001)或用于替代的单字符分隔符,以便 hive 在 serde 操作时按照默认的 单字节分隔符进行字段抽取
com.ghgj.hive.delimit2.BiDelimiterInputFormat
package com.ghgj.hive.delimit2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat; public class BiDelimiterInputFormat extends TextInputFormat { @Override public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)throws IOException { reporter.setStatus(genericSplit.toString());
BiRecordReader reader = new BiRecordReader(job,(FileSplit)genericSplit);
// MyRecordReader reader = new MyRecordReader(job,(FileSplit)genericSplit);
return reader; }
}
com.ghgj.hive.delimit2.BiRecordReader
package com.ghgj.hive.delimit2;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader; public class BiRecordReader implements RecordReader<LongWritable, Text> { private static final Log LOG = LogFactory.getLog(LineRecordReader.class .getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; int maxLineLength; private Seekable filePosition; private CompressionCodec codec; private Decompressor decompressor; /** * A class that provides a line reader from an input stream. * @deprecated Use {@link org.apache.hadoop.util.LineReader} instead. */ @Deprecated public static class LineReader extends org.apache.hadoop.util.LineReader { LineReader(InputStream in) { super(in); } LineReader(InputStream in, int bufferSize) { super(in, bufferSize); } public LineReader(InputStream in, Configuration conf) throws IOException { super(in, conf); } } public BiRecordReader(Configuration job, FileSplit split) throws IOException { this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); if (isCompressedInput()) { decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec) .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new LineReader(cIn, job); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; // take pos from compressed stream } else { in = new LineReader(codec.createInputStream(fileIn, decompressor), job); filePosition = fileIn; } } else { fileIn.seek(start); in = new LineReader(fileIn, job); filePosition = fileIn; } // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; } private boolean isCompressedInput() { return (codec != null); } private int maxBytesToConsume(long pos) { return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min( Integer.MAX_VALUE, end - pos); } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput() && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } public BiRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) { this.maxLineLength = maxLineLength; this.in = new LineReader(in); this.start = offset; this.pos = offset; this.end = endOffset; this.filePosition = null; } public BiRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException { this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); this.in = new LineReader(in, job); this.start = offset; this.pos = offset; this.end = endOffset; this.filePosition = null; } public LongWritable createKey() { return new LongWritable();
} public Text createValue() { return new Text(); } /** Read a line. */ public synchronized boolean next(LongWritable key, Text value) throws IOException { // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end) { key.set(pos); int newSize = in.readLine(value, maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength)); String str = value.toString().replaceAll("\\|\\|", "\\|"); value.set(str); pos += newSize; if (newSize == 0) { return false; } if (newSize < maxLineLength) { return true; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } return false; } /** * Get the progress within the split */ public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start)); } } public synchronized long getPos() throws IOException { return pos; } public synchronized void close() throws IOException { try { if (in != null) { in.close(); } } finally { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); } } } }
注意:上述代码中的 api 全部使用 hadoop 的老 api 接口 org.apache.hadoop.mapred„. 然后将工程打包,并拷贝至 hive 安装目录的 lib 文件夹中,并重启 hive,使用以下语句建表 即可:
注:还需要在 hive 中使用 add jar,才能在执行 hql 查询该表时把自定义 jar 包传递给 maptask hive>add jar /home/hadoop/apps/hive/lib/myinput.jar
Hive特殊分隔符处理相关推荐
- hive 的分隔符、orderby sort by distribute by的优化
一.Hive 分号字符 分号是SQL语句结束标记,在HiveQL中也是,可是在HiveQL中,对分号的识别没有那么智慧,比如: select concat(cookie_id,concat(';',' ...
- 一文搞定 Hive 表分隔符
Hive 中没有定义专门的数据格式,数据格式可以由用户指定,用户定义数据格式需要指定三个属性:列分隔符(通常为空格."\t"."\x001″).行分隔符("\n ...
- 【Hive】分隔符 『 单字节分隔符 | 多字节分隔符』
文章目录 1. 概述 2. 单字节分隔符 方法:使用delimited关键字 3. 其它复杂情况 方式一:写MR程序进行字符替换转为单字节分隔符问题(不推荐) 方式二:自定义InputFormat转为 ...
- Hive多字节分隔符解决方案
目录 1 应用场景 1.1 Hive中的分隔符 1.2 特殊数据 2 问题与需求 2.1问题 2.2 情况二:数据中包含了分隔符 3 解决方案 3.1 解决方案一:替换分隔符 3.2 解决方案二:Re ...
- 大数据Hive多字节分隔符
目录 1 应用场景 1.1 Hive中的分隔符 1.2 特殊数据 2.2 需求 3 解决方案一:替换分隔符 3.1 方案概述 3.2 程序开发 3.3 重新建表加载数据 3.4 查看结果 3.5 总结 ...
- java hive默认分隔符,HIVE 默认分隔符 以及linux系统中特殊字符的输入和查看方式...
一.查看特殊字符 Vim中可见特殊字符会直接显示,不可见特殊字符会显示为该字符在命令行的输入方式, 例如\r显示为^M.通过:help digraph-table可看到所有Vim中可输入的特殊字符, ...
- hive一次加载多个文件_0738-6.2.0-如何在Hive中使用多分隔符
文档编写目的 Hive在0.14及以后版本支持字段的多分隔符,参考: https://cwiki.apache.org/confluence/display/Hive/MultiDelimitSer ...
- hive 十六进制转十进制_Hive使用十六进制分隔符异常分析
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看. 1.问题描述 通过sqoop抽取Mysql表数据到hive表,发现hive表所有列显示为null Hive表的分隔符为"\u0 ...
- hive 十六进制转十进制_0026-Hive使用十六进制分隔符异常分析
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看. 1.问题描述 通过sqoop抽取Mysql表数据到hive表,发现hive表所有列显示为null Hive表的分隔符为"u00 ...
最新文章
- HTML的标签描述21
- 谁扛起张一鸣的游戏野心?
- 内存很空却频繁gc_NonRegisteringDriver造成的内存频繁FullGc
- Java:将条件移至消息文件
- Web安全之Cookie劫持
- (转)unity web 缓存解决方案
- Linux C socket 编程之TCP
- linux镜像下载和vmware虚拟主机部署
- C语言sb代码,10个重要的算法C语言实现源代码
- Epicor 客制化 - 常用对象
- 1756冗余_冗余电源1756-PA75R
- 无锡梅里旅游策划方案——中国第一锡宫!
- 火车订票管理系统/火车购票网站系统/火车订票网站/火车票售卖系统
- Deep Projective 3D Semantic Segmentation
- 基于 Debian 的 GNU/Linux的Parrot 3.11 已发布
- EasyPoi导出之复杂合并单元格
- 群体创新更能促进社会进步
- python人民币转大写_python 人民币数字转大写中文
- python怎么导入sql数据库,##使用python将excel表中数据导入sql server数据库
- F28335第十一篇——串行外设接口(SPI)