文章目录

  • 一 DWS层-商品主题计算
    • 1 把JSON字符串数据流转换为统一数据对象的数据流
      • (1)转换订单宽表流数据
      • (2)转换支付宽表流数据
    • 2 把统一的数据结构流合并为一个流
      • (1)代码
      • (2)测试
    • 3 设定事件时间与水位线
    • 4 分组、开窗、聚合
    • 5 补充商品维度信息
      • (1)关联商品维度
      • (2)关联SPU维度
      • (3)关联品类维度
      • (4)关联品牌维度
      • (5)测试
    • 6 写入ClickHouse
      • (1)在ClickHouse中创建商品主题宽表
      • (2)为主程序增加写入ClickHouse的Sink
      • (3)整体测试
  • 二 DWS层-地区主题表(FlinkSQL)
    • 1 需求分析与思路
    • 2 在pom.xml文件中添加FlinkSQL相关依赖
    • 3 创建ProvinceStatsSqlApp,定义Table流环境
    • 4 MyKafkaUtil增加一个DDL的方法
    • 5 把数据源定义为动态表并指定水位线
      • (1)指定`WATERMARK`
      • (2)系统内置函数
      • (3)给计算列起别名
      • (4)完整代码

一 DWS层-商品主题计算

1 把JSON字符串数据流转换为统一数据对象的数据流

(1)转换订单宽表流数据

// 4.6 转换订单宽表流数据
SingleOutputStreamOperator<ProductStats> orderWideStatsDS = orderWideStrDS.map(new MapFunction<String, ProductStats>() {@Overridepublic ProductStats map(String jsonStr) throws Exception {OrderWide orderWide = JSON.parseObject(jsonStr, OrderWide.class);ProductStats productStats = ProductStats.builder().sku_id(orderWide.getSku_id()).order_sku_num(orderWide.getSku_num()).order_amount(orderWide.getSplit_total_amount()).ts(DateTimeUtil.toTs(orderWide.getCreate_time())).orderIdSet(new HashSet(Collections.singleton(orderWide.getOrder_id()))).build();return productStats;}}
);

(2)转换支付宽表流数据

// 4.7 转换支付宽表流数据
SingleOutputStreamOperator<ProductStats> paymentWideStatsDS = paymentWideStrDS.map(new MapFunction<String, ProductStats>() {@Overridepublic ProductStats map(String jsonStr) throws Exception {PaymentWide paymentWide = JSON.parseObject(jsonStr, PaymentWide.class);ProductStats productStats = ProductStats.builder().sku_id(paymentWide.getSku_id()).payment_amount(paymentWide.getSplit_total_amount()).paidOrderIdSet(new HashSet(Collections.singleton(paymentWide.getOrder_id()))).ts(DateTimeUtil.toTs(paymentWide.getCallback_time())).build();return productStats;}}
);

2 把统一的数据结构流合并为一个流

(1)代码

// TODO 5 将不同流的数据通过union合并到一起
DataStream<ProductStats> unionDS = clickAndDisplayStatsDS.union(favorStatsDS,cartStatsDS,refundStatsDS,commentStatsDS,orderWideStatsDS,paymentWideStatsDS
);unionDS.print(">>>");

(2)测试

  • 启动ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell

    redis-server /home/hzy/redis2022.conf
    sudo systemctl start clickhouse-server
    
  • 运行BaseLogApp

  • 运行BaseDBApp

  • 运行OrderWideApp

  • 运行PaymentWideApp

  • 运行ProductsStatsApp

  • 运行rt_applog目录下的jar包(可以获取到曝光 display_ct和点击数据click_ct)

  • 运行rt_dblog目录下的jar包(可以获取到sku_id、cart_ct、favor_ct,下单数量,下单金额、orderIdSet等)

  • 查看控制台输出(如电脑性能不够,可以将日志和业务两条线分开测试)

3 设定事件时间与水位线

