关注交流微信公众号:小满锅

背景

前几天思考了一个问题,在很多业务场景下,需要关注流量的来源或是某个业务哪个入口的流量最大,带来的效益最多,那么就涉及到流量的归因了。比如说,我是一个bilibili up主,那么我想知道我的某个视频到底是首页推荐的流量比较多,还是用户搜索带来的比较多。我觉得得分为两种情况

  • 应用埋点质量非常差的情况下,那么在一些APP或者H5发展之初,是不会太去注重埋点的质量,当流量密码时代到来了,才发现这是一个风口,埋点标准化改造就是一个必不可少的环节。那么在改造之前,就只能靠数据自己去归因,即按照时间窗口,根据用户的行为顺序将用户的行为串起来进行归因。
  • 应用埋点质量非常好的情况下,这种就是埋点改造之后了,大量的用户信息可以在埋点中直接的体现,比如上面说的归因,埋点本身可以记录用户的行为路径的。

微信公众号:小满锅

归因实现方案

离线

针对归因的离线方案,现如今已经有很多的窗口实现方式。这里简单列个伪代码。

-- 通过模拟session的方式,对用户的行为进行归因。即对同一个连续会话窗口的KEY排序,然后归到一个元素上。
SELECTFIRST_VALUE(refer) OVER(PARTITION BY KEY,session ORDER BY logtime) as refer
FROM
(SELECTSUM(IF(logtime - LAG(logtime)>1000, 1, 0)) OVER(PARTITION BY KEY ORDER BY logtime) as session -- 日志相差太大时间就是一个新的sessionFROM [表名]WHERE [过滤条件]
) t

实时

实时场景其实也可以类似的实现,但是要略微做一下修改。假设B站的分享没有带归因,但是播放带了,那我们需要看分享的来源的时候,就需要和播放归因。那由于Kakfa只能保证每个PARTITION写入的时间是顺序的,不能保证写入的logtime是顺序的(因为客户端时间无法保证一定是当前时间,某些用户设置当前时间,有的可能设置未来时间了),那么在DOAWNSTREAM去处理时,我们需要对SHARE和PLAY先LEFT JOIN,然后排序,LEFT JOIN是为了实现离线的某个action的过滤(就是说离线归因对SHARE和PLAY排序,最终选择action=share即可),LEFT JOIN之后的结果就是说我的某个share可能来源于多个播放,这时候要根据不同业务场景去判断留下哪一个。

