采用如下架构将数据实时同步到hbase中:


针对 代码 的整体 浏览 。

首先 是 SPARK ENGINE 的抽象 类。

object SparkEngine {def getSparkConf():SparkConf = {val sparkConf: SparkConf = new SparkConf().set("spark.worker.timeout" , GlobalConfigUtils.getProp("spark.worker.timeout")).set("spark.cores.max" , GlobalConfigUtils.getProp("spark.cores.max")).set("spark.rpc.askTimeout" , GlobalConfigUtils.getProp("spark.rpc.askTimeout")).set("spark.task.maxFailures" , GlobalConfigUtils.getProp("spark.task.maxFailures")).set("spark.driver.allowMutilpleContext" , GlobalConfigUtils.getProp("spark.driver.allowMutilpleContext")).set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.sql.adaptive.enabled" , "true").set("spark.streaming.kafka.maxRatePerPartition" , GlobalConfigUtils.getProp("spark.streaming.kafka.maxRatePerPartition")).set("spark.streaming.backpressure.enabled" , GlobalConfigUtils.getProp("spark.streaming.backpressure.enabled")).set("spark.streaming.backpressure.initialRate" , GlobalConfigUtils.getProp("spark.streaming.backpressure.initialRate")).set("spark.streaming.backpressure.pid.minRate","10").set("enableSendEmailOnTaskFail", "true").set("spark.buffer.pageSize" , "16m")
//      .set("spark.streaming.concurrentJobs" , "5").set("spark.driver.host", "localhost").setMaster("local[*]").setAppName("query")sparkConf.set("spark.speculation", "true")sparkConf.set("spark.speculation.interval", "300s")sparkConf.set("spark.speculation.quantile","0.9")sparkConf.set("spark.streaming.backpressure.initialRate" , "500")sparkConf.set("spark.streaming.backpressure.enabled" , "true")sparkConf.set("spark.streaming.kafka.maxRatePerPartition" , "5000")
<<<<<<<<<<<<<<<<<<<<cutting>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>sparkConf}

在 StreamApp 里面, 开始 正式 的 执行开始。 、、
对 KAFKA MANAGER 对象 的创建。

KAFKA MANAGER 的 设计 。

第一步, 定义 kafka manager , KafkaManager.scala

class KafkaManager(zkHost:String , kafkaParams:Map[String , Object]) extends Serializable

在 KAFKAMANAGER的构造函数里面 ,
创建 Kafka 客户端,初始化 ZKUTILS

val (zkClient,zkConnection) = ZkUtils.createZkClientAndConnection(zkHost , 10000 , 10000)
val zkUtils = new ZkUtils(zkClient,zkConnection , false)

随后定义 KAFKA MANAGER 创建 DirectStream的函数。

