sqoop+使用mysql_Sqoop+mysql+Hive+ Oozie数据仓库案例
-- SCD effective / expiry dates, exposed as Hive variables for the statements
-- that follow.
--   cur_date : today's date
--   pre_date : the day before cur_date (used as expiry date of superseded rows)
--   max_date : far-future sentinel marking a row as "current / not expired"
-- Hive variable substitution is textual: every reference to these variables
-- expands to the expression on the right-hand side.
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date}, -1);
SET hivevar:max_date = CAST('2050-01-01' AS DATE);

-- Next step: set the start and end dates of the CDC window.
-- Rewrite rds.cdc_time so that it keeps last_load and carries today's date in
-- its second column (presumably current_load, the upper bound of this run's
-- CDC window -- confirm against the table definition).
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    last_load,
    ${hivevar:cur_date}
FROM rds.cdc_time;

-- Next step: load the customer dimension.
-- Customers that were deleted from the source, or whose street address
-- changed, get their current row expired as of the previous day.
-- Expire current dim_customer rows (expiry_date = max_date) whose customer no
-- longer exists in rds.customer or whose street address differs from the
-- source. The expiry date is set to the previous day.
UPDATE dim_customer
SET expiry_date = ${hivevar:pre_date}
WHERE dim_customer.customer_sk IN (
    SELECT a.customer_sk
    FROM (
        -- Only rows that are still current are candidates for expiry.
        SELECT
            customer_sk,
            customer_number,
            customer_street_address
        FROM dim_customer
        WHERE expiry_date = ${hivevar:max_date}
    ) a
    LEFT JOIN rds.customer b
        ON a.customer_number = b.customer_number
    WHERE b.customer_number IS NULL                                 -- deleted in source
       OR a.customer_street_address <> b.customer_street_address   -- address changed
);

-- Next step: insert a new dim_customer row for each customer whose address
-- changed; customers that already have an unexpired row are skipped.
-- SCD2 on customer_street_address: for every customer whose row was expired
-- as of the previous day and whose source address differs from that expired
-- row, add a new current version (version + 1) built from the source values.
-- Customers that already have a current row (expiry_date = max_date) are
-- skipped. New surrogate keys continue from the present MAX(customer_sk).
INSERT INTO dim_customer
SELECT
    ROW_NUMBER() OVER (ORDER BY new_version.customer_number) + max_key.sk_max,
    new_version.customer_number,
    new_version.customer_name,
    new_version.customer_street_address,
    new_version.customer_zip_code,
    new_version.customer_city,
    new_version.customer_state,
    new_version.version,
    new_version.effective_date,
    new_version.expiry_date
FROM (
    SELECT
        src.customer_number          AS customer_number,
        src.customer_name            AS customer_name,
        src.customer_street_address  AS customer_street_address,
        src.customer_zip_code,
        src.customer_city,
        src.customer_state,
        expired.version + 1          AS `version`,
        ${hivevar:pre_date}          AS effective_date,
        ${hivevar:max_date}          AS expiry_date
    FROM dim_customer expired
    INNER JOIN rds.customer src
        ON expired.customer_number = src.customer_number
       AND expired.expiry_date = ${hivevar:pre_date}
    -- Anti-join: a match here means the customer already has a current row.
    LEFT JOIN dim_customer cur
        ON expired.customer_number = cur.customer_number
       AND cur.expiry_date = ${hivevar:max_date}
    WHERE expired.customer_street_address <> src.customer_street_address
      AND cur.customer_sk IS NULL
) new_version
CROSS JOIN (
    -- Highest surrogate key so far; 0 when the dimension is empty.
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM dim_customer
) max_key;

-- Next step: SCD1 (overwrite) on customer_name.
-- No UPDATE is used: rows whose name changed in the source are extracted into
-- a temporary table, those rows are deleted from dim_customer, and the
-- temporary-table rows are inserted back.
-- SCD1 (overwrite in place) on customer_name, done as delete-and-reinsert:
--   1. copy every dim_customer row whose name differs from the source into
--      tmp, taking the name from rds.customer and all other columns from the
--      dimension row;
--   2. delete those rows from dim_customer;
--   3. insert the corrected rows back from tmp.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,            -- the new name comes from the source
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.version,
    a.effective_date,
    a.expiry_date
FROM dim_customer a
INNER JOIN rds.customer b
    ON a.customer_number = b.customer_number
   AND (a.customer_name <> b.customer_name);