-- 代码实现
/*
share流和play流需要自己在自己平台提前配置好watermark和eventtime
**/-- 先创建一个LEFT JOIN之后的视图
CREATE VIEW share_join_play AS
(
SELECTshare.*,play.*
FROM share
LEFT JOIN play
ON(share.userid = play.useridAND share.deviceid = play.deviceidAND share.os = play.osAND share.os_ver = play.os_verAND share.app_ver = play.app_verAND share.resource_type = play.resource_typeAND share.resource_id = play.resource_idAND share.eventtime between play.eventtime - INTERVAL '15' MINUTE AND share.eventtime +INTERVAL '15' MINUTE -- 关联share前后十分钟的播放即可-- 关联10min后是为了避免埋点在某些情况下,前面的播放埋点可能没有上报或者出错,那就往后面这个资源的播放归,因为某些情况下可能用户没播放就开始分享。这个窗口的逻辑根据自己的业务场景去顶。
);-- 根据JOIN之后的视图进行排序
CREATE VIEW share_attr_play AS
(SELECTshare_referFROM(SELECTplay.refer AS share_refer, -- 将JOIN出来的所有可能refer,拿出来作为share的refer-- 优先分享前的播放归因,且优先最先的播放归因。这个根据自己业务需求可以定义多个ROW_NUMBERROW_NUMBER() OVER(PARTITION BY dt,userid,deviceid,os,os_ver,app_ver,resource_type,resource_idORDER BY IF(play.logtime - share.logtime < 0, 1, 0) DESC, ABS(play.logtime - share.logtime)) AS rnFROM share_join_play) twhere rn = 1 -- 取第一个
);-- 然后根据某些KEY去统计 这里统计每一天每个资源id的各个分享归因的次数
SELECTshare_refer, dt, resource_id, sum(1) as pv
FRON share_attr_play
GROUP BY share_refer, dt, resource_id
踩坑一

数据流的过程如下图,其中的结果可以尝试开启MiniBatch优化,和Local-GlobalAggr优化。
前面JOIN都没问题,直到RANK出来的结果,它是一个Retrace流,因为我们的数据有先来后到,日志时间控制不了的,比如有两条JOIN结果A和B,其中A的PLAY比SHARE时间早1秒,但是B的早5秒,由于数据延迟的原因,这个B数据可能晚来,那么在B来之前,这个RANK发到下游的结果应该是归到了A,所以下发了一个INSERT A。一旦B来了,那么就会下发DELETE A和INSERT B。这时候如果直接写入外部存储就会有问题。而GROUP BY恰好可以处理这种流,不过同样的,它下发的仍然是Retract流,一种是INSERT,一种是DELETE。
踩坑一:就是这里,写入的外部存储数据有问题,两种流不好区分。这里有两种处理方式

  • 第一种是如果Flink本身支持识别这种的INSERT和DELETE流的话,可以再group by time window,每隔一分钟计算一次,DELETE代表-,INSERT代表+,然后sum一下。
  • 第二种就是Flink平台自己将DELETE过滤掉,使用一种主键更新SET的外部存储,在这种情况下,每一个KEY只会由Flink的一个PARTITION发出,由它INSERT到外部主键更新的存储中,以一种覆盖的操作代替Flink的DELETE流,并且这个INSERT流是正常的累加结果。

踩坑二

主要体现在JOIN的那个地方。经过仔细排查,发现INTERVAL 15 MINUTE的时间窗口貌似没有将过期数据处理掉,由于GROUP BY需要一整天的状态,因此我设置了table.exec.state.ttl为24h,这样貌似导致join的窗口过期也时效了。导致一个没清理。
我思考了两种方式,一种是去掉table.exec.state.ttl参数,但是不清楚Group by和rank时状态啥时候清理,目前没有明确的说法。
于是我采用第二种,将两算子拆分开来,JOIN任务照常计算。RANK和Group by仍然加上table.exec.state.ttl自己手动控制状态过期清理。

踩坑三

JOIN任务JOIN不出来任何结果,发现三种不符合预期的监控表现。

  • 查看拓扑图,发现watermark已经超出当前时间了。
  • 查看监控输出QPS,一条也关联不上(输出QPS是0)。
  • 状态很快就被清理了。正常情况,双流都会保存下来30min,来等待迟到的数据。
    也就是说,在某个时候,他们都出现了未来时间,这样每个并行度发出的watermark可能正好都是未来时间,这样到了IntervalJoin时,watermark接收到的是未来时间戳,那么仔细翻查源码后发现,会根据日志时间判断是否小于Low WaterMark,如果小于就不会进行处理,并且清除掉存储在状态里面的Key。这样就解释的通。

解决方式:和业务沟通,需要将未来时间设置成当前时间,这样任务就能够正常跑了。

【Flink】实时归因场景踩坑相关推荐

  1. 微信小程序 -- 原生JS集成腾讯IM实时聊天/实时音视频(踩坑及心得)

    原生JS集成腾讯IM实时聊天/实时音视频对话功能 一.腾讯IM集成 前期准备 实例创建及初始化 IM登录 收发消息 二.腾讯音视频实时互动 跑通demo 三.同时集成即时通讯IM 和 音视频直播的 坑 ...

  2. 【Flink实时数仓踩坑记录】Recovery is suppressed by NoRestartBackoffTimeStrategy

    在编写FlinkCDC时,运行出现了Recovery is suppressed by NoRestartBackoffTimeStrategy这个错误, 网上搜了很多解决方案,都不奏效,后来偶然发现 ...

  3. 传统金融业务场景下Flink实时计算的探索与实践? by鸣宇淳

    超长文警告!本文7000字,含架构图和各种解决方案的尝试,以及详细代码.,最后还有电子书和各种分享ppt下载,请在wifi下观看.土豪随意 我是鸣宇淳,一个大数据架构师.今天给大家分享一下我在传统金融 ...

  4. 东八区转为0时区_踩坑记 | Flink 天级别窗口中存在的时区问题

    ❝ 本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题.本文介绍 Flink 时间以及时区问题,分析了在天级别的窗口时会遇到的 ...

  5. 从0.3开始搭建LeGO-LOAM+VLP雷达+小车实时建图(保姆级教程,小白踩坑日记)

    背景:SLAM小白,因为项目需要花了两天时间编译代码+连接雷达实现了交互. 踩了很多坑,简单记录一下,让后面感兴趣的朋友少走点弯路~ 肯定有很多不专业的.错误的地方,还请大家不吝赐教(噗通) 也可以见 ...

  6. Flink在快手实时多维分析场景的应用

    导读:作为短视频分享跟直播的平台,快手有诸多业务场景应用了 Flink,包括短视频.直播的质量监控.用户增长分析.实时数据处理.直播 CDN 调度等.此次主要介绍在快手使用 Flink 在实时多维分析 ...

  7. 实时计算 Flink 版应用场景解读

    简介:本文由阿里巴巴高级产品专家陈守元老师分享,详细讲解实时计算 Flink 的具体业务场景并分享实时计算 Flink 的相关应用案例. 作者:陈守元(巴真),阿里巴巴高级产品专家 摘要:本文由阿里巴 ...

  8. Niubility (分享一) Flink 在快手实时多维分析场景的应用

    摘要:作为短视频分享跟直播的平台,快手有诸多业务场景应用了 Flink,包括短视频.直播的质量监控.用户增长分析.实时数据处理.直播 CDN 调度等.此次主要介绍在快手使用 Flink 在实时多维分析 ...

  9. Flink 在风控场景实时特征落地实战

    背景介绍 风控简介 二十一世纪,信息化时代到来,互联网行业的发展速度远快于其他行业.一旦商业模式跑通,有利可图,资本立刻蜂拥而至,助推更多企业不断的入场进行快速的复制迭代,企图成为下一个"行 ...

最新文章

  1. VMware虚拟机搭MAC系统
  2. 【五线谱】拍号与音符时值 ( 五线谱拍号 | 全音符休止符 | 二分音符休止符 | 四分音符休止符 | 八分音符休止符 | 十六分音符休止符 | 三十二分音符休止符 )
  3. php mysql 统计_PHP和MySQL实现优化统计每天数据
  4. 去中心化保证金交易平台Lever完成60万美元种子轮融资,NGC Venture等领投
  5. php 绝对刷新,vue项目线上页面刷新报404 解决方法(绝对有用)
  6. 可并堆试水--BZOJ1367: [Baltic2004]sequence
  7. 虚拟化--051 vsphere linux搭建NTP服务器搭建
  8. Diagnostics: File file:/tmp/spark-***/__spark_libs__***.zip does not exist
  9. shell之脚本片断
  10. <Android开发> Android开发工具- 之-I2C TOOLS工具使用
  11. 网页版模仿Excel
  12. html5一个可拖动的图片大小,HTML5画布中的可拖动和可调整大小元素
  13. 互联网广告与计算广告学
  14. R语言获取国内的股票数据
  15. windows光标移动快捷键操作
  16. 基于多项式螺旋曲线的轨迹优化
  17. 主板维修测试软件,主板维修关键测试点(强烈推荐)
  18. 大根堆、小根堆(数组模拟操作)
  19. 北邮+校徽+logo+矢量图+透明
  20. windows下jenkins执行shell报错

热门文章

  1. basic paxos,multi paxos
  2. CnOpenData全国兴趣点(POI)数据
  3. _improve-3
  4. MySQL常用监控指标及监控方法
  5. python控制循环次数_python限制循环次数的方法
  6. 关于防御式编程 (Defensive programming )和安全编码
  7. 在迅捷CAD编辑器中怎么将CAD转换为PDF
  8. Oracle按时间分旬查询
  9. autojspro常用的代码和公共函数搜集整理,史上最全最完整,不看后悔一辈子
  10. 分享白色简约商务个人简历求职PPT模板