 def createDirectStream[K:ClassTag , V:ClassTag](ssc:StreamingContext , topics:Seq[String]):InputDStream[ConsumerRecord[K, V]]

里面包含 2个 动作, ReadOffset 和利用KafkaUtil 创建 stream.

读取 OFFSET 函数,针对 具体TOPIC。

def createDirectStream[K:ClassTag , V:ClassTag](ssc:StreamingContext , topics:Seq[String]):InputDStream[ConsumerRecord[K, V]]{readoffset(topics, groudId).}

其 内部函数, 进行 OFFSET的 自我 管理。
最根本的技术来源如下 博客
https://blog.cloudera.com/offset-management-for-apache-kafka-with-apache-spark-streaming/

private def readOffset(topics:Seq[String] , groupId:String):Map[TopicPartition , Long]{//参考 如下 博客地址 https://www.cnblogs.com/small-k/p/8909942.html//1. 首先 得到 TOPIC 分区信息。调用 ZKUTILS 包 的 APIval topicAndPartitionMaps:mutable.Map[String,Seq[Int]]=  zkUtils.getPartitionsForTopics(topics)//2 .针对 拿到的某个topic 对应的 分区 进行 循环处理。
topicAndPartitionMaps.foreach(topicPartitions =>{// 2.1 获取 TOPIC 的目录
val zkGroupTopicsDirs: ZKGroupTopicDirs = new ZKGroupTopicDirs(groupId , topicPartitions._1)
// 对 某具体 TOPIC 的 分区对应的OFFSET 进行迭代
topicPartitions._2.foreach(partition =>{// 获取到 TOPIC对应的每个分区 的 OFFSET 路径
val offsetPath = s"${zkGroupTopicsDirs.consumerOffsetDir}/${partition}"//进行 读取 具体 路径数据的尝试,并将 读取到的数据 保存进 MAP 结构的topicPartitionMap里面
val  tryGetTopicPartition = Try{//String --->offsetval offsetTuples: (String, Stat) = zkUtils.readData(offsetPath)if(offsetTuples != null){topicPartitionMap.put(new TopicPartition(topicPartitions._1 , Integer.valueOf(partition)) , offsetTuples._1.toLong)}}
// 若 读取 失败 , 用CONSUMER 去拉取 OFFSET的最初开始的地方。
if(tryGetTopicPartition.isFailure){val consumer = new KafkaConsumer[String , Object](kafkaParams)val topicCollection = List(new TopicPartition(topicPartitions._1 , partition))consumer.assign(topicCollection)val avaliableOffset: Long = consumer.beginningOffsets(topicCollection).values().headconsumer.close()topicPartitionMap.put(new TopicPartition(topicPartitions._1 , Integer.valueOf(partition)) , avaliableOffset)}
// 有时候 因为各种原因,比如 某些消费 几天 出现 异常 没有 消费,但是没有注意,会造成 ZK里面的OFFSET和 实际 KAFKA里面 当前保存的OFFSET,因为 BROKER会定期更新 OFFSET,导致ZK OFFSET 过于太旧或者过于太新。val earliestOffsets = getEarliestOffsets(kafkaParams , topics)val latestOffsets = getLatestOffsets(kafkaParams , topics)for((k,v) <- topicPartitionMap){val current = vval earliest = earliestOffsets.get(k).getval latest = latestOffsets.get(k).getif(current < earliest || current > latest){topicPartitionMap.put(k , earliest)}
}

最终 KAFKA MANAGER 利用 ZK 对 OFFSET的管理, 然后

调用 createDirectStream 的ZKUTIL API,对外提供TOPIC PARTITIONS的整体管理。

KafkaUtil 创建 stream

val stream: InputDStream[ConsumerRecord[K, V]] = KafkaUtils.createDirectStream[K, V](ssc,PreferConsistent,ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, **topicPartition**))// 针对 整个函数 def createDirectStream[K:ClassTag , V:ClassTag]的返回值为stream )

后续 开始 对于 整体 数据 利用 KAFKA MANAGER 进行 迭代 处理 和保存。

val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc, topics)后面 是 利用 读取 到 的 具体 TOPIC 的 PARTITION 以及 OFFSET 开始进行 迭代 的FOREACH 处理。inputDStream.foreachRDD(rdd =>{if(!rdd.isEmpty()){// 利用offsetRanges 将 和 KAFKA MANAGER 创建的 DirectStream对应的CONSUMER [K,V] 绑定起来进行迭代 处理val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesranges.foreach(line =>{println(s" 当前读取的topic:${line.topic} , partition:${line.partition} , fromOffset:${line.fromOffset} , untilOffset: ${line.untilOffset}")})
// 开始 对 DSTREAM  所代表 的 PARTITION 迭代 属性 进行 统计,并对 HBASE进行 实际存储操作。
val doElse = Try{val data: RDD[String] = rdd.map(line => line.value())data.foreachPartition(partition =>{//构建连接val conn = HbaseConnections.getHbaseConn})//写业务,这里 有个 内部 定义好的 STRUCTINTERPRETER的操作,后续介绍这个partition.foreach(d =>{val parse: (String  , Any) = JsonParse.parse(d)StructInterpreter.interpreter(parse._1 , parse , conn)})//注销连接HbaseConnections.closeConn(conn)// 这里是最终 提交 到 ZK 进行 保存 的地方if(doElse.isSuccess){//提交偏移量kafkaManager.persistOffset(rdd)

对于 HBASE 创建 表 ,一定要注意 创建 的时候 要自定义 Region来 Split, 否则容易造成 写 数据 造成 某些 Region 过于热点。Column 的列簇 名称 一定要简介, 不要太长,太长容易 浪费 内存。

关注重点 ,
createTable

public void createTable(HTableDescriptor desc,
byte[][] splitKeys)
throws IOException

Creates a new table with an initial set of empty regions defined by the specified split keys. The total number of regions created will be the number of split keys plus one. Synchronous operation. Note : Avoid passing empty split key.

构建 一个 SplitKeysBuilder 类 来 构造 统一 的前缀。

KAFKA 简介
https://www.cnblogs.com/zhaiyf/p/8651457.html

大数据课程 滴滴订单数据 MYSQL 转KAFKA 实时存储HBASE相关推荐

  1. 2019大数据课程_根据数据,2019年最佳免费在线课程

    2019大数据课程 As we do each year, Class Central has tallied the best courses of the previous year, based ...

  2. mysql如何实现实时存储_OpenResty + Mysql 实现日志实时存储

    应用场景和日志文件解析 本配置主要解决 Nginx 向 MySQL 中实时插入日志的问题,采用 OpenResty + Mysql 实现. 1. 刚开始的时候看了 Nginx 和 MySQL 的连接模 ...

  3. MySQL 到 MongoDB 实时数据同步实操分享

    MySQL数据怎么实时同步到 MongoDB 实践分享系列 摘要:很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同 ...

  4. MySQL 到 PostgreSQL 实时数据同步实操分享

    摘要:很多 DBA 和开发同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.最近了解到一款实时数据同步工具 Tapdata C ...

  5. 2021-03-08~09~10~11~12 大数据课程笔记 day47day48day49day50day51

    @R星校长 大数据技术之Flink 第一章 初识Flink   在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题 ...

  6. mysql 多维度分表_亿级订单数据分库分表设计方案(满足多维度查询:订单号、用户、商家、渠道)...

    根据业务初步预估订单业务量,每天500万的数据.我们将订单数据划分为了2大类型:分别为热数据和冷数据. 热数据:1个月内的订单数据,查询实时性较高; 冷数据:归档订单数据,查询频率不高; 根据实际业务 ...

  7. mysql 启用myisam_MySQL各存储引擎(INNODB,MyISAM等)的区别及其启动方法

    存储引擎是什么? MySQL中的数据用各种不同的技术存储在文件(或者内存)中.这些技术中的每一种技术都使用不同的存储机制.索引技巧.锁定水平并且最终提供广泛的不同的功能和能力.通过选择不同的技术,你能 ...

  8. (大数据工程师学习路径)第四步 SQL基础课程----SQL介绍及mysql的安装

    一.数据库和SQL介绍 数据库(Database)是按照数据结构来组织.存储和管理数据的仓库,它的产生距今已有六十多年.随着信息技术和市场的发展,数据库变得无处不在:它在电子商务.银行系统等众多领域都 ...

  9. 基于QT的滴滴网约车订单数据可视化分析

    全套资料下载地址:https://download.csdn.net/download/sheziqiong/85584944?spm=1001.2014.3001.5503 [摘要]在万物联网的当下 ...

最新文章

  1. 用jQuery写的一个翻页,并封装为插件,
  2. R语言双因素方差分析
  3. Oracle\MS SQL Server的数据库多表关联更新UPDATE与多表更新
  4. 存储控制器_SDRAM详解
  5. jwt如何防止token被窃取_JWT令牌
  6. 文件操作-使用readline分行读取大文件
  7. linux shell 自定义函数(定义、返回值、变量作用域)介绍
  8. centos7.2下编译安装git
  9. java输出回文数原代码_C++编程入门:判断回文数
  10. 20155213 实验三《敏捷开发与XP实践》实验报告
  11. 跳转指定位置(HTML)
  12. Android 高级工程师面试(二)
  13. 【Qt学习笔记】包含头文件确报错 does not name a type
  14. python打九九乘法表上三角下三角_Python-零基础自学系列之九九乘法表、打印菱形、打印对顶三角形、打印闪电、斐波拉契数列、素数...
  15. Python制作七夕表白实例项目-让你的情人心动起来
  16. java-操作 Excel
  17. 存储芯片涨价:助芯片制造商美光科技业绩连超预期
  18. C/C++编程工具及实用小软件推荐_dvlinker的博客-CSDN博客_编写c++的软件
  19. 微信公众号,服务号,小程序,微信支付对接需要注册哪些账号
  20. qq病毒java代码_QQSystem

热门文章

  1. python隐藏类的属性 重写__dir___python实例属性的显示方法-dir、__dict__
  2. antd-mobile-InputItem ios下键盘换行修改前往
  3. android项目监听蓝牙连接,android 蓝牙ACL通讯详解
  4. python打包成可执行文件
  5. 网络游戏行业的冬天来了吗?
  6. 大数据计算引擎击穿万古长夜
  7. 纯html各种心理性格测试网站
  8. [转载]ERP实施心得分享(1)
  9. tplink连接服务器失败_TP-LINK无线路由器拨号失败,服务器无响应。该怎样设置?...
  10. 2006-03-05 20:41又是唱k