-- Delete the rows that are about to be replaced.
DELETE FROM dim_customer
WHERE dim_customer.customer_sk IN (
    SELECT customer_sk
    FROM tmp
);

-- Insert the corrected rows; the columns are listed in the order tmp was
-- created with, which is the order the other dim_customer inserts use.
INSERT INTO dim_customer
SELECT
    customer_sk,
    customer_number,
    customer_name,
    customer_street_address,
    customer_zip_code,
    customer_city,
    customer_state,
    version,
    effective_date,
    expiry_date
FROM tmp;

-- Next step: handle newly added customer records.
-- Insert customers that exist in rds.customer but have no row at all in
-- dim_customer. They start at version 1, effective from the previous day and
-- with the far-future expiry date. New surrogate keys continue from the
-- present MAX(customer_sk).
INSERT INTO dim_customer
SELECT
    ROW_NUMBER() OVER (ORDER BY new_customer.customer_number) + max_key.sk_max,
    new_customer.customer_number,
    new_customer.customer_name,
    new_customer.customer_street_address,
    new_customer.customer_zip_code,
    new_customer.customer_city,
    new_customer.customer_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    -- Anti-join: keep only source customers with no dimension row.
    SELECT
        src.customer_number,
        src.customer_name,
        src.customer_street_address,
        src.customer_zip_code,
        src.customer_city,
        src.customer_state
    FROM rds.customer src
    LEFT JOIN dim_customer dim
        ON src.customer_number = dim.customer_number
    WHERE dim.customer_sk IS NULL
) new_customer
CROSS JOIN (
    -- Highest surrogate key so far; 0 when the dimension is empty.
    SELECT COALESCE(MAX(customer_sk), 0) AS sk_max
    FROM dim_customer
) max_key;

-- Next step: load the product dimension.
-- Products that were deleted from the source, or whose attributes changed,
-- are handled first (the original comment is cut off at this point).
-- Expire current dim_product rows (expiry_date = max_date) whose product no
-- longer exists in rds.product or whose name or category differs from the
-- source. The expiry date is set to the previous day.
UPDATE dim_product
SET expiry_date = ${hivevar:pre_date}
WHERE dim_product.product_sk IN (
    SELECT a.product_sk
    FROM (
        -- Only rows that are still current are candidates for expiry.
        SELECT
            product_sk,
            product_code,
            product_name,
            product_category
        FROM dim_product
        WHERE expiry_date = ${hivevar:max_date}
    ) a
    LEFT JOIN rds.product b
        ON a.product_code = b.product_code
    WHERE b.product_code IS NULL                          -- deleted in source
       OR (
            a.product_name <> b.product_name              -- name changed
            OR a.product_category <> b.product_category   -- category changed
          )
);

-- Next step: add the new SCD2 rows for product_name / product_category changes.
-- SCD2 on product_name and product_category: for every product whose row was
-- expired as of the previous day and whose source name or category differs
-- from that expired row, add a new current version (version + 1) built from
-- the source values. Products that already have a current row
-- (expiry_date = max_date) are skipped. New surrogate keys continue from the
-- present MAX(product_sk).
INSERT INTO dim_product
SELECT
    ROW_NUMBER() OVER (ORDER BY new_version.product_code) + max_key.sk_max,
    new_version.product_code,
    new_version.product_name,
    new_version.product_category,
    new_version.version,
    new_version.effective_date,
    new_version.expiry_date
FROM (
    SELECT
        src.product_code      AS product_code,
        src.product_name      AS product_name,
        src.product_category  AS product_category,
        expired.version + 1   AS `version`,
        ${hivevar:pre_date}   AS effective_date,
        ${hivevar:max_date}   AS expiry_date
    FROM dim_product expired
    INNER JOIN rds.product src
        ON expired.product_code = src.product_code
       AND expired.expiry_date = ${hivevar:pre_date}
    -- Anti-join: a match here means the product already has a current row.
    LEFT JOIN dim_product cur
        ON expired.product_code = cur.product_code
       AND cur.expiry_date = ${hivevar:max_date}
    WHERE (
            expired.product_name <> src.product_name
            OR expired.product_category <> src.product_category
          )
      AND cur.product_sk IS NULL
) new_version
CROSS JOIN (
    -- Highest surrogate key so far; 0 when the dimension is empty.
    SELECT COALESCE(MAX(product_sk), 0) AS sk_max
    FROM dim_product
) max_key;

