用一个接地气的案例来介绍如何实时计算 UV 数据。大家都知道,在 ToC 的互联网公司,UV 是一个很重要的指标,对于老板、商务、运营的及时决策会产生很大的影响,笔者在电商公司,目前主要的工作就是计算 UV、销售等各类实时数据,体验就特别深刻, 因此就用一个简单demo 演示如何用 Flink SQL 消费 Kafka 中的 PV 数据,实时计算出 UV 指标后写入 Hbase。

Kafka 源数据解析输入标题

PV 数据来源于埋点数据经 FileBeat 上报清洗后,以 ProtoBuffer 格式写入下游 Kafka,消费时第一步要先反序列化 PB 格式的数据为 Flink 能识别的 Row 类型,因此也就需要自定义实现 DeserializationSchema 接口,具体如下代码, 这里只抽取计算用到的 PV 的 mid、事件时间 time_local,并从其解析得到 log_date 字段:

public class PageViewDeserializationSchema implements DeserializationSchema<Row> {public static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);protected SimpleDateFormat dayFormatter;private final RowTypeInfo rowTypeInfo;public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);this.rowTypeInfo = rowTypeInfo;}@Overridepublic Row deserialize(byte[] message) throws IOException {Row row = new Row(rowTypeInfo.getArity());MobilePage mobilePage = null;try {mobilePage = MobilePage.parseFrom(message);String mid = mobilePage.getMid();row.setField(0, mid);Long timeLocal = mobilePage.getTimeLocal();String logDate = dayFormatter.format(timeLocal);row.setField(1, logDate);row.setField(2, timeLocal);}catch (Exception e){String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";LOG.error("error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e);}return null;}

编写 Flink Job 主程序输入标题

将 PV 数据解析为 Flink 的 Row 类型后,接下来就很简单了,编写主函数,写 SQL 就能统计 UV 指标了,代码如下:

public class RealtimeUV {public static void main(String[] args) throws Exception {//step1 从properties配置文件中解析出需要的Kakfa、Hbase配置信息、checkpoint参数信息Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);String topic = config.get("source.kafka.topic");String groupId = config.get("source.group.id");String sourceBootStrapServers = config.get("source.bootstrap.servers");String hbaseTable = config.get("hbase.table.name");String hbaseZkQuorum = config.get("hbase.zk.quorum");String hbaseZkParent = config.get("hbase.zk.parent");int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();//step2 设置Checkpoint相关参数,用于Failover容错sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,ProtobufSerializer.class);sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);sEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//step3 使用Blink planner、创建TableEnvironment,并且设置状态过期时间,避免Job OOMEnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));Properties sourceProperties = new Properties();sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);sourceProperties.setProperty("auto.commit.interval.ms", "3000");sourceProperties.setProperty("group.id", groupId);//step4 初始化KafkaTableSource的Schema信息,笔者这里使用register TableSource的方式将源表注册到Flink中,而没有用register DataStream方式,也是因为想熟悉一下如何注册KafkaTableSource到Flink中TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();Optional<String> proctimeAttribute = Optional.empty();List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();Map<String, String> fieldMapping = new HashMap<>();List<String> columnNames = new ArrayList<>();RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());columnNames.addAll(Arrays.asList(schema.getFieldNames()));columnNames.forEach(name -> fieldMapping.put(name, name));PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema(rowTypeInfo);Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();Kafka011TableSource kafkaTableSource = new Kafka011TableSource(schema,proctimeAttribute,rowtimeAttributeDescriptors,Optional.of(fieldMapping),topic,sourceProperties,deserializationSchema,StartupMode.EARLIEST,specificOffsets);tEnv.registerTableSource("pageview", kafkaTableSource);//step5 初始化Hbase TableSchema、写入参数,并将其注册到Flink中HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();hBaseTableSchema.setRowKey("log_date", String.class);hBaseTableSchema.addColumn("f", "UV", Long.class);HBaseOptions hBaseOptions = HBaseOptions.builder().setTableName(hbaseTable).setZkQuorum(hbaseZkQuorum).setZkNodeParent(hbaseZkParent).build();HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder().setBufferFlushMaxRows(1000).setBufferFlushIntervalMillis(1000).build();HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);tEnv.registerTableSink("uv_index", hBaseSink);//step6 实时计算当天UV指标sql, 这里使用最简单的group by agg,没有使用minibatch或窗口,在大数据量优化时最好使用后两种方式String uvQuery = "insert into uv_index "+ "select log_date,\n"+ "ROW(count(distinct mid) as UV)\n"+ "from pageview\n"+ "group by log_date";tEnv.sqlUpdate(uvQuery);//step7 执行JobsEnv.execute("UV Job");}
}

以上就是一个简单的使用 Flink SQL 统计 UV 的 case, 代码非常简单,只需要理清楚如何解析 Kafka 中数据,如何初始化 Table Schema,以及如何将表注册到 Flink中,即可使用 Flink SQL 完成各种复杂的实时数据统计类的业务需求,学习成本比API 的方式低很多。说明一下,笔者这个 demo 是基于目前业务场景而开发的,在生产环境中可以真实运行起来,可能不能拆箱即用,你需要结合自己的业务场景自定义相应的 kafka 数据解析类。

Flink SQL 实时计算UV指标相关推荐

