--设置scd的生效时间和过期时间

SET hivevar:cur_date = CURRENT_DATE();SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);SET hivevar:max_date = CAST('2050-01-01' ASDATE);--设置cdc的开始结束日期

INSERT overwrite TABLErds.cdc_timeSELECT last_load, ${hivevar:cur_date} FROMrds.cdc_time;--装载customer维度--获取源数据中被删除的客户和地址发生改变的客户,将这些数据设置为过期时间,即当前时间的前一天

UPDATEdim_customerSET expiry_date =${hivevar:pre_date}WHERE dim_customer.customer_sk IN(SELECTa.customer_skFROM (SELECTcustomer_sk,

customer_number,

customer_street_addressFROMdim_customerWHERE expiry_date =${hivevar:max_date}) aLEFT JOIN rds.customer b ON a.customer_number =b.customer_numberWHERE b.customer_number IS NULL

OR a.customer_street_address <>b.customer_street_address);--将有地址变化的插入到dim_customer表,如果有相同数据存在有不过期的数据则不插入

INSERT INTOdim_customerSELECT row_number() over (ORDER BY t1.customer_number) +t2.sk_max,

t1.customer_number,

t1.customer_name,

t1.customer_street_address,

t1.customer_zip_code,

t1.customer_city,

t1.customer_state,

t1.version,

t1.effective_date,

t1.expiry_dateFROM(SELECTt2.customer_number customer_number,

t2.customer_name customer_name,

t2.customer_street_address customer_street_address,

t2.customer_zip_code,

t2.customer_city,

t2.customer_state,

t1.version+ 1`version`,

${hivevar:pre_date} effective_date,

${hivevar:max_date} expiry_dateFROMdim_customer t1INNER JOIN rds.customer t2 ON t1.customer_number =t2.customer_numberAND t1.expiry_date =${hivevar:pre_date}LEFT JOIN dim_customer t3 ON t1.customer_number =t3.customer_numberAND t3.expiry_date =${hivevar:max_date}WHERE t1.customer_street_address <>t2.customer_street_addressAND t3.customer_sk IS NULL) t1CROSS JOIN(SELECT

COALESCE(MAX(customer_sk),0) sk_maxFROMdim_customer) t2;--处理customer_name列上的scd1,覆盖--不进行更新,将源数据中的name列有变化的数据提取出来,放入临时表--将 dim_couster中这些数据删除、--将临时表中的数据插入

DROP TABLE IF EXISTStmp;CREATE TABLE tmp AS

SELECTa.customer_sk,

a.customer_number,

b.customer_name,

a.customer_street_address,

a.customer_zip_code,

a.customer_city,

a.customer_state,

a.version,

a.effective_date,

a.expiry_dateFROMdim_customer aJOIN rds.customer b ON a.customer_number =b.customer_numberAND(a.customer_name <>b.customer_name);--删除数据

DELETE FROMdim_customerWHEREdim_customer.customer_skIN (SELECT customer_sk FROMtmp);--插入数据

INSERT INTOdim_customerSELECT * FROMtmp;--处理新增的customer记录

INSERT INTOdim_customerSELECT row_number() over (ORDER BY t1.customer_number) +t2.sk_max,

t1.customer_number,

t1.customer_name,

t1.customer_street_address,

t1.customer_zip_code,

t1.customer_city,

t1.customer_state,1,

${hivevar:pre_date},

${hivevar:max_date}FROM( SELECT t1.*

FROMrds.customer t1LEFT JOIN dim_customer t2 ON t1.customer_number =t2.customer_numberWHERE t2.customer_sk IS NULL) t1CROSS JOIN(SELECT

COALESCE(MAX(customer_sk),0) sk_maxFROMdim_customer) t2;--装载product维度--取源数据中删除或者属性发生变化的产品,将对应