-- Next step: handle newly added product records.
-- Insert products that exist in rds.product but have no row at all in
-- dim_product. They start at version 1, effective from the previous day and
-- with the far-future expiry date. New surrogate keys continue from the
-- present MAX(product_sk).
INSERT INTO dim_product
SELECT
    ROW_NUMBER() OVER (ORDER BY new_product.product_code) + max_key.sk_max,
    new_product.product_code,
    new_product.product_name,
    new_product.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    -- Anti-join: keep only source products with no dimension row.
    SELECT
        src.product_code,
        src.product_name,
        src.product_category
    FROM rds.product src
    LEFT JOIN dim_product dim
        ON src.product_code = dim.product_code
    WHERE dim.product_sk IS NULL
) new_product
CROSS JOIN (
    -- Highest surrogate key so far; 0 when the dimension is empty.
    SELECT COALESCE(MAX(product_sk), 0) AS sk_max
    FROM dim_product
) max_key;

-- Next step: load the order dimension.
-- Load the order dimension with the sales orders entered inside the current
-- CDC window [last_load, current_load). Each order gets version 1, becomes
-- effective on its order date, and never expires. New surrogate keys continue
-- from the present MAX(order_sk).
--
-- NOTE(review): the original statement was cut off after "AND entry_date".
-- Everything from "< current_load" onward is reconstructed from the rest of
-- the script: the outer query needs derived tables t1 and t2 (t2.sk_max), the
-- other dimension loads build sk_max the same way, and rds.cdc_time exposes
-- last_load / current_load. Confirm against the authoritative script.
INSERT INTO dim_order
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM (
    SELECT
        order_number         AS order_number,
        1                    AS `version`,
        order_date           AS effective_date,
        -- Same far-future sentinel ('2050-01-01') the other dimensions use,
        -- taken from the shared variable instead of a repeated literal.
        ${hivevar:max_date}  AS expiry_date
    FROM rds.sales_order
    -- rds.cdc_time only supplies the window bounds, hence the cross join.
    CROSS JOIN rds.cdc_time
    WHERE entry_date >= last_load
      AND entry_date < current_load
) t1
CROSS JOIN (
    -- Highest surrogate key so far; 0 when the dimension is empty.
    SELECT COALESCE(MAX(order_sk), 0) AS sk_max
    FROM dim_order
) t2;

-- Next step: load the sales order fact table.
-- Load the sales order fact table: for each sales order entered inside the
-- current CDC window, look up the surrogate keys of its order, customer,
-- product and date dimension rows and store them with the order amount.
-- Customer and product rows are matched to the version that was valid on the
-- order date (effective_date <= order_date < expiry_date).
--
-- NOTE(review): the original WHERE clause lost every "<" predicate and the
-- text that followed it; only fragments survived ("a.order_date >=
-- c.effective_date", "... = d.effective_date", "... = f.last_load",
-- "a.entry_date"). The predicates below are a reconstruction:
--   * the expiry_date upper bounds mirror the surviving effective_date bounds;
--   * a.product_code = d.product_code assumes rds.sales_order carries
--     product_code, the natural key of dim_product -- confirm;
--   * the date_dim join column is not visible anywhere in this file; `date`
--     is an assumption -- TODO confirm the column name;
--   * the entry_date window mirrors the filter used for the order dimension.
INSERT INTO sales_fact_sales_order
SELECT
    order_sk,
    customer_sk,
    product_sk,
    date_sk,
    order_amount
FROM rds.sales_order a
INNER JOIN dim_order b
    ON a.order_number = b.order_number
INNER JOIN dim_customer c
    ON a.customer_number = c.customer_number
INNER JOIN dim_product d
    ON a.product_code = d.product_code
INNER JOIN date_dim e
    ON TO_DATE(a.order_date) = e.`date`
-- rds.cdc_time only supplies the window bounds, hence the cross join.
CROSS JOIN rds.cdc_time f
-- Range predicates are kept in WHERE rather than ON so the statement does not
-- depend on non-equi join support.
WHERE a.order_date >= c.effective_date
  AND a.order_date < c.expiry_date
  AND a.order_date >= d.effective_date
  AND a.order_date < d.expiry_date
  AND a.entry_date >= f.last_load
  AND a.entry_date < f.current_load;

-- Next step: advance the CDC timestamp table.
-- Advance the CDC window after a successful load: rewrite rds.cdc_time so
-- that both of its columns hold the current_load value, making this run's
-- upper bound the next run's last_load.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    current_load,
    current_load
