一、DWS层与DWM设计

1、思路

之前已经进行分流

但只需要一些指标进行实时计算,将这些指标以主题宽表的形式输出

2、需求

访客、商品、地区、关键词四层的需求(可视化大屏展示、多维分析)

3、DWS层定位

轻度聚合、主题中管理

二、DWS层-访客主题宽表的计算

DWS表主要包含维度表和事实表

维度表主要包括渠道、地区、版本、新老用户等

事实表主要包括PV、UV、跳出次数、进入页面数(session_count)、连续访问时长等

1、需求分析

合并接收到的数据流,按时间窗口聚合,并将聚合结果写入数据库

2、实现

(1)读取kafka各个流的数据

page_log、dwm_uv、dwm_jump_user跳出用户

(2)合并读取到的数据流

使用union合并两个结构相同的数据流

需要提前调整数据结构封装主题宽表实体类(两个待合并的流也都要是这样的结构)

userJumpDStream.map实现转换

合并4条输入的流:

uniqueVisitStatsDstream.union(
pageViewStatsDstream,
sessionVisitDstream,
userJumpStatDstream
);

(3)根据维度进行聚合

设置时间标记及水位线

4个维度作为key,使用tuple4组合,进行分组,.keyBy(new KeySelector

reduce窗口内聚合,并补充时间字段

(4)写入OLAP数据库ClickHouse

专门解决大量数据统计分析的数据库,在保证了海量数据存储的能力,同时又兼顾了响应速度

先建表,使用 ReplacingMergeTree 引擎来保证幂等性

将日期变为数字作为分区类型

编写ClickhouseUtils工具类

创建 TransientSink 注解,标记不需要保存的字段

配置连接地址类,并增加写入OLAP的sink

查看控制台输出及表中数据 visitor_stats_2021

三、商品主题宽表

把多个事实表的明细数据汇总起来组合成宽表

1、需求及思路

获取数据流并转换为统一的数据对象格式

将统一数据结构合并为一个流

设定事件时间与水位线,分组、开窗、聚合

关联维度表补充数据

写入ClickHouse

2、功能实现

建商品统计实体类(各种业务数据的统计),并给必要字段添加@Builder.Default注解,各类添加@Builder注解(构造方法)

kafka中获取指定的流:FlinkKafkaConsumer<String> pageViewSource = MyKafkaUtil.getKafkaSource(pageViewSourceTopic,groupId);

对各种流数据进行结构转换,转换为构建的实体类

创建电商业务常量类 GmallConstant,类似维度表,用一个数字表示一个字符串

将统一的数据结构合并为一个流

设定事件时间与水位线

按商品id分组,10秒的窗口进行开窗window(TumblingEventTimeWindows.of(Time.seconds(10)))

补充商品维度、SKU维度、品类维度、品牌维度等信息

SingleOutputStreamOperator<ProductStats> productStatsWithTmDstream =AsyncDataStream.unorderedWait(productStatsWithCategory3Dstream,new DimAsyncFunction<ProductStats>("DIM_BASE_TRADEMARK") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws
Exception {productStats.setTm_name(jsonObject.getString("TM_NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getTm_id());}}, 60, TimeUnit.SECONDS);
productStatsWithTmDstream.print("to save");

ClickHouse中创建商品主题宽表,添加写入ch的sink  

//TODO 7.写入到 ClickHouse
productStatsWithTmDstream.addSink(ClickHouseUtil.<ProductStats>getJdbcSink(
"insert into product_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

查看ClickHouse表中的数据

四、地区主题表(Flink SQL)

1、需求分析

定义 Table 流环境,把数据源定义为动态表

通过 SQL 查询出结果表并转换为数据流

将数据流写入目标数据库

2、功能实现

(1)添加FlinkSQL依赖

(2)定义 Table 流环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

(3)将数据源topic定义为动态表WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic, groupId) + ")");

WATERMARK FOR rowtime AS rowtime 是把某个虚拟字段设定为 EVENT_TIME

(4)拼接 Kafka 相关属性到 DDL

(5)做聚合运算

Env.sqlQuery("select " +……并将其转换为数据流

DataStream<ProvinceStats> provinceStatsDataStream =
tableEnv.toAppendStream(provinceStateTable, ProvinceStats.class);

(6)定义地区统计宽表实体类并写入到ClickHouse(addSink)

五、关键词主题表(Flink SQL)

1、需求分析

维度聚合决定关键词的大小

来源:用户在搜索框中的搜索、以商品为主题的统计中获取

2、搜索关键词的实现

(1)使用IK分词器对字符串进行分词

(2)编写自定义函数,将分词器加入FlinkSQL中

Flink的自定义函数包括:Scalar Function(相当于 Spark 的 UDF)、Table Function(相当于 Spark 的 UDTF)、Aggregation Functions (相当于 Spark 的 UDAF)

由于分词是一对多的拆分,应该选择TableFunction

封装 KeywordUDTF 函数,自定义UDTF,继承TableFunction

(3)定义Table流环境

(4)注册自定义函数,将数据源定义为动态表

(5)过滤非空数据 tableEnv.sqlQuery

(6)利用 UDTF 进行拆分(SQL内部)LATERAL TABLE(ik_analyze(fullword)) as T(keyword)");

(7)聚合,根据各个关键词出现次数进行 ct

(8)转换为流并写入 ClickHouse

建表、封装实体类、添加sink

六、总结

1、DWS 层主要是基于 DWD 和 DWM 层的数据进行轻度聚合统计

2、利用 union 操作实现多流的合并

3、窗口聚合操作

4、对 clickhouse 数据库的写入操作

5、FlinkSQL 实现业务

6、分词器的使用

7、在 FlinkSQL 中自定义函数的使用

【实时数仓】Day04-DWS层业务:DWS设计、访客宽表、商品主题宽表、流合并、地区主题表、FlinkSQL、关键词主题表、分词...相关推荐

  1. 【实时数仓】DWD层需求分析及实现思路、idea环境搭建、实现DWD层处理用户行为日志的功能

    文章目录 一 DWD层需求分析及实现思路 1 分层需求分析 2 每层的职能 3 DWD层职能详细介绍 (1)用户行为日志数据 (2)业务数据 4 DWD层数据准备实现思路 二 环境搭建 1 创建mav ...

  2. Flink电商实时数仓项目03-DWM层

    1 DWM层与DWS层的设计思路 1.1 设计思路 用户行为日志: 模拟日志jar -> nginx -> web日志服务器 -> kafka(ods_base_log) -> ...

  3. 【实时数仓】DWM层订单宽表之需求分析、订单和订单明细关联源码

    文章目录 一 DWM层-订单宽表 1 需求分析与思路 2 订单和订单明细关联代码实现 (1)从Kafka的dwd层接收订单和订单明细数据 a 创建订单实体类 b 创建订单明细实体类 c 在dwm包下创 ...

  4. 大数据项目之Flink实时数仓(数据采集/ODS层)

    项目概览 实时大屏效果

  5. 数据查询和业务流分开_滴滴实时数仓逐层剖解:实时与离线数据误差0.5%

    原标题:滴滴实时数仓逐层剖解:实时与离线数据误差< 作者介绍 潘澄,资深软件开发工程师.负责实时数据仓库建设,多年数据相关工作经验,专注数据建模.数据仓库.实时数据技术等领域. 朱峰,高级软件开 ...

  6. 实时数仓入门训练营:实时数仓助力互联网实时决策和精准营销

    简介:<实时数仓入门训练营>由阿里云研究员王峰.阿里云高级产品专家刘一鸣等实时计算 Flink 版和 Hologres 的多名技术/产品一线专家齐上阵,合力搭建此次训练营的课程体系,精心打 ...

  7. 技术分享:从双11看实时数仓Hologres高可用设计与实践

    简介:本文将会从阿里巴巴双11场景出发,分析实时数仓面临的高可用挑战以及针对性设计. 2021年阿里巴巴双11完美落下为帷幕,对消费者来说是一场购物盛宴,对背后的业务支撑技术人来说,更是一场年度大考. ...

  8. 干货!一文看Doris在作业帮实时数仓中的应用实践

    数据驱动未来.在大数据生态中,数据分析系统在数据创造价值过程中起着非常关键的作用,直接影响业务决策效率以及决策质量.Apache Doris作为一款支持对海量大数据进行快速分析的MPP数据库,在数据分 ...

  9. 实时数仓 大数据 Hadoop flink kafka

    ⼀.实时数仓建设背景 实时需求⽇趋迫切 ⽬前各⼤公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能⼒来赋能.传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时 ...

最新文章

  1. DOS命令大全(经典收藏)
  2. 从计算机视觉(slam)和摄影测量两个维度进行BA算法原理推导
  3. nconf创建nagios实例
  4. Yarn 组件的指挥部 – 调度器Scheduler
  5. http如何像tcp一样实时的收消息?
  6. 交换机怎么使用vtp
  7. python增加一列数据_Python编程给numpy矩阵添加一列方法示例
  8. C++ 指针与引用的差别
  9. php的curl函数模拟post、get数据提交,速度非常慢的处理办法
  10. 营业执照生成_0跑动,3步注册,48小时拿证!金山发出首张全程电子化登记营业执照!...
  11. Java全网最全面试题(2022年VIP典藏版)
  12. 基于LM331的频率电压转换电路
  13. 变焦和对焦_在Randonautica内部,该应用程序可带领变焦器发现彩虹,尸体和隐藏的宝藏
  14. win10系统登录服务器密码存储位置,win10远程服务器登录密码
  15. Google 3D压缩项目Draco简析
  16. 使用python基于socket的tcp服务器聊天室
  17. 关于Java单例模式的思考
  18. 推荐Layui镜像网站
  19. C语言初步入门学习大略
  20. 【非原创】PHPMywind调用

热门文章

  1. 记录一下用过的正则表达式
  2. Matlab R2018b激活教程
  3. python判断火车票座位_火车票买不到?看我用python监控票源
  4. Pytorch随记(3)
  5. word能保存html文件,Word可以保存为网页文件 教你来回转 Word转成网页,网页转Word...
  6. 【调剂】北方民族大学2022年硕士研究生调剂公告(二)
  7. 基于ZigBee的室内无线定位系统设计
  8. 【教学类-22-02】20221210《八款字体的描字帖-4*4格整张-不用订书机》(大班主题《我是中国人-中国字》)
  9. c语言递归求最大公约数
  10. 微软亚洲研究院4人团队完成视觉识别里程碑式突破