UPDATEdim_productSET expiry_date =${hivevar:pre_date}WHERE dim_product.product_sk IN(SELECTa.product_skFROM(SELECTproduct_sk,

product_code,

product_name,

product_categoryFROMdim_productWHERE expiry_date =${hivevar:max_date}) aLEFT JOIN rds.product b ON a.product_code =b.product_codeWHERE b.product_code IS NULL

OR (a.product_name <> b.product_name OR a.product_category <>b.product_category));--处理product_name、product_category列上scd2的新增行

INSERT INTOdim_productSELECT row_number() over (ORDER BY t1.product_code) +t2.sk_max,

t1.product_code,

t1.product_name,

t1.product_category,

t1.version,

t1.effective_date,

t1.expiry_dateFROM( SELECTt2.product_code product_code,

t2.product_name product_name,

t2.product_category product_category,

t1.version+ 1`version`,

${hivevar:pre_date} effective_date,

${hivevar:max_date} expiry_dateFROMdim_product t1INNER JOIN rds.product t2 ON t1.product_code =t2.product_codeAND t1.expiry_date =${hivevar:pre_date}LEFT JOIN dim_product t3 ON t1.product_code =t3.product_codeAND t3.expiry_date =${hivevar:max_date}WHERE(t1.product_name <>t2.product_nameOR t1.product_category <>t2.product_category)AND t3.product_sk IS NULL) t1CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_maxFROMdim_product) t2;--处理新增的 product 记录

INSERT INTOdim_productSELECT row_number() over (ORDER BY t1.product_code) +t2.sk_max,

t1.product_code,

t1.product_name,

t1.product_category,1,

${hivevar:pre_date},

${hivevar:max_date}FROM( SELECT t1.*

FROMrds.product t1LEFT JOIN dim_product t2 ON t1.product_code =t2.product_codeWHERE t2.product_sk IS NULL) t1CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_maxFROMdim_product) t2;--装载order维度

INSERT INTOdim_orderSELECT row_number() over (ORDER BY t1.order_number) +t2.sk_max,

t1.order_number,

t1.version,

t1.effective_date,