FROM rds.cdc_time;
sqoop+使用mysql_Sqoop+mysql+Hive+ Oozie数据仓库案例相关推荐
- sqoop动态分区导入mysql,sqoop 导入数据到hive分区表(外表,内表) 指定分区 指定数据库 指定表...
sqoop 导入数据到hive 1.1.导入数据到hive分区表(内表),指定分区 创建hive分区表 –hive-database 指定数据库 –table 指定表 –hive-overwrite ...
- 数据同步Sqoop用法之mysql与Hive导入导出
目录 一. Sqoop介绍 二. Mysql 数据导入到 Hive 三. Hive数据导入到Mysql 四. mysql数据增量导入hive 1). 新建一张表 2). 初始化hive表: 1). 创 ...
- 使用 Sqoop 将 30W+ MySQL 数据导入 Hive
本实验完成的是,使用 Sqoop 从 MySQL 导出数据到 Hive. 整体步骤分为: 初始化 MySQL 的 30W+ 数据 安装配置 Sqoop 在 Hive 中初始化目标表 Sqoop 脚本实 ...
- Hadoop教程(五):Flume、Sqoop、Pig、Hive、OOZIE
在我们了解Flume和Sqoop之前,让我们研究数据加载到Hadoop的问题: 使用Hadoop分析处理数据,需要装载大量从不同来源的数据到Hadoop集群. 从不同来源大容量的数据加载到Hadoop ...
- hbase查询语句_Sqoop实操|Sqoop导入Parquet文件Hive查询为null问题
作者:余枫 1 问题重现 1.在MySQL中建表,一个bigint字段,二个varchar字段 2.在Hive中建Parquet表 create table test(s1 string commen ...
- 执行sqoop 用shell_Mysql和Hive之间通过Sqoop进行数据同步
文章回顾 理论 大数据框架原理简介 大数据发展历程及技术选型 实践 搭建大数据运行环境之一 搭建大数据运行环境之二 本地MAC环境配置 CPU数和内存大小 查看CPU数 sysctl machdep. ...
- sqoop导出到mysql中文乱码问题总结、utf8、gbk
sqoop导出到mysql中文乱码问题总结.utf8.gbk 今天使用sqoop1.4.5版本的(hadoop使用cdh5.4)因为乱码问题很是头痛半天.下面进行一一总结 命令: [root@sdzn ...
- 如何完美解决Sqoop导入导出MySQL数据错位问题
我发现小伙伴们在使用Sqoop把数据从MySQL导入到Hive的过程中经常会遇到数据错位的问题,虽然最后都是通过添加参数的方法来解决这个问题,但是我认为这并不是一个完美的解决方案,所以花了一点时间研究 ...
- Sqoop在导入MySQL数据时遇到Timestamp列为0000-00-00 00:00:00报错
为什么80%的码农都做不了架构师?>>> Sqoop在导入MySQL数据时遇到Timestamp列为'0000-00-00 00:00:00'时报错,解决方法是:在JDBC连接 ...
最新文章
- Spring Boot 五种热部署方式,极速开发就是生产力!
- setPreferredSize和setSize的区别及用法
- 最新大脑图谱研究表明,手部的运动区域也与整个身体相连
- CCNA 2 chapter
- OpenGL于MFC使用汇总(三)——离屏渲染
- LeetCode 674. Longest Continuous Increasing Subsequence
- 【Oracle经典】132个oracle热门精品资料——下载目录收藏 (转载)
- 【机器学习-西瓜书】二、偏差-方差分解;泛化误差
- 天工软件在正射项目中的应用与常见问题解答
- C#中This的用法
- c 连接mysql数据库_C++连接mysql数据库的两种方法
- LANDrop局域网文件传输神器
- 中国信通院发布《区块链赋能新型智慧城市白皮书(2019年)》解读(附全文下载)
- MFC之打开(开发)映美精相机
- 用 Python 计算综合测评中的专业成绩加权平均分
- 如何控制局域网网速_无线路由器如何限制局域网网速 无线路由器限制局域网网速方法【介绍】...
- Photoshop CS5 轻松匹配图像颜色
- 排序算法lowB三人组
- 近十年量化交易领域最重要的十本参考书推荐!重要!
- GitHub 标星 2.9w+,我发现了一个宝藏项目,作为编程新手有福了!