  1. 实战 | flink sql 实时 TopN

    实战 | flink sql 实时 TopN 1.背景篇 2.难点剖析篇-此类指标建设.保障的难点 2.1.数据建设 2.2.数据保障 2.3.数据服务保障 3.数据建设篇-具体实现方案详述 3.1. ...

  2. 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...

  3. 基于Flink打造实时计算平台为企业赋能

    点击上方 "zhisheng"关注, 星标或置顶一起成长 Flink 从入门到精通 系列文章 https://zhuanlan.zhihu.com/p/143169143 随着互联 ...

  4. 基于 Apache Flink 的实时计算数据流业务引擎在京东零售的实践和落地

    摘要:本文整理自京东零售-技术研发与数据中心张颖&闫莉刚在 ApacheCon Asia 2022 的分享.内容主要包括五个方面: 京东零售实时计算的现状 实时计算框架 场景优化:TopN 场 ...

  5. Flink SQL 实时大屏(实时查询存量数据-批转流)

    最近接到一个需求,关于flink实时大屏需求.每半小时展示历史每天当前半小时(每天00:00:00-00:30:00之间) 的数据的最大值.最小值.中位数.上四分位数.下四分位数. 需求描述 每半小时 ...

  6. 汽车之家基于 Flink 的实时计算平台 3.0 建设实践

    ▼ 关注「Apache Flink」,获取更多技术干货 ▼ 摘要:本文整理自汽车之家实时计算平台负责人邸星星在 Flink Forward Asia 2021 平台建设专场的演讲.主要内容包括: 应用 ...

  7. 【Flink】基于 Flink CEP 实时计算商品订单流失量

    1.概述 转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597 假设有个需求需要实时计算商品的订单流失量,规则如下: 用户 ...

  8. 大数据是如何基于 Flink 进行实时计算的?

    Flink 因天然流式计算特性及强大处理性能,成为炙手可热的大数据处理框架,在 BAT.头条.顺丰等国内头部公司都有其相关应用. 眼下,Flink 可以说是 DT 时代程序员的加分项,更是大数据开发求 ...

  9. Flink SQL 流计算可视化 UI 平台

    点击上方 "zhisheng"关注, 星标或置顶一起成长 Flink 从入门到精通 系列文章 一.简介 flink-streaming-platform-web系统是基于flink ...

最新文章

  1. ECCV 2020 | 首届GigaVision挑战赛揭榜,双赛道冠军技术干货分享
  2. 深度学习入门之PyTorch学习笔记
  3. win10 uwp 改变鼠标
  4. 如何不让你的APP在模拟器中运行。
  5. 给ApplicationContext容器中添加组件的方法(@Bean的使用)
  6. 对话周鸿祎:从程序员创业谈起
  7. 手部精细动作有哪些_3-6岁手部精细动作训练游戏!促进孩子大脑发育
  8. 阶段1 语言基础+高级_1-3-Java语言高级_09-基础加强_第2节 反射_8_反射_Class对象功能_获取Field...
  9. 好玩的WPF第二弹:电子表字体显示时间+多彩呼吸灯特效按钮
  10. 2021金三银四面试季!mysql下载安装教程5.7.27
  11. xp系统打印机服务器报错,WinXP系统打印机显示Spoolsv.exe 应用程序错误的解决方法...
  12. 涛涛的若依学习笔记——登录
  13. 图漾科技招聘|机器视觉算法、嵌入式驱动开发高级工程师等岗位
  14. 三维旋转矩阵;东北天坐标系(ENU);地心地固坐标系(ECEF);大地坐标系(Geodetic);经纬度对应圆弧距离
  15. memset(G, 0x3f, sizeof(G))涵义
  16. 假如时光可以倒流……
  17. 钓鱼网站攻击量增近九成
  18. python:pygame的各种使用方法
  19. CSS和js和HTML一起做出网页版别踩白块游戏
  20. [架构设计] 创建型模型

热门文章

  1. QT QSpinBox 整数计数器控件 使用详解
  2. python之psutil库的简单使用:监控Linux系统性能
  3. 企业发放的奖金根据利润提成。 利润(i)低于或等于10万元时,奖金可提10%; 利润高于10万元,低于20万元时,低于10万元的部分按10%提成,高于10万元的部分,可提成7.5%; 2
  4. bugku~Misc~三色绘恋
  5. APP 开发中的费用一般用在哪里?
  6. 数据库设计之反规范化
  7. httpclient发送Get请求和Post请求
  8. 文件加密软件的局限性都有哪些呢,用户知道吗?
  9. 详解概率论基础: 从贝叶斯开始
  10. Machine Learning for Technical Debt Identification