1:项目背景

当一个app达到一定的体量,千人千面、个性化营销是每一个app提升留存、付费必备法宝。始终离不开营销利器,用户画像。项目从0到1构建画像体系,由T+1升级为实时。过程中不断的优化画像方案,赋能产品、业务。在个性化营销的路上越来越顺滑。

2:整体的技术方案

1:实时画像通过flink stream job

1: app上报log日志到springboot,springboot 处理相关逻辑到kafka 。

2: db binlog 通过maxwell 发送到kafka。

3:所有的日志都到kafka, 画像、推荐相关的服务不需要消费所有的event。所以需要做一次二次分流降低处理所有消息的压力。我们根据具体的业务场景,把对应需要消费的数据对应的topic配置在db中。flink通过双流join (broadcast) 处理配置信息。

4:项目中把埋点、binlog 分为两个实时流,两个流不操作任何存储。把需要同步的结果发送到kafka. 启动专门的画像同步流到hbase 或es。

5:补充说明一下,我们的画像体系要满足多个app的使用。所以一定需要一个id_mapping 服务。人与设备的关系,设备与人的关系(建议保存最近的两条)。推荐、画像相关的服务需要用到mapping关系。另外appstore也禁用idfa,用户重装、以后设备信息就会发生变化。

FlinkKafkaConsumer<Event> eventConsumer = new FlinkKafkaConsumer<>(Topics.TEST, new EventSchema(), kafkaConsumerProperties);
DataStreamSource<Event> eventStream = env.addSource(eventConsumer);//创建规则
MapStateDescriptor<String, HashMap<String, HashSet<String>>> topicRulesBroadcastState = new MapStateDescriptor<>("topicRulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<HashMap<String, HashSet<String>>>() {}));//EventRedistributeSource extends RichSourceFunction run method ,sleep 1-5分钟同步一次规则
DataStreamSource<List<TopicRedistribute>> topicInfo = env.addSource(new EventRedistributeSource());BroadcastStream<List<TopicRedistribute>> broadcast = topicInfo.broadcast(topicRulesBroadcastState);
KeyedStream<EventResult, String> eventResultStringKeyedStream = eventStream.connect(broadcast).process(new EventRedistributeProcess(topicRulesBroadcastState)).keyBy(new KeySelector<EventResult, String>() {@Overridepublic String getKey(EventResult eventResult) throws Exception {return eventResult.getTopic();}});

2:离线画像通过spark sql

1:所有的业务数据收集通过spark streaming消费kafka存储到 hdfs.

2: 凌晨1点通过dolphinscheduler 调度spark sql 跑所有分层数据。如果app整体的埋点体系构建比较完善,历史遗留无效埋点比较少,可以设置所有埋点都可以上报。如果埋点建设有一定的混乱,建议部分埋点开启。很多无效埋点会占数据块,如果小文件管理不善,可能会导致整个集群宕机的情况。

3:dwd层的数据建议把不同APP,同一个行为的数据按照project,staticdate 时间分区。这样用户同一个行为数据集中在一起,方便后续的etl .

4: dm层的数据建议设置优先级,因为同一层的数据可能存在相互依赖的情况。一定要把数据来源, sink 目标表,ddl,exec_sqls(建议 JsonArray存储),app项目,分区,exec_level(建议越小约优先执行),状态,创建人,时间,修改人,时间(方便以后问题追踪),还可以扩展到kylin,clickhouse 等olap场景是否使用。

5:ads 层的数据基本都是聚合数据,目标数据更新到hbase ,elasticsearch 建议不要通过外表的方式更新。hbase的更新建议通过bulkload更新到hbase, elasticsearch的更新建议到通过spark rdd 通过发送kafka消息更新到画像体系。

6: etl的过程一定要记录一下执行时间,以及失败以后的重试。因为每一层的数据都是层层依赖,不然执行失败以后业务的同学发现数据有问题了。数据的同学再去补充数据,是很尴尬的做法。

for (i <- execSQLList.indices) {var calculateStatus = 0val sql: String = HiveUtils.hiveStaticdateReplace(execSQLList(i), jobParams.staticdate)val start: Long = System.currentTimeMillis()try {spark.sql(sql)calculateStatus = 1} catch {case e: Exception =>calculateStatus = 2throw e} finally {val end: Long = System.currentTimeMillis()updateDwLoadInfo(job.getId, i + 1, start, end, end - start, calculateStatus,job.getCreator)}}

3:画像结果存储问题 hbase& elasticsearch

1:当时做人群画像的时候,没有人群画像的经验。在网上找一些通用化的解决方案,一方面降低技术风险,另外一方面没有独立的服务给业务同学(业务系统有几千的qps),当时不敢大刀阔斧的干。

2:存储在hbase中主要方便实时画像,通过用户id或设备id判断用户属性是否满足规则。

3:  存储在elasticsearch中,一方面通过 elasticsearch-sql 把所有的规则拼装成sql,直接查询画像结果的预估数据,另外一方面我们的画像第一版本是离线的,创建画像结果以后我们通过spark任务,把符合目标的数据存储在redis中。做到与业务系统的解耦,起码不能因为服务问题导致业务系统出现问题。

4:业务人员根据标签选择对应的规则以后,我们通过规则动态拼装成sql 。sql再解析成dsl,最终的结果通过spark job 存储在redis 中。注意elasticsearch 支持elaticsearch-sql 需要安装对应的plugin. 相关细节可以见:https://github.com/NLPchina/elasticsearch-sql

 <dependency><groupId>org.nlpcn</groupId><artifactId>elasticsearch-sql</artifactId><version>7.4.2.0</version></dependency>/*** SQL转换DSL** @param sql* @return* @throws Exception*/private String sqlToEsQuery(String sql) throws Exception {try {Settings settings = Settings.builder().build();ThreadPool threadPool = new ThreadPool(settings);Client client = new NodeClient(settings, threadPool);SearchDao searchDao = new SearchDao(client);return searchDao.explain(sql).explain().explain();} catch (Exception ex) {logger.error("error code  xxx" String.format("error in xxx sqlToEsQuery method: %s ", sql) + ex);throw new Exception(xxx, ex);}}

4:业务系统如何提交spark 任务

1:业务系统封装相关提交参数,http请求通过SparkLauncher 提交spark 任务到yarn 上执行。具体可以在github 搜索一下 SparkLauncher。

  SparkLauncher launcher = new SparkLauncher().setSparkHome("/opt/cloudera/parcels/CDH-6.1.2/lib/spark").setAppResource(sparkAppPara.getJarPath()).setMainClass(sparkAppPara.getMainClass()).setMaster(sparkAppPara.getMaster()).setDeployMode(sparkAppPara.getDeployMode()).setConf("spark.driver.memory", sparkAppPara.getDriverMemory() + "g").setConf("spark.executor.memory", sparkAppPara.getExecutorMemory() + "g").setConf("spark.executor.instances", sparkAppPara.getExecutorInstances()).setConf("spark.executor.cores", sparkAppPara.getExecutorCores()).setConf("spark.yarn.queue", "root.default");

2:后续通过团队的同学研究,可以通过livy rest 服务提交spark 任务。我们的kylin cube 构建都是通过livy 提交,整体的性能有较大的提升。b站上有很丰富的livy 介绍,大家可以自行学习。

5:实时的画像如何毫秒级别响应

1:我们的大数据部署基于cdh,datanode与hbase混布。如果yarn任务执行起来cpu或内存使用较多,hbase的响应比较慢。基于成本考虑,我们的另外一套hbase没有重新部署一套。而是将数据存储在aliyun hbase ,整体性能比较稳定rt 20ms-50ms左右。

2:hbase 存储问题搞定,hbase查询也是一个问题。到底是基于rowkey 多 column 批量查询,还是基于rowkey 直接把同一个列簇对应的所有数据查出。我们在压测的过程中各有利弊,在这里就不做发散。

基于同一个rowkey,批量查询row对应的不同 column

/*** 多列查询数据返回column名以及对应数据** @param tableName* @return* @throws IOException*/public Map<String, String> batchQueryTableByColumnForMap(String tableName, String rowKey, String family, List<String> column) throws IOException {Table table = null;Map<String, String> value = Maps.newHashMap();if (StringUtils.isBlank(tableName) || CollectionUtils.isEmpty(column)) {return value;}try {table = connection.getTable(TableName.valueOf(tableName));List<Get> getList = new ArrayList();for (String c : column) {Get get = new Get(Bytes.toBytes(rowKey));get.addColumn(Bytes.toBytes(family), Bytes.toBytes(c));getList.add(get);}Result[] results = table.get(getList);//重点在这,直接查getList<Get>for (Result result : results) {//对返回的结果集进行操作for (Cell cell : result.rawCells()) {value.put(Bytes.toString(Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())), Bytes.toString(CellUtil.cloneValue(cell)));}}} catch (IOException e) {e.printStackTrace();} finally {try {assert table != null;table.close();} catch (IOException e) {e.printStackTrace();}}return value;}

根据rowkey查询出一个列簇对应的所有字段

 /*** 获取值* expression : get 'tableName','rowkey','family:column'** @param rowKey 行id* @param family 列族名* @return string*/public  Map<String, String> getRowKeyValueMap(String tableName, String rowKey, String family) {Table table = null;Map<String, String> value = Maps.newHashMap();if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowKey)) {return value;}try {table = connection.getTable(TableName.valueOf(tableName));Get g = new Get(rowKey.getBytes());Result result = table.get(g);List<Cell> ceList = result.listCells();if (ceList != null && ceList.size() > 0) {for (Cell cell : ceList) {value.put(Bytes.toString(Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}}} catch (IOException e) {e.printStackTrace();} finally {try {assert table != null;table.close();} catch (IOException e) {e.printStackTrace();}}return value;}

6:如何根据数据动态判断画像结果

1:这里要非常感谢一下阿里的qlexpress,ql判断支持丰富,大部分语法跟sql类似。我们只需要业务的同学创建规则的时候,把相关规则转为ql语法,动态的传入相关画像参数。废话不多说,上代码。(目前来看运行比较稳定)

 public static void main(String[] args) throws Exception {ExpressRunner runner = new ExpressRunner();DefaultContext<String, Object> context = new DefaultContext<String, Object>();context.put("vip_endtime",1623513700000L);context.put("vip_accumulat_count","11");context.put("last_buy_member_type",2);context.put("keyword","你视频打啊啊");context.put("from_app",12);String express = " (   ( vip_accumulat_count != null  and  vip_accumulat_count> 0 ) and  (  ( (  (vip_endtime !=null && vip_endtime >= 1623513600000 and vip_endtime < 1624204800000 )  ) )" +" and (last_buy_member_type!=null && last_buy_member_type != 1))) and  from_app ==12  and keyword  like '%你好%' ";Object r = runner.execute(express, context, null, true, true);System.out.println(r);}

3:画像的一些理解

人群画像不是万金油,如果想要做好用户留存,付费。还是需要回归到用户的核心诉求中。根据相关用户行为分析把核心功能做好,吃透。相信用户会为你提供的服务买单。欢迎大家一起交流学习。

大数据人群画像技术方案相关推荐

  1. 大数据用户画像技术原理和实践

    在大数据时代,机器要学会从比特流中解读用户,构建用户画像就变得尤其重要.本文介绍了用户画像的理论和实践,以及在实际中的应用.如何根据用户画像进行精准营销?将用户画像应用于个性化推荐?一起来寻找答案吧~ ...

  2. 应用大数据和机器学习技术实现车险全流程智能化的方案(上)

    应用大数据和机器学习技术实现车险全流程智能化的方案(上) -承保流程智能化改造 一.简要说明 以技术替代人力的思路对车险全业务流程改造,即应用车险大数据和机器学习技术全部或部分替代承保理赔管理相关业务 ...

  3. 网易大数据用户画像实践

    [与数据同行]已开通综合.数据仓库.数据分析.产品经理.数据治理及机器学习六大专业群,加微信号frank61822701 为好友后入群.新开招聘交流群,请关注[与数据同行]公众号,后台回复" ...

  4. 看完秒懂大数据用户画像!

    来自:网络 什么是用户画像? 用户画像(User Profile),作为大数据的根基,它完美地抽象出一个用户的信息全貌,为进一步精准.快速地分析用户行为习惯.消费习惯等重要信息,提供了足够的数据基础, ...

  5. 兼顾稳定和性能,58大数据平台的技术演进与实践

    http://www.infoq.com/cn/articles/58-big-data-platform-technology 主要内容分为三方面:58大数据平台目前的整体架构是怎么样的:最近一年半 ...

  6. 稳定和性能如何兼顾?58大数据平台的技术演进与实践

    作者|赵健博 编辑|尚剑 本文将为你分享58大数据平台在最近一年半内技术演进的过程,包括:58大数据平台目前的整体架构是怎么样的:最近一年半的时间内我们面临的问题.挑战以及技术演进过程:以及未来的规划 ...

  7. 大数据和云计算技术周报(第7期)

    写在第7期周报 坚持是一种品格! "大数据" 三个字其实是个marketing语言,从技术角度看,包含范围很广,计算.存储.网络都涉及,知识点广.学习难度高. #大数据和云计算技术 ...

  8. 诸葛io的技术架构图_大数据平台的技术演化之路 诸葛io平台设计实例

    作者简介:本文来自诸葛io创始人孔淼的技术分享.诸葛io是业内领先的智能数据决策平台,也是国内早期的数据分析践行者.本文将从诸葛io平台设计实例,分享大数据平台的技术演化之路. 如今,数据分析能力正逐 ...

  9. 大数据和云计算技术周报(第169期)

    导语 "大数据" 三个字其实是个marketing语言,从技术角度看,包含范围很广,计算.存储.网络都涉及,知识点广.学习难度高. 本期会给大家奉献上精彩的:MongoDB.用户画 ...

最新文章

  1. TinyML:下一轮人工智能革命
  2. POJ 1324 Holedox Moving 搜索
  3. 企业数据移动服务管理软件SAP
  4. OpenCV 3.1.0中的Mat对象使用
  5. Codeforces Round #640 (Div. 4)(ABCDE)
  6. linux 批量创建用户和删除用户
  7. (2)Mac安装Parallels无法上网
  8. git add/rm/mv文件到暂存区
  9. 罗技G304鼠标的按键宏定义
  10. Java菜鸟教程 for循环和while循环
  11. 14.程序员常用10种算法
  12. 3dmax渲染出来的图不清晰?
  13. 以讹传讹的小故事大道理
  14. 红旗linux如何开远程桌面,配置VNC服务实现红旗Linux远程桌面访问
  15. Android开发——适配终结者AutoLayout
  16. 3-2 数制转换计算器
  17. java 字节码分析_Java 字节码实践 - 解读
  18. 【stm32f407】flash编程
  19. python 发送邮件给多人
  20. 一念起,天涯咫尺;一念灭,咫尺天涯

热门文章

  1. 三菱mode bus tcp通讯_廊坊三菱MR-J4
  2. 单片机80C51(01)
  3. Android实现简易示波器
  4. 在gitlab上使用动态gif作为自己的头像(转)
  5. 乐视三合一体感摄像头--windows下的开发2
  6. 年终盘点:2021年的通信行业
  7. 【教程】【图文】使用 CCPE 批量装机、网络克隆
  8. 上善若水——甲骨文——文字文化
  9. 应用系统数据删除与恢复
  10. 扩展使用.INF文件-制作免安装的绿色软件