t1.expiry_dateFROM( SELECTorder_number order_number,1`version`,

order_date effective_date,'2050-01-01'expiry_dateFROMrds.sales_order, rds.cdc_timeWHERE entry_date >= last_load AND entry_date

INSERT INTOsales_fact_sales_orderSELECTorder_sk,

customer_sk,

product_sk,

date_sk,

order_amountFROMrds.sales_order a,

dim_order b,

dim_customer c,

dim_product d,

date_dim e,

rds.cdc_time fWHERE a.order_number =b.order_numberAND a.customer_number =c.customer_numberAND a.order_date >=c.effective_dateAND a.order_date =d.effective_dateAND a.order_date =f.last_loadAND a.entry_date

INSERT overwrite TABLErds.cdc_timeSELECTcurrent_load, current_loadFROM rds.cdc_time;

sqoop+使用mysql_Sqoop+mysql+Hive+ Ozzie数据仓库案例相关推荐

  1. sqoop动态分区导入mysql,sqoop 导入数据到hive分区表(外表,内表) 指定分区 指定数据库 指定表...

    sqoop 导入数据到hive 1.1.导入数据到hive分区表(内表),指定分区 创建hive分区表 –hive-database 指定数据库 –table 指定表 –hive-overwrite ...

  2. 数据同步Sqoop用法之mysql与Hive导入导出

    目录 一. Sqoop介绍 二. Mysql 数据导入到 Hive 三. Hive数据导入到Mysql 四. mysql数据增量导入hive 1). 新建一张表 2). 初始化hive表: 1). 创 ...

  3. 使用 Sqoop 将 30W+ MySQL 数据导入 Hive

    本实验完成的是,使用 Sqoop 从 MySQL 导出数据到 Hive. 整体步骤分为: 初始化 MySQL 的 30W+ 数据 安装配置 Sqoop 在 Hive 中初始化目标表 Sqoop 脚本实 ...

  4. Hadoop教程(五):Flume、Sqoop、Pig、Hive、OOZIE

    在我们了解Flume和Sqoop之前,让我们研究数据加载到Hadoop的问题: 使用Hadoop分析处理数据,需要装载大量从不同来源的数据到Hadoop集群. 从不同来源大容量的数据加载到Hadoop ...

  5. hbase查询语句_Sqoop实操|Sqoop导入Parquet文件Hive查询为null问题

    作者:余枫 1 问题重现 1.在MySQL中建表,一个bigint字段,二个varchar字段 2.在Hive中建Parquet表 create table test(s1 string commen ...

  6. 执行sqoop 用shell_Mysql和Hive之间通过Sqoop进行数据同步

    文章回顾 理论 大数据框架原理简介 大数据发展历程及技术选型 实践 搭建大数据运行环境之一 搭建大数据运行环境之二 本地MAC环境配置 CPU数和内存大小 查看CPU数 sysctl machdep. ...

  7. sqoop导出到mysql中文乱码问题总结、utf8、gbk

    sqoop导出到mysql中文乱码问题总结.utf8.gbk 今天使用sqoop1.4.5版本的(hadoop使用cdh5.4)因为乱码问题很是头痛半天.下面进行一一总结 命令: [root@sdzn ...

  8. 如何完美解决Sqoop导入导出MySQL数据错位问题

    我发现小伙伴们在使用Sqoop把数据从MySQL导入到Hive的过程中经常会遇到数据错位的问题,虽然最后都是通过添加参数的方法来解决这个问题,但是我认为这并不是一个完美的解决方案,所以花了一点时间研究 ...

  9. Sqoop在导入MySQL数据时遇到Timestamp列为0000-00-00 00:00:00报错

    为什么80%的码农都做不了架构师?>>>    Sqoop在导入MySQL数据时遇到Timestamp列为'0000-00-00 00:00:00'时报错,解决方法是:在JDBC连接 ...

最新文章

  1. Spring Boot 五种热部署方式,极速开发就是生产力!
  2. setPreferredSize和setSize的区别及用法
  3. 最新大脑图谱研究表明,手部的运动区域也与整个身体相连
  4. CCNA 2 chapter
  5. OpenGL于MFC使用汇总(三)——离屏渲染
  6. LeetCode 674. Longest Continuous Increasing Subsequence
  7. 【Oracle经典】132个oracle热门精品资料——下载目录收藏 (转载)
  8. 【机器学习-西瓜书】二、偏差-方差分解;泛化误差
  9. 天工软件在正射项目中的应用与常见问题解答
  10. C#中This的用法
  11. c 连接mysql数据库_C++连接mysql数据库的两种方法
  12. LANDrop局域网文件传输神器
  13. 中国信通院发布《区块链赋能新型智慧城市白皮书(2019年)》解读(附全文下载)
  14. MFC之打开(开发)映美精相机
  15. 用 Python 计算综合测评中的专业成绩加权平均分
  16. 如何控制局域网网速_无线路由器如何限制局域网网速 无线路由器限制局域网网速方法【介绍】...
  17. Photoshop CS5 轻松匹配图像颜色
  18. 排序算法lowB三人组
  19. 近十年量化交易领域最重要的十本参考书推荐!重要!
  20. GitHub 标星 2.9w+,我发现了一个宝藏项目,作为编程新手有福了!

热门文章

  1. 诺基亚N95 - 双卡双待手机来自中国
  2. 潜水器的六自由度运动模型
  3. spring+thymeleaf实现表单验证数据双向绑定
  4. java 城市分词_Java中文分词hanlp使用
  5. 网上资料打印价格黑白A4多少钱
  6. 【移动端】竖向滑屏效果
  7. EasyExcel写多sheet
  8. 画圣诞树的matlab程序,教你怎样用Python画了一棵圣诞树,快来学习
  9. 关于Linux邮件列表的订阅与取消订阅具体方法
  10. 背包问题动态规划matlab,01背包问题动态规划详解