// TODO 6 指定watermark以及提取时间时间字段
SingleOutputStreamOperator<ProductStats> productStatsWithWatermarkDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<ProductStats>() {@Overridepublic long extractTimestamp(ProductStats productStats, long recordTimestamp) {return productStats.getTs();}})
);

4 分组、开窗、聚合

// TODO 7 分组 -- 按照商品id分组
KeyedStream<ProductStats, Long> keyedDS = productStatsWithWatermarkDS.keyBy(ProductStats::getSku_id);// TODO 8 开窗
WindowedStream<ProductStats, Long, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));// TODO 9 聚合计算
SingleOutputStreamOperator<ProductStats> reduceDS = windowDS.reduce(new ReduceFunction<ProductStats>() {@Overridepublic ProductStats reduce(ProductStats stats1, ProductStats stats2) throws Exception {stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct());stats1.setClick_ct(stats1.getClick_ct() + stats2.getClick_ct());stats1.setCart_ct(stats1.getCart_ct() + stats2.getCart_ct());stats1.setFavor_ct(stats1.getFavor_ct() + stats2.getFavor_ct());stats1.setOrder_amount(stats1.getOrder_amount().add(stats2.getOrder_amount()));stats1.getOrderIdSet().addAll(stats2.getOrderIdSet());stats1.setOrder_ct(stats1.getOrderIdSet().size() + 0L);stats1.setOrder_sku_num(stats1.getOrder_sku_num() + stats2.getOrder_sku_num());stats1.setPayment_amount(stats1.getPayment_amount().add(stats2.getPayment_amount()));stats1.getRefundOrderIdSet().addAll(stats2.getRefundOrderIdSet());stats1.setRefund_order_ct(stats1.getRefundOrderIdSet().size() + 0L);stats1.setRefund_amount(stats1.getRefund_amount().add(stats2.getRefund_amount()));stats1.getPaidOrderIdSet().addAll(stats2.getPaidOrderIdSet());stats1.setPaid_order_ct(stats1.getPaidOrderIdSet().size() + 0L);stats1.setComment_ct(stats1.getComment_ct() + stats2.getComment_ct());stats1.setGood_comment_ct(stats1.getGood_comment_ct() + stats2.getGood_comment_ct());return stats1;}},new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {@Overridepublic void process(Long aLong, Context context, Iterable<ProductStats> elements, Collector<ProductStats> out) throws Exception {for (ProductStats productStats : elements) {productStats.setStt(DateTimeUtil.toYMDHMS(new Date(context.window().getStart())));productStats.setEdt(DateTimeUtil.toYMDHMS(new Date(context.window().getEnd())));productStats.setTs(new Date().getTime());out.collect(productStats);}}}
);

5 补充商品维度信息

因为除了下单操作之外,其它操作,只获取到了商品的id,其它维度信息是没有的。

(1)关联商品维度

SingleOutputStreamOperator<ProductStats> productStatsWithSkuDS = AsyncDataStream.unorderedWait(reduceDS,new DimAsyncFunction<ProductStats>("DIM_SKU_INFO") {@Overridepublic void join(ProductStats productStats, JSONObject dimJsonObj) throws Exception {productStats.setSku_name(dimJsonObj.getString("SKU_NAME"));productStats.setSku_price(dimJsonObj.getBigDecimal("PRICE"));productStats.setCategory3_id(dimJsonObj.getLong("CATEGORY3_ID"));productStats.setSpu_id(dimJsonObj.getLong("SPU_ID"));productStats.setTm_id(dimJsonObj.getLong("TM_ID"));}@Overridepublic String getKey(ProductStats productStats) {return productStats.getSku_id().toString();}},60, TimeUnit.SECONDS
);

(2)关联SPU维度

