使用MapReduce生成HFile文件,通过BulkLoader方式(跳过WAL验证)批量加载到HBase表中

package com.mengyao.bigdata.hbase;import java.io.IOException;import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** * @author mengyao * HBase-1.0.1.1、Hadoop-2.6.0**/
public class BulkLoadApp {private static Configuration conf = HBaseConfiguration.create();private static String inPath;private static String outPath;private static String tableName;static {conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203");conf.set("hbase.zookeeper.property.clientPort", "2181");}static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {private ImmutableBytesWritable row;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();//id,username,email,birthday,mobile,phone,modifiedString[] fields = line.split("\t");String id = fields[0];String username = fields[1];String mail = fields[2];String birthday = fields[3];String mobile = fields[4];String phone = fields[5];String regtime = fields[6];String rowKey = DigestUtils.md5Hex(id);row = new ImmutableBytesWritable(Bytes.toBytes(rowKey));Put put = new Put(Bytes.toBytes(rowKey), System.currentTimeMillis());if (!StringUtils.isEmpty(id)) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(id));                }if (!StringUtils.isEmpty(username)) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("username"), Bytes.toBytes(username));}if (!StringUtils.isEmpty(mail)) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mail"), Bytes.toBytes(mail));}if (!StringUtils.isEmpty(birthday) || !birthday.equals("0000-00-00")) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes(birthday));}if (!StringUtils.isEmpty(mobile)) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile"), Bytes.toBytes(mobile));}if (!StringUtils.isEmpty(phone)) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone));}if (!StringUtils.isEmpty(regtime)) {put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("modified"), Bytes.toBytes(regtime));}context.write(row, put);}}static int createJob(String[] args) throws Exception {inPath = args[0];outPath = args[1];tableName = args[2];Connection connection = ConnectionFactory.createConnection(conf);Table table = connection.getTable(TableName.valueOf(tableName));Job job=Job.getInstance(conf);job.setJarByClass(BulkLoadApp.class);job.setMapperClass(BulkLoadMapper.class);job.setNumReduceTasks(0);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);job.setOutputFormatClass(HFileOutputFormat2.class);HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(tableName)));FileInputFormat.addInputPath(job,new Path(inPath));FileOutputFormat.setOutputPath(job,new Path(outPath));return job.waitForCompletion(true)?0:1;}/*** use commond:*         1、hadoop jar MyJar INPUT_FILE OUTPUT_DIR TABLE_NAME*             hadoop jar bigdata.jar /tag/data/user/haier_user.csv /tag/data/user/haier_user_out tbl_shopuser*         2、hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles OUTPUT_DIR TABLE_NAME*             hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tag/data/user/haier_user_out tbl_shopuser* @param args* @throws Exception*/@SuppressWarnings("deprecation")public static void main(String[] args) throws Exception {if (args.length!=3) {System.out.println("Usage: "+BulkLoadApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH> <TABLE_NAME>");} else {int status = createJob(args);if (status == 0) {LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(conf);loadHFiles.doBulkLoad(new Path(outPath), new HTable(conf, Bytes.toBytes(tableName)));}System.exit(status);}}}

转载于:https://www.cnblogs.com/mengyao/p/6774046.html

