【用户画像】实现宽表合并,pivot概述,源码实现并发布任务
文章目录
- 一 完成标签宽表的合并
- 1 任务目标
- 2 设计分析
- 3 实现步骤
- 二 代码实现
- 1 新建模块
- 2 补充方法
- 3 main方法之建表
- 三 pivot概述
- 1 例一
- 2 pivot原理
- 3 例二
- 4 例三
- 四 生成insert select语句
- 五 完整代码
- 六 任务发布
- 1 发布合并宽表任务
- 2 新增标签任务
- 3 新增代码
- 4 发布标签任务
一 完成标签宽表的合并
1 任务目标
当所有的单独标签任务都计算完成时,为了更加方便的查询及导出数据,要拼接出一张以用户ID为主键的宽表。宽表的每一列用三级标签编号作为列名,如下图:
这张大宽表,包含了所有的标签,有多少个标签,就会有多少列。
实现思路草图:如何将高表转换为宽表
2 设计分析
- 读取所有启动的标签任务中的标签列表
- 读取标签列表中的标签编码和标签值类型,获得字段名和字段值,拼接成建表语句
- 根据标签列表组合多表合并,同时进行行转列,组合成insert select 语句。
3 实现步骤
- 读取到标签集合的定义 ,目的是提取标签的编码作为宽表的标签字段
- 每天执行产生一张宽表
- 把多个标签表union成高表 ,再对高表进行行转列操作,变成宽表
- 写入画像库中的宽表
二 代码实现
1 新建模块
在父工程下新建task-merge模块,在poml文件中增加配置,如下
<dependencies><dependency><groupId>com.hzy.userprofile</groupId><artifactId>taskcommon</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies><build><plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><!-- 声明绑定到maven的compile阶段 --><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
建立scala 源码目录
新建类 com.hzy.userprofile.app.TaskMergeApp
2 补充方法
在task-common中的TagInfoDAO中补充方法
def getTagInfoList(): List[TagInfo] ={// 取出启用的三级标签val tagListSql="select tg.id,tag_code,tag_name,parent_tag_id,tag_type,tag_value_type,tag_value_limit,tag_task_id,tag_comment,tg.create_time" +" from tag_info tg join task_info tk on tg.tag_task_id=tk.id where tk.task_status='1' "val tagList: List[TagInfo] = MySqlUtil.queryList(tagListSql, classOf[TagInfo],true)tagList}
3 main方法之建表
package com.hzy.userprofile.appimport java.util.Propertiesimport com.hzy.userprofile.bean.TagInfo
import com.hzy.userprofile.dao.TagInfoDAO
import com.hzy.userprofile.util.MyPropertiesUtilobject TaskMergeApp {/*** 1 读取到标签集合的定义 ,目的是提取标签的编码作为宽表的标签字段* 2 每天执行产生一张宽表* 3 把多个标签表union成高表 ,再对高表进行行转列操作,变成宽表* 4 写入画像库中的宽表* @param args*/def main(args: Array[String]): Unit = {//通过程序配置taskId和taskDateval taskId: String = args(0)val taskDate: String = args(1)// 1 读取到标签集合的定义 ,目的是提取标签的编码作为宽表的标签字段// 获取标签的集合val tagList: List[TagInfo] = TagInfoDAO.getTagInfoList()println(tagList)// 2 宽表的定义 每次执行,需要把当天的宽表清除一次// create table user_tag_merge_20221015(uid string, tagcode1 string, tagcode2 string, ....)// comment '标签宽表'// ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'// location '$hdfsPath/$upDBName/$tableName'// 将task-sql中的所有配置文件复制到task-merge中,并将task-merge下的target目录删除,使程序重新编译val properties: Properties = MyPropertiesUtil.load("config.properties")val hdfsPath: String = properties.getProperty("hdfs-store.path")val dwDBName: String = properties.getProperty("data-warehouse.dbname")val upDBName: String = properties.getProperty("user-profile.dbname")val tagCodeSql = tagList.map(_.tagCode.toLowerCase() + "string").mkString(",")val tableName = s"user_tag_merge_" + taskDate.replace("-","")// 可能会出现建表语句成功,但最终执行失败,所有的任务都要支持幂等性// 需要添加支持删除操作// 为了保证操作的幂等性,即任何一个任务只要执行失败了,都能够重新运行// 在重新运行时,不会带来数据翻倍,存在表不能创建等问题// 只要是批处理,都要考虑重新运行的问题val dropTableSQL = s"drop table if exists $tableName"val createTableSql =s"""| create table $tableName (uid string, $tagCodeSql)| comment '标签宽表'| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'| location '$hdfsPath/$upDBName/$tableName'|""".stripMarginprintln(dropTableSQL)println(createTableSql)}
}
三 pivot概述
pivot是Spark-SQL,Oracle特有关键词,即旋转,将指列的字段值,旋转成为多个列。并且可以指定某些列成为旋转列的聚合值。
1 例一
进入spark-sql,执行以下语句
创建一个五列的表,插入几条数据,如下:
CREATE TABLE test_order_info
(uname STRING, product STRING, age INT, city string, amount decimal);INSERT INTO test_order_info VALUES( 'zhang3' , 'tv' , 22 , 'bj', 3000),( 'li4', 'notebook', 41, 'bj', 8000),( 'wang5', 'phone', 32, 'sh', 4000),( 'zhao6', 'notebook', 22, 'sz', 3000),( 'zhang3', 'phone', 22, 'bj', 3000), ( 'li4', 'tv', 41, 'sz', 4000) ;
现在想以用户名,年龄两个维度求在各个商品上的花费
uname | age | tv_amount | notebook_amount | phone_amount | … |
---|---|---|---|---|---|
zhang3 | 22 | ?? | ?? | ?? | |
li4 | |||||
wang5 | |||||
zhao6 | |||||
… |
在pivot中需要将所有的字段划分为三种
2 pivot原理
以例一为基础,需要把整个表的所有字段整理成3种列:
- 维度列:uname , age
- 旋转列:product
- 聚合列:amount
- 既不是旋转,也不是聚合,也不是维度:city
格式:
select * from tablename pivot ( sum(聚合列) as 列标识 for 旋转列 in( 旋转列值1 ,旋转列值2,旋转列值3) )
除了旋转列和聚合列,默认都是维度列,如果存在这三种以外的字段,需要提前用子查询去除。
执行之前先执行以下语句添加表头:
# 测试需要使用sparksql,hive不支持pivot
[hzy@hadoop101 ~]$ spark-sql --hiveconf hive.cli.print.header=true
sql实现
select * from
(
select uname,age ,product,amount from test_order_info
) oi
pivot ( sum( amount) as amt for product in ( 'tv','notebook','phone'));
3 例二
现想求不同商品在不同城市的收入和平均年龄,如下表
product | bj_amount | bj_avg_age | sh_amount | sh_avg_age | sz_amount | sz_avg_age | … |
---|---|---|---|---|---|---|---|
notebook | ?? | ?? | ?? | ?? | ?? | ?? | |
phone | |||||||
tv | |||||||
… |
整理为三种列
- 维度列:product
- 旋转列:city
- 聚合列:amount,age
实现sql
select * from
(
select uname,age ,product,amount from test_order_info
) oi
pivot ( sum( amount) as amt for product in ( 'tv','notebook','phone'))
4 例三
create table test_user_tags (uid string, tag_code STRING, tag_value STRING);INSERT INTO test_user_tags VALUES( '101','gender' ,'f' ),( '102', 'gender', 'm' ),( '103', 'gender', 'm' ),( '104', 'gender', 'f' ),( '105', 'gender', 'm' ), ( '106', 'gender', 'f' ),( '101','age' ,'60' ),( '102', 'age', '70' ),( '103', 'age', '80' ),( '104', 'age', '70' ),( '105', 'age', '90' ), ( '106', 'age', '90' ) ,( '101','amount' ,'422' ),( '102', 'amount', '4443' ),( '103', 'amount', '12000' ),( '104', 'amount', '6664' ),( '105', 'amount', '900' ), ( '106', 'amount', '2000' ) ;
求
UID | gender | age | amount | … |
---|---|---|---|---|
101 | ?? | ?? | ?? | |
102 | ||||
103 | ||||
… |
- 维度列:uid
- 旋转列:tag_code
- 聚合列:tag_value
- 聚合函数:concat_ws(‘’,collect_list())
select * from test_user_tags
pivot ( concat_ws(',',collect_list(tag_value)) as tv for tag_code in ('gender','age','amount' ))
四 生成insert select语句
/* 3 把多个标签表union成高表 ,再对高表进行行转列操作,变成宽表把多个标签表union成高表select uid,tag_value from xxx1 where dt = '$taskDate'union allselect uid,tag_value from xxx2 where dt = '$taskDate'union all...*/val tagUnionSQL: String = tagList.map (tagInfo => s"select uid,'${tagInfo.tagCode.toLowerCase}' tag_code,tag_value from ${tagInfo.tagCode.toLowerCase} where dt = '$taskDate'").mkString(" union all ")println(tagUnionSQL)// pivot行转列/*** select * from* ( tagUnionSQL ) tag_union* pivot ( concat_ws(',',collect_list(tag_value)) as tv for tag_code in ('gender','age','amount' ))*/val tagCodeSQL = tagList.map("'" + _.tagCode.toLowerCase + "'").mkString(",")val selectSQL =s"""| select * from| ( $tagUnionSQL) tag_union| pivot ( concat_ws('',collect_list(tag_value)) as tv for tag_code in ($tagCodeSQL))|""".stripMarginprintln(selectSQL)// 4 写入画像库中的宽表// insert overwrite table tableName selectSQLval insertSQL = s"insert overwrite table $tableName $selectSQL"println(insertSQL)sparkSession.sql(insertSQL)
五 完整代码
package com.hzy.userprofile.appimport java.util.Propertiesimport com.hzy.userprofile.bean.TagInfo
import com.hzy.userprofile.dao.TagInfoDAO
import com.hzy.userprofile.util.MyPropertiesUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject TaskMergeApp {/*** 1 读取到标签集合的定义 ,目的是提取标签的编码作为宽表的标签字段* 2 每天执行产生一张宽表* 3 把多个标签表union成高表 ,再对高表进行行转列操作,变成宽表* 4 写入画像库中的宽表* @param args*/def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("task_merge_app")//.setMaster("local[*]")val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()//通过程序配置taskId和taskDateval taskId: String = args(0)val taskDate: String = args(1)// 1 读取到标签集合的定义 ,目的是提取标签的编码作为宽表的标签字段// 获取标签的集合val tagList: List[TagInfo] = TagInfoDAO.getTagInfoList()println(tagList)// 2 宽表的定义 每次执行,需要把当天的宽表清除一次// create table user_tag_merge_20221015(uid string, tagcode1 string, tagcode2 string, ....)// comment '标签宽表'// ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'// location '$hdfsPath/$upDBName/$tableName'// 将task-sql中的所有配置文件复制到task-merge中,并将task-merge下的target目录删除,使程序重新编译val properties: Properties = MyPropertiesUtil.load("config.properties")val hdfsPath: String = properties.getProperty("hdfs-store.path")val dwDBName: String = properties.getProperty("data-warehouse.dbname")val upDBName: String = properties.getProperty("user-profile.dbname")val tagCodeSql = tagList.map(_.tagCode.toLowerCase() + " string ").mkString(",")val tableName = s"user_tag_merge_" + taskDate.replace("-","")// 可能会出现建表语句成功,但最终执行失败,所有的任务都要支持幂等性// 需要添加支持删除操作// 为了保证操作的幂等性,即任何一个任务只要执行失败了,都能够重新运行// 在重新运行时,不会带来数据翻倍,存在表不能创建等问题// 只要是批处理,都要考虑重新运行的问题val dropTableSQL = s"drop table if exists $tableName"val createTableSql =s"""| create table $tableName (uid string, $tagCodeSql)| comment '标签宽表'| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'| location '$hdfsPath/$upDBName/$tableName'|""".stripMarginprintln(dropTableSQL)println(createTableSql)sparkSession.sql(s"use $upDBName")sparkSession.sql(dropTableSQL)sparkSession.sql(createTableSql)/* 3 把多个标签表union成高表 ,再对高表进行行转列操作,变成宽表把多个标签表union成高表select uid,tag_value from xxx1 where dt = '$taskDate'union allselect uid,tag_value from xxx2 where dt = '$taskDate'union all...*/val tagUnionSQL: String = tagList.map (tagInfo => s"select uid,'${tagInfo.tagCode.toLowerCase}' tag_code,tag_value from ${tagInfo.tagCode.toLowerCase} where dt = '$taskDate'").mkString(" union all ")println(tagUnionSQL)// pivot行转列/*** select * from* ( tagUnionSQL ) tag_union* pivot ( concat_ws(',',collect_list(tag_value)) as tv for tag_code in ('gender','age','amount' ))*/val tagCodeSQL = tagList.map("'" + _.tagCode.toLowerCase + "'").mkString(",")val selectSQL =s"""| select * from| ( $tagUnionSQL) tag_union| pivot ( concat_ws('',collect_list(tag_value)) as tv for tag_code in ($tagCodeSQL))|""".stripMarginprintln(selectSQL)// 4 写入画像库中的宽表// insert overwrite table tableName selectSQLval insertSQL = s"insert overwrite table $tableName $selectSQL"println(insertSQL)sparkSession.sql(insertSQL)}
}
六 任务发布
1 发布合并宽表任务
因为这个合并宽表任务是要在所有标签任务执行之后完成的。
所以要新增一个【流程任务】,在【流程任务管理】中,点击添加流程任务
填写流程任务
说明:
任务名称:生成标签宽表
执行方式选择:SPARK_JAR
任务级别:因为要保证在标签任务之后,所以要大于标签任务级别(标签任务级别默认100)就好。
注掉以下代码
val sparkConf: SparkConf = new SparkConf().setAppName("task_merge_app")//.setMaster("local[*]")
在界面会生成一个任务,流程任务和标签任务的不同
- 流程任务:计算除标签以外的任务,如宽表,导出数据,转位图等
- 标签任务:计算用户的标签
上传com.hzy.userprofile.app.TaskMergeApp jar包到服务器
2 新增标签任务
添加二级标签:
- 上级标签:用户行为
- 上级标签编码:TG_BEHAVIOR
- 标签编码:TG_BEHAVIOR_ORDER
- 标签名称:用户下单行为
- 标签类型:类目
添加三级标签
- 上级标签:用户下单行为
- 上级标签编码:TG_BEHAVIOR_ORDER
- 标签编码:TG_BEHAVIOR_ORDER_LAST30D_CT
- 标签名称:最近30天下单次数
- 标签类型:统计
- 标签值类型:整数
在三级标签上添加任务
执行方式:SPARK_SQL
任务SQL:select user_id as uid, order_last_30d_count as query_value from dwt_user_topic where dt = ‘$dt’;
任务参数:
--driver-memory=1G --num-executors=3 --executor-memory=2G --executor-cores=2 --conf spark.default.parallelism=12
下一步工作
- $dt要替换成taskDate
- 如果没有四级标签的匹配规则,直接使用query_value作为标签的值
3 新增代码
package com.hzy.userprofile.appimport java.util.Propertiesimport com.hzy.userprofile.bean.{TagInfo, TaskInfo, TaskTagRule}
import com.hzy.userprofile.constants.ConstCode
import com.hzy.userprofile.dao.{TagInfoDAO, TaskInfoDAO, TaskTagRuleDAO}
import com.hzy.userprofile.util.MyPropertiesUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject TaskSQLApp {def main(args: Array[String]): Unit = {//0 添加执行环境val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app")//.setMaster("local[*]")val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()//1 获得标签定义、标签任务的SQL、字标签的匹配规则,存储在MySQL中val taskId: String = args(0)val taskDate: String = args(1)val tagInfo: TagInfo = TagInfoDAO.getTagInfoByTaskId(taskId)val taskInfo: TaskInfo = TaskInfoDAO.getTaskInfo(taskId)val taskTagRuleList: List[TaskTagRule] = TaskTagRuleDAO.getTaskTagRuleListByTaskId(taskId)println(tagInfo)println(taskInfo)println(taskTagRuleList)//2 如果没有表,需要根据标签定义规则建立标签表val tableName = tagInfo.tagCode.toLowerCase()val tagValueType = tagInfo.tagValueType match {case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"case ConstCode.TAG_VALUE_TYPE_STRING => "string"case ConstCode.TAG_VALUE_TYPE_DATE => "string"}val properties: Properties = MyPropertiesUtil.load("config.properties")val hdfsPath: String = properties.getProperty("hdfs-store.path")val dwDBName: String = properties.getProperty("data-warehouse.dbname")val upDBName: String = properties.getProperty("user-profile.dbname")// 创建sqlval createTableSQL =s"""| create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)| comment '${tagInfo.tagName}'| partitioned by (dt string)| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'| location '$hdfsPath/$upDBName/$tableName'|""".stripMarginprintln(createTableSQL)sparkSession.sql(createTableSQL)//3 根据标签定义和规则查询数据仓库,生成case when语句,将list中的数据转换成一个一个的when thenval whenThenList: List[String] = taskTagRuleList.map {taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"}//将$dt替换成任务日期val taskSQL = taskInfo.taskSql.replace("$dt",taskDate)val whenThenSQL: String = whenThenList.mkString(" ")//如果有四级标签的匹配规则,则通过规则组合成case when语句//如果没有四级标签的匹配规则,直接使用query_value作为标签的值var tagValueSQL = ""if(taskTagRuleList.size > 0){tagValueSQL = s" case query_value $whenThenSQL end "} else{tagValueSQL = s" query_value "}val selectSQL =s"""| select uid,| $tagValueSQL as tag_value| from ($taskSQL) tv|""".stripMarginprintln(selectSQL)//4 把数据写入到对应的标签表中sparkSession.sql(s"use $dwDBName")val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"println(insertSQL)sparkSession.sql(insertSQL)}
}
4 发布标签任务
更新sql任务通用jar包,手动调用任务
在【流程任务管理】中,点击手动调度任务。并选择调度日期。
回到【任务进程】中,能够看到除了生成了标签任务的进程,还有合并任务的进程。
调度器(内置15秒)会检查是否有TODO的任务,按照层级进行调度
如果看到任务都达到了finish可以取hive中查询每个标签表和标签宽表是否已经生成完成。
【用户画像】实现宽表合并,pivot概述,源码实现并发布任务相关推荐
- 【用户画像】标签任务开发流程(源码之实体类、工具类、配置文件、DAO层)
文章目录 一 代码实现 0 开发主线 1 实体类 (1)TagInfo (2)TaskInfo (3)TaskTagRule 2 工具类 (1)连接sql的工具类 测试 (2)专门读取properti ...
- 【php毕业设计】基于php+mysql+apache的subversion用户管理系统设计与实现(毕业论文+程序源码)——用户管理系统
基于php+mysql+apache的subversion用户管理系统设计与实现(毕业论文+程序源码) 大家好,今天给大家介绍基于php+mysql+apache的subversion用户管理系统设计 ...
- java查询mysql装载bean_jsp与javabean链接mysql数据库并查询数据表的简单实例源码
jsp与javabean链接mysql数据库并查询数据表的简单实例源码.这个简单的实例是给新手学习的,或者一些高手临时忘记怎么使用jsp操作mysql数据库时候查找的,包括了建立mysql数据库连接的 ...
- java计算机毕业设计ETC用户自驾游推荐系统MyBatis+系统+LW文档+源码+调试部署
java计算机毕业设计ETC用户自驾游推荐系统MyBatis+系统+LW文档+源码+调试部署 java计算机毕业设计ETC用户自驾游推荐系统MyBatis+系统+LW文档+源码+调试部署 本源码技术栈 ...
- 开源自定义表单提交系统源码 支持自定义字段功能强大
分享一个强大的可以自定义字段的表单提交系统源码,完全开源可以二次开发,支持新建N个表单主题,不限制数量,解决所有表单的应用场景.自定义表单模型,任何类型都支持,功能十分强大. 此套万能表单系统厉害了, ...
- 开源表单网站系统源码支持自定义字段提交
分享一个完全开源的自定义表单提交系统源码,功能强大,支持三级联动,支持在线付费报名,带完整搭建教程. 春哥万能自定义表单系统是支持自定义的万能表单系统,支持普通表单.付费报名.预约服务等三合一功能,支 ...
- 计算机毕业设计ssm基于用户激励的图书管理系统fx8il系统+程序+源码+lw+远程部署
计算机毕业设计ssm基于用户激励的图书管理系统fx8il系统+程序+源码+lw+远程部署 计算机毕业设计ssm基于用户激励的图书管理系统fx8il系统+程序+源码+lw+远程部署 本源码技术栈: 项目 ...
- 改善ERP的用户体验,个性化用户界面(Jquery 提供源码)
改善ERP的用户体验,个性化用户界面(Jquery 提供源码) 这篇文章讲述的技术问题并不多,如果你是想了解技术实现,请直接跨过文章下载源码或者看 demo 我大胆起这个名字,有点标题党.希望能对一 ...
- 用户画像 各维度表的结构图
日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 用户画像 总文章 ===================== ...
最新文章
- Windows和linux双系统——改动默认启动顺序
- .net获取客户端信息(二)ashx部分
- ICCV2021- 牛津大学新的预训练视频文本数据集WebVid-2M,设计用于端到端检索的联合视频和图像编码器!代码已开源!...
- Android MediaCodec 解码H264码流播放
- 【Elasticsearch】es 新的 数据类型 search_as_you_type
- 了解on的依赖的调查作业的最终目的,前期尽可能的要出更多的工数
- mac apache php.ini,Mac自带的Apache使用详解
- Struts2接受页面传值过程中出现input的问题
- (day 12 - 双指针)剑指 Offer 22. 链表中倒数第k个节点
- 关于分卷压缩文件打不开的问题
- 知网一键下载PDF文献
- 读书笔记-人月神话7
- 网络邻居计算机访问权限设置,windows7不能访问网上邻居的原因及解决方法
- 激活MyEclipse 无法运行cracker2018.jar
- 在数据为王的人工智能时代如何收集机器学习数据
- 电子入门基础知识之:电阻读数方法
- python通信信号处理_python学习笔记——信号模块signal(示例代码)
- 1024凑数篇之程序员职业生涯问答
- blackjack - pwnable
- 学习安卓开发第七天【网格视图qq相册页面】【下拉;列表框】【下拉列表框实例】【列表视图】