SingleOutputStreamOperator<ProductStats> productStatsWithSpuDS =AsyncDataStream.unorderedWait(productStatsWithSkuDS,new DimAsyncFunction<ProductStats>("DIM_SPU_INFO") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws Exception {productStats.setSpu_name(jsonObject.getString("SPU_NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getSpu_id());}}, 60, TimeUnit.SECONDS);

(3)关联品类维度

SingleOutputStreamOperator<ProductStats> productStatsWithCategory3DS =AsyncDataStream.unorderedWait(productStatsWithSpuDS,new DimAsyncFunction<ProductStats>("DIM_BASE_CATEGORY3") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws Exception {productStats.setCategory3_name(jsonObject.getString("NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getCategory3_id());}}, 60, TimeUnit.SECONDS);

(4)关联品牌维度

SingleOutputStreamOperator<ProductStats> productStatsWithTmDS =AsyncDataStream.unorderedWait(productStatsWithCategory3DS,new DimAsyncFunction<ProductStats>("DIM_BASE_TRADEMARK") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws Exception {productStats.setTm_name(jsonObject.getString("TM_NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getTm_id());}}, 60, TimeUnit.SECONDS);productStatsWithTmDS.print(">>>");

(5)测试

  • 运行rt_applog目录下的jar包。

  • 运行rt_dblog目录下的jar包,执行两次以改变水位线,触发窗口提交操作。

6 写入ClickHouse

(1)在ClickHouse中创建商品主题宽表

create table product_stats_2022 (stt DateTime,edt DateTime,sku_id  UInt64,sku_name String,sku_price Decimal64(2),spu_id UInt64,spu_name String ,tm_id UInt64,tm_name String,category3_id UInt64,category3_name String ,display_ct UInt64,click_ct UInt64,favor_ct UInt64,cart_ct UInt64,order_sku_num UInt64,order_amount Decimal64(2),order_ct UInt64 ,payment_amount Decimal64(2),paid_order_ct UInt64,refund_order_ct UInt64,refund_amount Decimal64(2),comment_ct UInt64,good_comment_ct UInt64 ,ts UInt64
)engine =ReplacingMergeTree( ts)partition by  toYYYYMMDD(stt)order by   (stt,edt,sku_id );

(2)为主程序增加写入ClickHouse的Sink

// TODO 11 将结果写入到ClickHouse
productStatsWithTmDS.addSink(ClickhouseUtil.getJdbcSink("insert into product_stats_2022 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
);

(3)整体测试

  • 启动ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell
  • 运行BaseLogApp
  • 运行BaseDBApp
  • 运行OrderWideApp
  • 运行PaymentWideApp
  • 运行ProductsStatsApp
  • 运行rt_applog目录下的jar包
  • 运行rt_dblog目录下的jar包
  • 查看控制台输出
  • 查看ClickHouse中products_stats_2022表数据

注意:一定要匹配两个数据生成模拟器的日期,否则窗口无法匹配上。

二 DWS层-地区主题表(FlinkSQL)

统计主题 需求指标 输出方式 计算来源 来源层级
地区 pv 多维分析 page_log直接可求 dwd
uv 多维分析 需要用page_log过滤去重 dwm
下单(单数,金额) 可视化大屏 订单宽表 dwm

地区主题主要是反映各个地区的销售情况。从业务逻辑上地区主题比起商品更加简单,业务逻辑也没有什么特别的就是做一次轻度聚合然后保存,所以使用flinkSQL,来完成该业务。

1 需求分析与思路

  • 定义Table流环境
  • 把数据源定义为动态表
  • 通过SQL查询出结果表
  • 把结果表转换为数据流
  • 把数据流写入目标数据库

如果是Flink官方支持的数据库,也可以直接把目标数据表定义为动态表,用insert into 写入。由于ClickHouse目前官方没有支持的jdbc连接器(目前支持Mysql、 PostgreSQL、Derby)。也可以制作自定义sink,实现官方不支持的连接器。但是比较繁琐。

2 在pom.xml文件中添加FlinkSQL相关依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.version}</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.version}</artifactId><version>${flink.version}</version>
</dependency>

3 创建ProvinceStatsSqlApp,定义Table流环境

package com.hzy.gmall.realtime.app.dws;
/*** 地区主题统计 -- SQL*/
public class ProvinceStatsSqlApp {public static void main(String[] args) throws Exception {// TODO 1 环境准备// 1.1 流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1.2 表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.3 设置并行度env.setParallelism(4);// TODO 2 检查点相关设置(略)env.execute();}
}

4 MyKafkaUtil增加一个DDL的方法

public static String getKafkaDDL(String topic,String groupId){String ddl = "'connector' = 'kafka'," +"  'topic' = '"+topic+"'," +"  'properties.bootstrap.servers' = '"+KAFKA_SERVER+"'," +"  'properties.group.id' = '"+groupId+"'," +"  'scan.startup.mode' = 'latest-offset'," +"  'format' = 'json'";return ddl;
}

5 把数据源定义为动态表并指定水位线

(1)指定WATERMARK

WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。相当于是API格式中的提取时间字段操作。

watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。

Flink 提供了几种常用的 watermark 策略。

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

    发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。相当于是API格式中的单调递增策略。

  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

    发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 秒延迟的 watermark 策略。

其中WATERMARK FOR rowtime AS rowtime是把某个字段设定为EVENT_TIME。

详细说明。

(2)系统内置函数

将字符串转换为时间戳。

TO_TIMESTAMP(string1[, string2]) Converts date time string string1 with format string2 (by default: ‘yyyy-MM-dd HH:mm:ss’) under the session time zone (specified by TableConfig) to a timestamp.Only supported in blink planner.

详细说明。

(3)给计算列起别名

<computed_column_definition>:column_name AS computed_column_expression [COMMENT column_comment]

(4)完整代码

字段名要和kafka中的json属性完全一致。

// TODO 3 从指定的数据源(kafka)读取数据,转换为动态表,并指定水位线
String orderWideTopic = "dwm_order_wide";
String groupId = "province_stats";
tableEnv.executeSql("CREATE TABLE order_wide (" +" province_id BIGINT," +" province_name STRING," +" province_area_code STRING," +" province_iso_code STRING," +" province_3166_2_code STRING," +" order_id STRING," +" split_total_amount DOUBLE," +" create_time STRING," +" rowtime as TO_TIMESTAMP(create_time)," +" WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND" +" ) WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic,groupId) +")");

【实时数仓】DWS层之商品主题计算、地区主题表(FlinkSQL)相关推荐

  1. Flink SQL搭建实时数仓DWD层

    1.实时数仓DWD层 DWD是明细数据层,该层的表结构和粒度与原始表保持一致,不过需要对ODS层数据进行清洗.维度退化.脱敏等,最终得到的数据是干净的,完整的.一致的数据. (1)对用户行为数据解析. ...

  2. 首汽约车驶向极速统一之路!出行平台如何基于StarRocks构建实时数仓?

    作者:王满,高级数据架构工程师 首汽约车(以下简称 "首约")是首汽集团为响应交通运输部号召,积极拥抱互联网,推动传统出租车行业转型升级,加强建设交通强国而打造的网约车出行平台. ...

  3. 美团点评基于 Flink 的实时数仓平台实践

    摘要:数据仓库的建设是"数据智能"必不可少的一环,也是大规模数据应用中必然面临的挑战,而 Flink 实时数仓在数据链路中扮演着极为重要的角色.本文中,美团点评高级技术专家鲁昊为大 ...

  4. 【实时数仓】DWS层访客主题计算(续)、商品主题计算

    文章目录 一 DWS层-访客主题计算 1 写入OLAP数据库 (1)增加ClickhouseUtil a JdbcSink.<T>sink( )的四个参数说明 b ClickhouseUt ...

  5. 【实时数仓】DWS层的定位、DWS层之访客主题计算(PV、UV、跳出次数、计入页面数、连续访问时长)

    文章目录 一 DWS层与DWM层的设计 1 设计思路 2 需求梳理 3 DWS层定位 二 DWS层-访客主题计算 1 需求分析与思路 2 功能实现 (1)封装VisitorStatsApp,读取Kaf ...

  6. 数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构

    数据仓库VS数据库 数据仓库的定义: 数据仓库是将多个数据源的数据经过ETL(Extract(抽取).Transform(转换).Load(加载))理之后,按照一定的主题集成起来提供决策支持和联机分析 ...

  7. 【实时数仓】DWD层需求分析及实现思路、idea环境搭建、实现DWD层处理用户行为日志的功能

    文章目录 一 DWD层需求分析及实现思路 1 分层需求分析 2 每层的职能 3 DWD层职能详细介绍 (1)用户行为日志数据 (2)业务数据 4 DWD层数据准备实现思路 二 环境搭建 1 创建mav ...

  8. 华为云发布 GaussDB(DWS) 实时数仓,技术创新释放行业数据价值

    8 月 31 日,在华为云 TechWave 大数据专题日上,华为云发布了 GaussDB(DWS)实时数仓,工商银行.广东移动.清华大学等分享了大数据技术创新及应用实践. 围绕数据全生命周期提供整体 ...

  9. 数据查询和业务流分开_数据仓库介绍与实时数仓案例

    1.数据仓库简介 数据仓库是一个面向主题的(Subject Oriented).集成的(Integrate).相对稳定的(Non-Volatile).反映历史变化(Time Variant)的数据集合 ...

  10. 如果你也想做实时数仓…

    数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务,数据仓库的建设也是"数据智能"中必不可少的一环. 本文将从数据仓库的简介.经历了怎样的发展.如何建设.架构演变.应用案 ...

最新文章

  1. The requested profile “pom.xml“ could not be activated because it does not exist. 解决方法
  2. 药师帮完成1.33亿美元D轮融资,投资方为老虎环球基金、H Capital和DCM
  3. oracle修改filesystem,(转):oracle、filesystem、backup日常巡检脚本
  4. python字符编码与转码
  5. firefox flash插件_巧用firefox下载视频资源
  6. 解决service iptables save出错please try to use systemctl.
  7. visual svn 搭建
  8. 深度学习自学(三十六):ABCNet实时自适应贝塞尔曲线场景文字检测识别网络
  9. Essential Netty in Action 《Netty 实战(精髓)》
  10. Windows7轻松升级至专业版、旗舰版
  11. LTE网络CQI机制
  12. 使用 Shiro 配合微信小程序或者app登录,做验权
  13. 基于Linux IIO接口的波形采集
  14. 深度学习——人工神经网络中为什么ReLu要好过于tanh和sigmoid function?
  15. The way to Go 要点知识
  16. 笔记之Python网络数据采集
  17. 苹果cmsv8巴黎影视网站模板蓝色风格免费主题
  18. 计算机坤论文题目,计算机毕业论文参考文献分享
  19. ICA 分类语音分离
  20. 24 基于单片机空气PM2.5浓度粉尘颗粒物检测系统设计

热门文章

  1. 【Apache之 Karaf 介绍】
  2. 上决╇ф的遗言-后缀数组
  3. vue与java连接的url_Vue路由器链接在URL中添加对象
  4. 补单平台源码发布,支持演示
  5. 外贸采购管理对业务的影响及解决方案
  6. SSM实战-外卖项目-06-用户地址簿功能、菜品展示、购物车、下单(一个业务涉及5张表)
  7. Ubuntu上安装运行 rotated_maskrcnn
  8. RecyclerView实现Item居中效果(仿猫眼美团电影选择效果)
  9. 原生JS实现拖拽翻书特效
  10. C#应用Windows服务