使用MapReduce将HDFS数据导入到HBase(三)相关推荐

  1. 使用MapReduce将HDFS数据导入Mysql

    使用MapReduce将Mysql数据导入HDFS代码链接 将HDFS数据导入Mysql,代码示例 package com.zhen.mysqlToHDFS;import java.io.DataIn ...

  2. linux中将hdfs数据导入hbase,将数据文件导入到HBase中

    假设有一个TSV格式的数据文件test.dat(TSV是指数据文件中的每个字段是以制表符隔开的) 首先,将test.dat上传到hdfs上,具体的命令如下: hadoop fs -copyFromLo ...

  3. 通过Spark生成HFile,并以BulkLoad方式将数据导入到HBase

    在实际生产环境中,将计算和存储进行分离,是我们提高集群吞吐量.确保集群规模水平可扩展的主要方法之一,并且通过集群的扩容.性能的优化,确保在数据大幅增长时,存储不能称为系统的瓶颈. 具体到我们实际的项目 ...

  4. Doris hdfs数据导入doris动态分区表

    @羲凡--只为了更好的活着 Doris hdfs数据导入doris动态分区表 本文重点 1.动态分区表创建 2.读取路径作为分区参数 3.预聚合增加count列--set (cnt = 1) 4.br ...

  5. HBase 系列 (七)数据导入到HBase数据库的N种方式

    文章目录 Shell导入到HBase: Hive与HBase集成: Sqoop将Mysql移到HBase: JavaAPI控制HBase: Shell导入到HBase: ##通过hbase shell ...

  6. hive数据导入到hbase

    方式一: ​ hbase中建表,然后hive中建外部表,这样当hive中写入数据后,hbase中的表也会同时更新 创建hbase表 create 'classes','user' --表名是class ...

  7. mysql 导入百万级数据 几种 java_Java 修行第034天--执行计划及其使用--Oracle数据导入导出--第三章MySQL使用...

    执行计划中牢记几句话: -- 尽量避免是*代替所有列,编写查询语句时使用具体列名代替*,可以防止全表扫描 -- 尽可能少的使用like关键字进行模糊查询 -- 建立适当的索引可以提高查询效率 十三. ...

  8. Excel数据导入到hbase实战

    表的设计 列蔟:推荐1-2个,能使用1个就不是使用2个 版本的设计:如果我们的项目不需要保存历史的版本,直接按照默认配置VERSIONS=1就OK.如果项目中需要保存历史的变更信息,就可以将VERSI ...

  9. HBase 数据导入功能实现方式解释

    https://www.ibm.com/developerworks/cn/opensource/os-cn-data-import/index.html 预备知识:启动 HBase 清单 1. 修改 ...

  10. 大数据技术之HBase(超级详细)

    大数据技术之HBase 第1章 HBase简介 1.1 什么是HBase HBase的原型是Google的BigTable论文,受到了该论文思想的启发,目前作为Hadoop的子项目来开发维护,用于支持 ...

最新文章

  1. 这就是爱?英特尔处理器将整合AMD HBM2 GPU
  2. 用js操作table、tr、td 「字体样式及TD背景图片」
  3. Telnet 对memcached进行数据操作
  4. Android APK文件学习调研
  5. 下一代安全工具:SHA-3
  6. mysql pdo prepare_PDO::prepare
  7. 运营系统性能管理的重要性
  8. 响应式mysql_Spring Data R2DBC响应式操作MySQL
  9. ASP.NET2.0一次发送多封邮件
  10. 股票历史数据下载接口汇总(动态更新)
  11. 汽车称重软件系统配置(一)
  12. 获取UI控件位置信息
  13. 第一天 渗透的基本概念
  14. python 战舰_419. 甲板上的战舰(Python)
  15. 如何将Ai文件转换成PDF文件
  16. 天兔lepus部署文档
  17. 自建服务器同步软件,黑群晖自建anki服务器电脑anki软件设置同步
  18. c#语言定义文档pdf,C#如何更改Word的语言设置.pdf
  19. 【工具】windows--word转PDF
  20. Android Studio调用百度地图(二):实现地图显示后台定位和步行导航

热门文章

  1. outlook搜索栏跑到上面去了_南昌搜索引擎seo优化
  2. linux命令文本处理(一)grep
  3. wxPython控件学习之TextCtrl(二)多行及样式文本框
  4. zabbix监控之模板使用、网络发现及邮件报警功能
  5. 软件公司的管理规范化了、编制都齐全了,一般小公司是承受不了的这么庞大的开支的...
  6. ORB-SLAM 解读(一)ORB关键点提取
  7. 编写Spark SQL查询程序
  8. 2019春年第三次课程设计实验报告
  9. 玩大了,开源协议修改引发MongoDB“大动荡”?
  10. Struts2学习(三)———— 输入校验和拦截器