22.按筛选参数对session粒度聚合数据进行过滤
本文为《Spark大型电商项目实战》 系列文章之一,主要介绍在session粒度聚合数据的基础上进行过滤,筛选参数主要有年龄范围、职业范围、城市范围、性别、搜索词、点击品类等进行筛选。
代码实现
在之前UserVisitSessionAnalyzeSpark.java
的基础上添加筛选过滤功能
package com.erik.sparkproject.spark;import java.util.Iterator;import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;import com.alibaba.fastjson.JSONObject;
import com.erik.sparkproject.conf.ConfigurationManager;
import com.erik.sparkproject.constant.Constants;
import com.erik.sparkproject.dao.ITaskDAO;
import com.erik.sparkproject.domain.Task;
import com.erik.sparkproject.impl.DAOFactory;
import com.erik.sparkproject.test.MockData;
import com.erik.sparkproject.util.*;import scala.Tuple2;/*** * @author Erik**/
public class UserVisitSessionAnalyzeSpark {public static void main(String[] args) {args = new String[]{"2"}; //构建spark上下文//首先在Constants.java中设置spark作业相关的常量//String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";//保存Constants.java配置SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME).setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = getSQLContext(sc.sc());//生成模拟测试数据mockData(sc, sqlContext);//创建需要使用的DAO组件ITaskDAO taskDAO = DAOFactory.getTaskDAO();//那么就首先得查询出来指定的任务,并获取任务的查询参数long taskid = ParamUtils.getTaskIdFromArgs(args);Task task = taskDAO.findById(taskid);JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());//如果要进行session粒度的数据聚合,//首先要从user_visit_action表中,查询出来指定日期范围内的数据JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);//聚合//首先,可以将行为数据按照session_id进行groupByKey分组//此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join//然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息//到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,//clickCategoryIds,age,professional,city,sex)>JavaPairRDD<String, String> sessionid2AggrInfoRDD = aggregateBySession(sqlContext, actionRDD);//接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤//相当于我们自己编写的算子,是要访问外面的任务参数对象的//匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSession(sessionid2AggrInfoRDD, taskParam);//关闭spark上下文sc.close();}/*** 获取SQLContext* 如果在本地测试环境的话,那么久生成SQLC哦那text对象*如果在生产环境运行的话,那么就生成HiveContext对象* @param sc SparkContext* @return SQLContext*/private static SQLContext getSQLContext(SparkContext sc) {//在my.properties中配置//spark.local=true(打包之前改为flase)//在ConfigurationManager.java中添加//public static Boolean getBoolean(String key) {// String value = getProperty(key);// try {// return Boolean.valueOf(value);// } catch (Exception e) {// e.printStackTrace();// }// return false; //}//在Contants.java中添加//String SPARK_LOCAL = "spark.local";boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);if(local) {return new SQLContext(sc);}else {return new HiveContext(sc);} }/*** 生成模拟数据* 只有是本地模式,才会生成模拟数据* @param sc* @param sqlContext*/private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);if(local) {MockData.mock(sc, sqlContext);}}/*** 获取指定日期范围内的用户访问行为数据* @param sqlContext SQLContext* @param taskParam 任务参数* @return 行为数据RDD*/private static JavaRDD<Row> getActionRDDByDateRange(SQLContext sqlContext, JSONObject taskParam) {//先在Constants.java中添加任务相关的常量//String PARAM_START_DATE = "startDate";//String PARAM_END_DATE = "endDate";String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);String sql = "select * "+ "from user_visit_action"+ "where date>='" + startDate + "'"+ "and date<='" + endDate + "'";DataFrame actionDF = sqlContext.sql(sql);return actionDF.javaRDD();}/*** 对行为数据按sesssion粒度进行聚合* @param actionRDD 行为数据RDD* @return session粒度聚合数据*/private static JavaPairRDD<String, String> aggregateBySession(SQLContext sqlContext, JavaRDD<Row> actionRDD) {//现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索//现在需要将这个Row映射成<sessionid,Row>的格式JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(/*** PairFunction* 第一个参数,相当于是函数的输入* 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值*/new PairFunction<Row, String, Row>() {private static final long serialVersionUID = 1L;public Tuple2<String, Row> call(Row row) throws Exception {//按照MockData.java中字段顺序获取//此时需要拿到session_id,序号是2return new Tuple2<String, Row>(row.getString(2), row);}});//对行为数据按照session粒度进行分组JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey();//对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来//到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {private static final long serialVersionUID = 1L;public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)throws Exception {String sessionid = tuple._1;Iterator<Row> iterator = tuple._2.iterator();StringBuffer searchKeywordsBuffer = new StringBuffer("");StringBuffer clickCategoryIdsBuffer = new StringBuffer("");Long userid = null;//遍历session所有的访问行为while(iterator.hasNext()) {//提取每个 访问行为的搜索词字段和点击品类字段Row row = iterator.next();if(userid == null) {userid = row.getLong(1);}String searchKeyword = row.getString(5);Long clickCategoryId = row.getLong(6);//实际上这里要对数据说明一下//并不是每一行访问行为都有searchKeyword和clickCategoryId两个字段的//其实,只有搜索行为是有searchKeyword字段的//只有点击品类的行为是有clickCaregoryId字段的//所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的//所以是否将搜索词点击品类id拼接到字符串中去//首先要满足不能是null值//其次,之前的字符串中还没有搜索词或者点击品类idif(StringUtils.isNotEmpty(searchKeyword)) {if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {searchKeywordsBuffer.append(searchKeyword + ",");}}if(clickCategoryId != null) {if(!clickCategoryIdsBuffer.toString().contains(String.valueOf(clickCategoryId))) {clickCategoryIdsBuffer.append(clickCategoryId + ",");}} }//StringUtils引入的包是import com.erik.sparkproject.util.trimComma;String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());//返回的数据即是<sessionid, partAggrInfo>//但是,这一步聚合后,其实还需要将每一行数据,根对应的用户信息进行聚合//问题来了,如果是跟用户信息进行聚合的话,那么key就不应该是sessionid,而应该是userid//才能够跟<userid, Row>格式的用户信息进行聚合//如果我们这里直接返回<sessionid, partAggrInfo>,还得再做一次mapToPair算子//将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举//所以,我们这里其实可以直接返回数据格式就是<userid,partAggrInfo>//然后在直接将返回的Tuple的key设置成sessionid//最后的数据格式,还是<sessionid,fullAggrInfo>//聚合数据,用什么样的格式进行拼接?//我们这里统一定义,使用key=value|key=vale//在Constants.java中定义spark作业相关的常量//String FIELD_SESSION_ID = "sessionid";//String FIELD_SEARCH_KEYWORDS = "searchKeywords";//String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"+ Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"+ Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds;return new Tuple2<Long, String>(userid, partAggrInfo);}});//查询所有用户数据String sql = "select * from user_info";JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(new PairFunction<Row, Long, Row>(){private static final long serialVersionUID = 1L;public Tuple2<Long, Row> call(Row row) throws Exception {return new Tuple2<Long, Row>(row.getLong(0), row);}});//将session粒度聚合数据,与用户信息进行joinJavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD = userid2PartAggrInfoRDD.join(userid2InfoRDD);//对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {private static final long serialVersionUID = 1L;public Tuple2<String, String> call(Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {String partAggrInfo = tuple._2._1;Row userInfoRow = tuple._2._2;String sessionid = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);int age = userInfoRow.getInt(3);String professional = userInfoRow.getString(4);String city = userInfoRow.getString(5);String sex = userInfoRow.getString(6);//在Constants.java中添加以下常量//String FIELD_AGE = "age";//String FIELD_PROFESSIONAL = "professional";//String FIELD_CITY = "city";//String FIELD_SEX = "sex";String fullAggrInfo = partAggrInfo + "|"+ Constants.FIELD_AGE + "=" + age + "|"+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"+ Constants.FIELD_CITY + "=" + city + "|"+ Constants.FIELD_SEX + "=" + sex ;return new Tuple2<String, String>(sessionid, fullAggrInfo);}});return sessionid2FullAggrInfoRDD;}/*** 过滤session数据* @param sessionid2AggrInfoRDD* @return*/private static JavaPairRDD<String, String> filterSession(JavaPairRDD<String, String> sessionid2AggrInfoRDD, final JSONObject taskParam) {//为了使用后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")+ (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")+ (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")+ (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")+ (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")+ (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")+ (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");if (_parameter.endsWith("\\|")) {_parameter = _parameter.substring(0, _parameter.length() - 1);}final String parameter = _parameter;//根据筛选参数进行过滤JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(new Function<Tuple2<String, String>, Boolean>() {private static final long serialVersionUID = 1L;public Boolean call(Tuple2<String, String> tuple) throws Exception {//首先,从tuple中,获取聚合数据String aggrInfo = tuple._2;//接着,依次按照筛选条件进行过滤//按照年龄范围进行过滤(startAge、endAge)//先在Constants.java中添加常量//String PARAM_START_AGE = "startAge";//String PARAM_END_AGE = "endage";//String PARAM_PROFESSIONALS = "professionals";//String PARAM_CITIES = "cities";//String PARAM_SEX = "sex";//String PARAM_KEYWORDS = "keywords";//String PARAM_CATEGORY_IDS = "categoryIds";if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {return false;}//按照职业范围进行过滤(professionals)if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, parameter, Constants.PARAM_PROFESSIONALS)) {return false;}//按照城市范围进行过滤(cities)if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, parameter, Constants.PARAM_CATEGORY_IDS)) {return false;}//按照性别过滤if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, parameter, Constants.PARAM_SEX)) {return false;}//按照搜索词过滤if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, parameter, Constants.PARAM_KEYWORDS)) {return false; }//按照点击品类id进行搜索if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, parameter, Constants.PARAM_CATEGORY_IDS)) {return false;} return true;} }); return null;}
}
《Spark 大型电商项目实战》源码:https://github.com/Erik-ly/SprakProject
本文为《Spark大型电商项目实战》系列文章之一,
更多文章:Spark大型电商项目实战:http://blog.csdn.net/u012318074/article/category/6744423
22.按筛选参数对session粒度聚合数据进行过滤相关推荐
- R把天数据按照不同时间粒度聚合数据(Aggregate)
R把天数据按照不同时间粒度聚合数据(Aggregate) 目录 R把天数据按照不同时间粒度聚合数据(Aggregate) 聚合天数据
- 5.22 使用筛选功能快速过滤指定条件的数据 [原创Excel教程]
原文:http://coolketang.com/staticOffice/5a97f32bac502e0032eb0624.html 1. 本节课将为您演示强大的筛选功能的使用.使用筛选功能可以实现 ...
- navicat创建计算列_Tableau Part 9 计算字段amp;粒度聚合比率amp;表计算
1.计算字段 使用函数和运算符来构造公式的字段,能拖放到功能区创建视图,还可以用来创建新的计算字段,返回值分为数值型.字符型等等 用数据源中的字段创建一个新的字段,值是由计算过程来确定的,新的字段会被 ...
- SQL:求筛选时间段内每天各分组的聚合数据
任务场景: 报表需求,必须由SQL处理完成,页面筛选条件为日期段,需要将所选日期按照每日区分,查出所有组别在每日的进线量.接通量.呼损量.接听率.进线占比.好评率. 进线量为线路全部进线量,接通量为s ...
- SpringMVC框架 学习DAY_02 : 接收请求参数/向模板页面转发数据/重定向与转发 /Session
1. 接收客户端提交的请求参数 1.1. 使用HttpServletRequest接收请求参数 在处理请求的方法的参数列表中,添加HttpServletRequest类型的参数,在处理请求的过程中,调 ...
- kafka 中参数:session.timeout.ms 和 heartbeat.interval.ms的区别
文章目录 1.heartbeat.interval.ms 2.heartbeat.interval.ms 与 session.timeout.ms 的对比 3.session.timeout.ms 和 ...
- pandas使用melt函数将宽表变换为窄表、id_vars参数指定原宽表聚合数据列、value_vars参数指定需要被压缩的数据列(单个、多个)、var_name指定压缩后数据列的名称
pandas使用melt函数将宽表变换为窄表.id_vars参数指定原宽表聚合数据列.value_vars参数指定需要被压缩的数据列(单个.多个).var_name指定压缩后数据列的名称.value_ ...
- 使用聚合数据API查询快递数据-短信验证码-企业核名
有位朋友让我给他新开的网站帮忙做几个小功能,如下: 输入快递公司.快递单号,查询出这个快件的所有动态(从哪里出发,到了哪里) 在注册.登录等场景下的手机验证码(要求有一定的防刷策略) 通过输入公司名的 ...
- 通过聚合数据API实现快递数据查询-短信验证码-企业核名
有位朋友让我给他新开的网站帮忙做几个小功能,如下: 输入快递公司.快递单号,查询出这个快件的所有动态(从哪里出发,到了哪里) 在注册.登录等场景下的手机验证码(要求有一定的防刷策略) 通过输入公司名的 ...
最新文章
- 获取父页面URL的参数对应值及左对齐字符串
- Git常用命令总结(超实用)
- 【嵌入式】C语言高级编程-container_of宏(04)
- python 倒数_【IT专家】python实现文件倒数N行读取
- 内存分区与栈帧使用分析
- 追捕美国头号电脑通缉犯
- Redis六种底层数据结构
- “System.OutOfMemoryException”类型的异常在 mscorlib.dll 中发生,但未在用户代码中进行处理
- matlab 风机 功率曲线,风力发电机功率曲线统计MATLAB代码实现.docx
- Vue中文数组根据文字首字母拼音排序、筛选
- 推荐一款使用快捷的免费文字识别OCR(图片转文字)在线服务
- 华为eNsp S5700组网配置
- 大家能不能在百忙之中 想想鸟姐的话
- 等时替代模型( Isotemporal Substitution Model)
- Python Flask教程学习01
- Java语言基础-面向对象编程三步走之打开冰箱门
- 云之讯java短信验证码真小白教程
- 交通·未来第4期:利用新兴交通数据进行大规模路网交通管理—以无人车和网约车数据为例...
- DIY个人第一台NAS
- 【网络互联技术】(二) 网络安全的几种解决途径
热门文章
- 雨林木风GHSOT_XP_SP3装机版 V0912 【雪豹】
- python删除图片文字_文字隐藏到图片的python脚本
- 极致清新论文答辩PPT模板
- 移动端、PC端、Web端的各自优劣分析
- 搜索技术之--以图搜图
- 倩女幽魂2稳定的服务器,《倩女幽魂2》服务器帮会形势分析之点将台篇
- java毕业设计开题报告javaweb敬老院管理系统的设计和实现|养老院
- 基于深度学习的鸟类检测识别系统(含UI界面,Python代码)
- Mybatis xml中配置一对一关系association一对多关系collection
- php在pdf文件上写字