一、spark本地或者集群创建hbase表(不需要Kerberos认证)、并且插入数据

1、环境准备:idea 16+scala-2.10.4+cdh-spark-1.6.1+jdk-1.7+hbase-1.2.0-cdh5.8.0

2、 (1)创建maven工程:pdf-hbase

(2)新建一个文件夹libs,导入cdh的spark jar,添加jar到项目中

spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar

spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar

(3)编写pom.xml文件,导入相关的依赖包

3、编写实现的spark代码

ScalaHbaseCreate.scala类如下:

package controller.spark
import org.apache.hadoop.hbase.client.{Get, HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.spark.{SparkContext, _}
/*** Document:本类作用---->spark创建Hbase(生产环境)* User: yangjf* Date: 2016/8/18  17:10* 测试通过* 集群执行:* spark-submit --class controller.spark.ScalaHbaseCreate--master yarn-cluster --executor-memory 1G --num-executors 4 --driver-memory 1g --executor-cores 2  /linux的jar路径*/
object ScalaHbaseCreate {def main(args: Array[String]): Unit = {//本地执行
val sparkConf =newSparkConf().setMaster("local[2]").setAppName("HBaseTest").set("spark.serializer","org.apache.spark.serializer.KryoSerializer");//集群执行
//    val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("HBaseTest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");valsc =newSparkContext(sparkConf)//"greatgas:"是命名空间,所以表名称=命名空间+自定义名vartable_name ="gas:test_enn_222"valconf = HBaseConfiguration.create()/**生产环境*/conf.set("hbase.zookeeper.quorum","host17.slave.cluster.enn.cn:2181");//nameservice是hadoop配置文件core-site.xml文件”fs.defaultFS”属性的别名,这里不能使用ip:8020conf.set("hbase.rootdir","hdfs://nameservice/hbase");        conf.set("hbase.zookeeper.property.clientPort","2181")conf.set(TableInputFormat.INPUT_TABLE, table_name)val hadmin = new HBaseAdmin(conf)if (!hadmin.isTableAvailable(table_name)) {print("Table Not Exists! Create Table")val tableDesc = new HTableDescriptor(table_name)tableDesc.addFamily(new HColumnDescriptor("basic".getBytes()))hadmin.createTable(tableDesc)}else{print("Table  Exists!  not Create Table")}val table = new HTable(conf, table_name);//插入5条数据for(i <-1to 5) {var put = new Put(Bytes.toBytes("row"+ i))put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))table.put(put)}table.flushCommits()//Scan操作valhbaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])//统计RDD个数valcount = hbaseRDD.count()println("HBase RDD Count:" + count)//缓存hbaseRDD.cache()val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val g = newGet("row1".getBytes)val result = table.get(g)val value = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))println("GET id001 :"+value)hbaseRDD.cache()print("------------------------scan----------------")val res = hbaseRDD.take(count.toInt)for (j <- 1 until count.toInt) {println("j: " + j)var rs = res(j - 1)._2var kvs = rs.rawfor (kv <- kvs)println("rowkey:" + new String(kv.getRow()) +" cf:" + new String(kv.getFamily()) +" column:" + new String(kv.getQualifier()) +" value:" + new String(kv.getValue()))}println("-------------------------")//take操作println("--take1"+ hBaseRDD.take(1))//统计println("--count"+ hBaseRDD.count())}
}

4、进入集群查看是否创建成功,并且插入5条记录:

5、如果需要在集群上运行spark,则需要更改local[2]为yarn-cluster,然后打包jar,

上传到liunx机器上使用spark-submit提交,命令如下:

#spark-submit --classcontroller.spark.ScalaHbaseCreate--master yarn-cluster --executor-memory 1G --num-executors 4 --driver-memory 1g --executor-cores 2  /linux的jar路径

二、、spark本地或者集群创建hbase表(需要Kerberos认证)、并且插入数据

1、需要提供2个文件

(1)e_lvbin.keytab:需要在集群上生成,然后取到windows本地

(2)krb5.conf:可以从集群上/etc/目录下,取到windows本地

2、本地创建scala类

ScalaHbaseCreateTest

packagecontroller.spark
import org.apache.hadoop.hbase.client.{Get, HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkContext, _}
/*** Document:本类作用---->spark创建Hbase链接,并且插入数据(测试环境)* User: yangjf* Date: 2016/8/18  17:10* 测试结果:测试成功*/
object ScalaHbaseCreateTest {def main(args: Array[String]): Unit = {System. setProperty("java.security.krb5.conf","F:/krb5.conf");//该文件目录:/etc/krb5.confvalsparkConf =newSparkConf().setMaster("local[2]").setAppName("HBaseTest").set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
//    val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("HBaseTest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");valsc =newSparkContext(sparkConf)//定义表名称="命名空间"+"表名"vartable_name ="greatgas:test_enn_123"valconf = HBaseConfiguration.create()/**测试环境*/conf.set("hbase.zookeeper.quorum","slave-31.dev.cluster.enn.cn:2181");//zookeeper地址conf.set("hbase.rootdir","hdfs://mycluster/hbase")//mycluster是别名,不能使用ip:8020conf.set("hadoop.security.authentication","kerberos");conf.set("hbase.security.authentication","kerberos");conf.set("hbase.security.authorization","true");conf.set("hbase.master.kerberos.principal","hbase/_HOST@ENN.CN");conf.set("hbase.thrift.kerberos.principal","hbase/_HOST@ENN.CN");conf.set("hbase.regionserver.kerberos.principal","hbase/_HOST@ENN.CN");val user = "e_lvbin@ENN.CN";val keyPath = "F:/e_lvbin.keytab"//UserGroupInformation.setConfiguration(conf);UserGroupInformation.loginUserFromKeytab(user, keyPath);//获取Hbase的masterconf.set("hbase.zookeeper.property.clientPort","2181")conf.set(TableInputFormat.INPUT_TABLE, table_name)val hadmin = new HBaseAdmin(conf)if (!hadmin.isTableAvailable(table_name)) {print("Table Not Exists! Create Table")val tableDesc = new HTableDescriptor(table_name)tableDesc.addFamily(new HColumnDescriptor("basic".getBytes()))hadmin.createTable(tableDesc)}else{print("Table  Exists!  not Create Table")}val table = new HTable(conf, table_name);for (i <- 1 to 5) {var put = new Put(Bytes.toBytes("row"+ i))put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))table.put(put)}table.flushCommits()//Sc操作,转换为RDDvalhbaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count = hbaseRDD.count()println("HBase RDD Count:" + count)hbaseRDD.cache()//核心valhBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val g = newGet("row1".getBytes)val result = table.get(g)val value = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))println("GET id001 :"+value)hbaseRDD.cache()print("------------------------scan----------")val res = hbaseRDD.take(count.toInt)for (j <- 1 until count.toInt) {println("j: " + j)var rs = res(j - 1)._2var kvs = rs.rawfor (kv <- kvs)println("rowkey:" + new String(kv.getRow()) +" cf:" + new String(kv.getFamily()) +" column:" + new String(kv.getQualifier()) +" value:" + new String(kv.getValue()))}println("-------------------------")println("--take1" + hBaseRDD.take(1))println("--count" + hBaseRDD.count())}
}

3、本地运行该scala类

4、进入集群查看是否创建表并且插入5条数据

5、如果需要集群下执行该类,需要修改本类代码、需要将认证文件放到

目录下

新建一个类ScalaHbaseCreateTestCluster.scala,内容如下:

packagecontroller.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkContext, _}
import java.io.{FileOutputStream, IOException, InputStream}
import org.apache.hadoop.hbase.security.User/*** Document:本类作用---->spark创建Hbase链接,并且插入数据(测试环境),集群上创建表* User: yangjf* Date: 2016/8/18  17:10* 测试结果:测试未通过,原因没有找到认证文件--------------------------------------------该方法测试未通过!------------------------------*spark-submit --class controller.spark.ScalaHbaseCreateTestCluster --master yarn-cluster --executor-memory 1G --num-executors 4 --driver-memory 1g --executor-cores 2 /data/e_lvbin/yjf_spark/pdf-hbase.jar*/
object ScalaHbaseCreateTestCluster {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("HBaseTest").set("spark.serializer","org.apache.spark.serializer.KryoSerializer");val sc = newSparkContext(sparkConf)//定义表名称="命名空间"+"表名"vartable_name ="greatgas:test_enn_1234"System.setProperty("java.security.krb5.conf","/etc/krb5.conf");//机器上文件目录:/etc/krb5.confvalconf = HBaseConfiguration.create()/**测试环境*/conf.set("hbase.zookeeper.quorum","slave-31.dev.cluster.enn.cn:2181");conf.set("hbase.rootdir","hdfs://mycluster/hbase");conf.set("hadoop.security.authentication","kerberos");conf.set("hbase.security.authentication","kerberos");conf.set("hbase.security.authorization","true");conf.set("hbase.master.kerberos.principal","hbase/_HOST@ENN.CN");conf.set("hbase.thrift.kerberos.principal","hbase/_HOST@ENN.CN");conf.set("hbase.regionserver.kerberos.principal","hbase/_HOST@ENN.CN");val user ="e_lvbin@ENN.CN";val keyPath =  "e_lvbin.keytab";/*** 集群上由于没有一个固定路径,所以需要读入内存,再从内存中获取认证文件* 所以,此时需要将认证文件e_lvbin.keytab放到项目的resources目录下*/val in: InputStream =ScalaHbaseCreateTestCluster.getClass.getClassLoader.getResourceAsStream(keyPath)println("---------"+in+"-----------")val out: FileOutputStream =newFileOutputStream(keyPath)val bytes: Array[Byte] =newArray[Byte](in.available);in.read(bytes)out.write(bytes)out.close()println("---------"+keyPath+"-----------")UserGroupInformation.setConfiguration(conf);UserGroupInformation.loginUserFromKeytab(user, keyPath);//获取Hbase的masterconf.set("hbase.zookeeper.property.clientPort","2181")conf.set("hbase.zookeeper.property.clientPort","2181");conf.set("hbase.rpc.timeout","10000");conf.set("hbase.client.retries.number","5");conf.set("hbase.client.pause","5000");conf.set("hbase.client.scanner.timeout.period","50000");conf.set(TableInputFormat.INPUT_TABLE, table_name)val hadmin = new HBaseAdmin(conf)if (!hadmin.isTableAvailable(table_name)) {print("Table Not Exists! Create Table")val tableDesc = new HTableDescriptor(table_name)tableDesc.addFamily(new HColumnDescriptor("basic".getBytes()))hadmin.createTable(tableDesc)}else{print("Table  Exists!  not Create Table")}val table = new HTable(conf, table_name);for (i <- 1 to 5) {var put = new Put(Bytes.toBytes("row"+ i))put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))table.put(put)}table.flushCommits()//Sc操作,转换为RDDvalhbaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count = hbaseRDD.count()println("HBase RDD Count:" + count)hbaseRDD.cache()//核心valhBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val g = newGet("row1".getBytes)val result = table.get(g)val value = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))println("GET id001 :"+value)hbaseRDD.cache()print("------------------------scan----------")val res = hbaseRDD.take(count.toInt)for (j <- 1 until count.toInt) {println("j: " + j)var rs = res(j - 1)._2var kvs = rs.rawfor (kv <- kvs)println("rowkey:" + new String(kv.getRow()) +" cf:" + new String(kv.getFamily()) +" column:" + new String(kv.getQualifier()) +" value:" + new String(kv.getValue()))}println("-------------------------")println("--take1" + hBaseRDD.take(1))println("--count" + hBaseRDD.count())}
}

注意:以上操作一定要保证环境正确,否则运行可能会报找不到scala方法的异常!

[HBase基础]-- spark创建hbase表(非分区表)相关推荐

  1. Java连接HBASE数据库,创建一个表,删除一张表,修改表,输出插入,修改,数据删除,数据获取,显示表信息,过滤查询,分页查询,地理hash

    准备工作 1.创建Java的Maven项目 创建好的目录结构如下: 另外注意junit的版本,最好不要太高,最开始笔者使用的junit4.12的,发现运行的时候会报错.最后把Junit的版本改成4.7 ...

  2. 数据库基础--数据库基础管理(创建库/表 以及插入修改删除表数据)

    1.啥子是数据库–存储数据的仓库 2.什么是数据–音乐,电影,文本,图片等 常见的数据库软件 Oracle 不开源 跨平台 厂商:甲骨文 MySQL 开源 跨平台 厂商:甲骨文 SQL Server ...

  3. hbase数据库介绍,HBASE的特点,表结构逻辑视图,Row Key,列族,时间戳,Cell

    HBASE数据库 1. Hbase基础 1.1 hbase数据库介绍 1.简介nosql hbase是bigtable的开源java版本.是建立在hdfs之上,提供高可靠性.高性能.列存储.可伸缩.实 ...

  4. 什么是YARN?跟HBase和Spark比优势在哪?终于有人讲明白了

    导读:HBase没有资源什么事情也做不了,Spark占用了资源却没有事情可做?YARN了解一下. 作者:朱凯 来源:大数据DT(ID:hzdashuju) 01 概述 随着Hadoop生态的发展,开源 ...

  5. hbase基础建表语句

    在Hadoop目录下的HBASE下执行命令 ./hbase shell 进入hbase环境 创建hbase 数据库表 create "表名", "字段A",&q ...

  6. 【Hive】如何在 Hive 中创建外部表映射 Hbase 中已存在的表

    文章目录 一.上传完整的jar文件到hive/lib中 二.修改hive-site.xml 三.修改hive-env.sh 四.在hive和hbase中分别创建相关联的表并通过hive向hbase表中 ...

  7. hive创建hbase外部表

    hive 启动命令:$HIVE_HOME/bin/hive hive 创建hbase外部表: ihr_pes为hive表名: CREATE EXTERNAL TABLE ihr_pes( id str ...

  8. shell和javaAPI两种方式创建hbase表并预分区

    在hbase里面,如果我们建表不预分区,那么一个表的数据都会被一个region处理,如果数据过多就会执行region的split,如果数据量很大这样会很费性能,所以最好我们先根据业务的数据量在建表的时 ...

  9. HBase shell 命令创建表及添加数据操作

    HBase shell 命令创建表及添加数据操作 创建表,表名hbase_test,HBase表是由Key-Value组成的,下面给出一个hbase表的格式,方便小伙伴们理解 此表有两个列族,列族1和 ...

最新文章

  1. python怎么把程序封装成函数_PYTHON中如何把固定格式代码,封装成一个函数?
  2. log4net使用具体解释
  3. opencvsharp中resize图像
  4. vue --- 2.0数据的响应式的一种实现
  5. c++ string后面会添加‘\0‘
  6. shell 做加法运算_使用shell脚本实现加法乘法运算
  7. python selenium截图_python+selenium截图操作样例
  8. 【Android】修改Android 模拟器IMEI
  9. 正则表达式入门(c#)
  10. 【难点+重点BFS】LeetCode 126. Word Ladder II
  11. 2016-05-06
  12. chm打开,显示“已取消到该网站的导航”
  13. qt无边框窗体的移动
  14. pytest常用参数
  15. 2022-2028年中国农作物行业投资战略分析及发展前景研究报告
  16. 手机照片局部放大镜_怎样发照片才能惊艳朋友圈?
  17. docker下载安装和常用命令
  18. 【小程序流量主】小程序如何快速流量主
  19. 安卓在活动左上角添加返回键
  20. MATLB|基于粒子群算法的能源管理系统EMS(考虑光伏、储能 、柴油机系统)

热门文章

  1. OPCUA从入门到精通看这里就够了
  2. C 杂志订阅管理系统
  3. 2020年编程语言展望
  4. Nginx配置实例-动静分离
  5. 一句话描述计算机网络,用恰当的关联词将下面两句话合并成一句话。1.①计算机联网,可以随...
  6. ex20 函数和文件
  7. python操作mysql数据库—坑吭
  8. NX二次开发-输入X向量Y向量输出一个3*3矩阵UF_MTX3_initialize
  9. 将shp文件转化为osm文件,并导入到sumo中建立路网
  10. pytorch学习笔记十一:损失函数