本文为《Spark大型电商项目实战》 系列文章之一,主要介绍在session粒度聚合数据的基础上进行过滤,筛选参数主要有年龄范围、职业范围、城市范围、性别、搜索词、点击品类等进行筛选。

代码实现

在之前UserVisitSessionAnalyzeSpark.java的基础上添加筛选过滤功能

package com.erik.sparkproject.spark;import java.util.Iterator;import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;import com.alibaba.fastjson.JSONObject;
import com.erik.sparkproject.conf.ConfigurationManager;
import com.erik.sparkproject.constant.Constants;
import com.erik.sparkproject.dao.ITaskDAO;
import com.erik.sparkproject.domain.Task;
import com.erik.sparkproject.impl.DAOFactory;
import com.erik.sparkproject.test.MockData;
import com.erik.sparkproject.util.*;import scala.Tuple2;/*** * @author Erik**/
public class UserVisitSessionAnalyzeSpark {public static void main(String[] args) {args = new String[]{"2"}; //构建spark上下文//首先在Constants.java中设置spark作业相关的常量//String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";//保存Constants.java配置SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME).setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = getSQLContext(sc.sc());//生成模拟测试数据mockData(sc, sqlContext);//创建需要使用的DAO组件ITaskDAO taskDAO = DAOFactory.getTaskDAO();//那么就首先得查询出来指定的任务,并获取任务的查询参数long taskid = ParamUtils.getTaskIdFromArgs(args);Task task = taskDAO.findById(taskid);JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());//如果要进行session粒度的数据聚合,//首先要从user_visit_action表中,查询出来指定日期范围内的数据JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);//聚合//首先,可以将行为数据按照session_id进行groupByKey分组//此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join//然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息//到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,//clickCategoryIds,age,professional,city,sex)>JavaPairRDD<String, String> sessionid2AggrInfoRDD = aggregateBySession(sqlContext, actionRDD);//接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤//相当于我们自己编写的算子,是要访问外面的任务参数对象的//匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSession(sessionid2AggrInfoRDD, taskParam);//关闭spark上下文sc.close();}/*** 获取SQLContext* 如果在本地测试环境的话,那么久生成SQLC哦那text对象*如果在生产环境运行的话,那么就生成HiveContext对象* @param sc SparkContext* @return SQLContext*/private static SQLContext getSQLContext(SparkContext sc) {//在my.properties中配置//spark.local=true(打包之前改为flase)//在ConfigurationManager.java中添加//public static Boolean getBoolean(String key) {//  String value = getProperty(key);//  try {//      return Boolean.valueOf(value);//  } catch (Exception e) {//      e.printStackTrace();//  }//  return false;   //}//在Contants.java中添加//String SPARK_LOCAL = "spark.local";boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);if(local) {return new SQLContext(sc);}else {return new HiveContext(sc);}   }/*** 生成模拟数据* 只有是本地模式,才会生成模拟数据* @param sc* @param sqlContext*/private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);if(local) {MockData.mock(sc, sqlContext);}}/*** 获取指定日期范围内的用户访问行为数据* @param sqlContext SQLContext* @param taskParam 任务参数* @return 行为数据RDD*/private static JavaRDD<Row> getActionRDDByDateRange(SQLContext sqlContext, JSONObject taskParam) {//先在Constants.java中添加任务相关的常量//String PARAM_START_DATE = "startDate";//String PARAM_END_DATE = "endDate";String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);String sql = "select * "+ "from user_visit_action"+ "where date>='" + startDate + "'"+ "and date<='" + endDate + "'";DataFrame actionDF = sqlContext.sql(sql);return actionDF.javaRDD();}/*** 对行为数据按sesssion粒度进行聚合* @param actionRDD 行为数据RDD* @return session粒度聚合数据*/private static JavaPairRDD<String, String> aggregateBySession(SQLContext sqlContext, JavaRDD<Row> actionRDD) {//现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索//现在需要将这个Row映射成<sessionid,Row>的格式JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(/*** PairFunction* 第一个参数,相当于是函数的输入* 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值*/new PairFunction<Row, String, Row>() {private static final long serialVersionUID = 1L;public Tuple2<String, Row> call(Row row) throws Exception {//按照MockData.java中字段顺序获取//此时需要拿到session_id,序号是2return new Tuple2<String, Row>(row.getString(2), row);}});//对行为数据按照session粒度进行分组JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey();//对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来//到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {private static final long serialVersionUID = 1L;public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)throws Exception {String sessionid = tuple._1;Iterator<Row> iterator = tuple._2.iterator();StringBuffer searchKeywordsBuffer = new StringBuffer("");StringBuffer clickCategoryIdsBuffer = new StringBuffer("");Long userid = null;//遍历session所有的访问行为while(iterator.hasNext()) {//提取每个 访问行为的搜索词字段和点击品类字段Row row = iterator.next();if(userid == null) {userid = row.getLong(1);}String searchKeyword = row.getString(5);Long clickCategoryId = row.getLong(6);//实际上这里要对数据说明一下//并不是每一行访问行为都有searchKeyword和clickCategoryId两个字段的//其实,只有搜索行为是有searchKeyword字段的//只有点击品类的行为是有clickCaregoryId字段的//所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的//所以是否将搜索词点击品类id拼接到字符串中去//首先要满足不能是null值//其次,之前的字符串中还没有搜索词或者点击品类idif(StringUtils.isNotEmpty(searchKeyword)) {if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {searchKeywordsBuffer.append(searchKeyword + ",");}}if(clickCategoryId != null) {if(!clickCategoryIdsBuffer.toString().contains(String.valueOf(clickCategoryId))) {clickCategoryIdsBuffer.append(clickCategoryId + ",");}}                       }//StringUtils引入的包是import com.erik.sparkproject.util.trimComma;String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());//返回的数据即是<sessionid, partAggrInfo>//但是,这一步聚合后,其实还需要将每一行数据,根对应的用户信息进行聚合//问题来了,如果是跟用户信息进行聚合的话,那么key就不应该是sessionid,而应该是userid//才能够跟<userid, Row>格式的用户信息进行聚合//如果我们这里直接返回<sessionid, partAggrInfo>,还得再做一次mapToPair算子//将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举//所以,我们这里其实可以直接返回数据格式就是<userid,partAggrInfo>//然后在直接将返回的Tuple的key设置成sessionid//最后的数据格式,还是<sessionid,fullAggrInfo>//聚合数据,用什么样的格式进行拼接?//我们这里统一定义,使用key=value|key=vale//在Constants.java中定义spark作业相关的常量//String FIELD_SESSION_ID = "sessionid";//String FIELD_SEARCH_KEYWORDS = "searchKeywords";//String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"+ Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"+ Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds;return new Tuple2<Long, String>(userid, partAggrInfo);}});//查询所有用户数据String sql = "select * from user_info";JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(new PairFunction<Row, Long, Row>(){private static final long serialVersionUID = 1L;public Tuple2<Long, Row> call(Row row) throws Exception {return new Tuple2<Long, Row>(row.getLong(0), row);}});//将session粒度聚合数据,与用户信息进行joinJavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD = userid2PartAggrInfoRDD.join(userid2InfoRDD);//对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {private static final long serialVersionUID = 1L;public Tuple2<String, String> call(Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {String partAggrInfo = tuple._2._1;Row userInfoRow = tuple._2._2;String sessionid = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);int age = userInfoRow.getInt(3);String professional = userInfoRow.getString(4);String city = userInfoRow.getString(5);String sex = userInfoRow.getString(6);//在Constants.java中添加以下常量//String FIELD_AGE = "age";//String FIELD_PROFESSIONAL = "professional";//String FIELD_CITY = "city";//String FIELD_SEX = "sex";String fullAggrInfo = partAggrInfo + "|"+ Constants.FIELD_AGE + "=" + age + "|"+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"+ Constants.FIELD_CITY + "=" + city + "|"+ Constants.FIELD_SEX + "=" + sex ;return new Tuple2<String, String>(sessionid, fullAggrInfo);}});return sessionid2FullAggrInfoRDD;}/*** 过滤session数据* @param sessionid2AggrInfoRDD* @return*/private static JavaPairRDD<String, String> filterSession(JavaPairRDD<String, String> sessionid2AggrInfoRDD, final JSONObject taskParam) {//为了使用后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")+ (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")+ (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")+ (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")+ (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")+ (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")+ (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");if (_parameter.endsWith("\\|")) {_parameter = _parameter.substring(0, _parameter.length() - 1);}final String parameter = _parameter;//根据筛选参数进行过滤JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(new Function<Tuple2<String, String>, Boolean>() {private static final long serialVersionUID = 1L;public Boolean call(Tuple2<String, String> tuple) throws Exception {//首先,从tuple中,获取聚合数据String aggrInfo = tuple._2;//接着,依次按照筛选条件进行过滤//按照年龄范围进行过滤(startAge、endAge)//先在Constants.java中添加常量//String PARAM_START_AGE = "startAge";//String PARAM_END_AGE = "endage";//String PARAM_PROFESSIONALS = "professionals";//String PARAM_CITIES = "cities";//String PARAM_SEX = "sex";//String PARAM_KEYWORDS = "keywords";//String PARAM_CATEGORY_IDS = "categoryIds";if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {return false;}//按照职业范围进行过滤(professionals)if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, parameter, Constants.PARAM_PROFESSIONALS)) {return false;}//按照城市范围进行过滤(cities)if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, parameter, Constants.PARAM_CATEGORY_IDS)) {return false;}//按照性别过滤if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, parameter, Constants.PARAM_SEX)) {return false;}//按照搜索词过滤if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, parameter, Constants.PARAM_KEYWORDS)) {return false;                           }//按照点击品类id进行搜索if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, parameter, Constants.PARAM_CATEGORY_IDS)) {return false;}                   return true;}       });     return null;}
}

《Spark 大型电商项目实战》源码:https://github.com/Erik-ly/SprakProject

本文为《Spark大型电商项目实战》系列文章之一,
更多文章:Spark大型电商项目实战:http://blog.csdn.net/u012318074/article/category/6744423

22.按筛选参数对session粒度聚合数据进行过滤相关推荐

  1. R把天数据按照不同时间粒度聚合数据(Aggregate)

    R把天数据按照不同时间粒度聚合数据(Aggregate) 目录 R把天数据按照不同时间粒度聚合数据(Aggregate) 聚合天数据

  2. 5.22 使用筛选功能快速过滤指定条件的数据 [原创Excel教程]

    原文:http://coolketang.com/staticOffice/5a97f32bac502e0032eb0624.html 1. 本节课将为您演示强大的筛选功能的使用.使用筛选功能可以实现 ...

  3. navicat创建计算列_Tableau Part 9 计算字段amp;粒度聚合比率amp;表计算

    1.计算字段 使用函数和运算符来构造公式的字段,能拖放到功能区创建视图,还可以用来创建新的计算字段,返回值分为数值型.字符型等等 用数据源中的字段创建一个新的字段,值是由计算过程来确定的,新的字段会被 ...

  4. SQL:求筛选时间段内每天各分组的聚合数据

    任务场景: 报表需求,必须由SQL处理完成,页面筛选条件为日期段,需要将所选日期按照每日区分,查出所有组别在每日的进线量.接通量.呼损量.接听率.进线占比.好评率. 进线量为线路全部进线量,接通量为s ...

  5. SpringMVC框架 学习DAY_02 : 接收请求参数/向模板页面转发数据/重定向与转发 /Session

    1. 接收客户端提交的请求参数 1.1. 使用HttpServletRequest接收请求参数 在处理请求的方法的参数列表中,添加HttpServletRequest类型的参数,在处理请求的过程中,调 ...

  6. kafka 中参数:session.timeout.ms 和 heartbeat.interval.ms的区别

    文章目录 1.heartbeat.interval.ms 2.heartbeat.interval.ms 与 session.timeout.ms 的对比 3.session.timeout.ms 和 ...

  7. pandas使用melt函数将宽表变换为窄表、id_vars参数指定原宽表聚合数据列、value_vars参数指定需要被压缩的数据列(单个、多个)、var_name指定压缩后数据列的名称

    pandas使用melt函数将宽表变换为窄表.id_vars参数指定原宽表聚合数据列.value_vars参数指定需要被压缩的数据列(单个.多个).var_name指定压缩后数据列的名称.value_ ...

  8. 使用聚合数据API查询快递数据-短信验证码-企业核名

    有位朋友让我给他新开的网站帮忙做几个小功能,如下: 输入快递公司.快递单号,查询出这个快件的所有动态(从哪里出发,到了哪里) 在注册.登录等场景下的手机验证码(要求有一定的防刷策略) 通过输入公司名的 ...

  9. 通过聚合数据API实现快递数据查询-短信验证码-企业核名

    有位朋友让我给他新开的网站帮忙做几个小功能,如下: 输入快递公司.快递单号,查询出这个快件的所有动态(从哪里出发,到了哪里) 在注册.登录等场景下的手机验证码(要求有一定的防刷策略) 通过输入公司名的 ...

最新文章

  1. 获取父页面URL的参数对应值及左对齐字符串
  2. Git常用命令总结(超实用)
  3. 【嵌入式】C语言高级编程-container_of宏(04)
  4. python 倒数_【IT专家】python实现文件倒数N行读取
  5. 内存分区与栈帧使用分析
  6. 追捕美国头号电脑通缉犯
  7. Redis六种底层数据结构
  8. “System.OutOfMemoryException”类型的异常在 mscorlib.dll 中发生,但未在用户代码中进行处理
  9. matlab 风机 功率曲线,风力发电机功率曲线统计MATLAB代码实现.docx
  10. Vue中文数组根据文字首字母拼音排序、筛选
  11. 推荐一款使用快捷的免费文字识别OCR(图片转文字)在线服务
  12. 华为eNsp S5700组网配置
  13. 大家能不能在百忙之中 想想鸟姐的话
  14. 等时替代模型( Isotemporal Substitution Model)
  15. Python Flask教程学习01
  16. Java语言基础-面向对象编程三步走之打开冰箱门
  17. 云之讯java短信验证码真小白教程
  18. 交通·未来第4期:利用新兴交通数据进行大规模路网交通管理—以无人车和网约车数据为例...
  19. DIY个人第一台NAS
  20. 【网络互联技术】(二) 网络安全的几种解决途径

热门文章

  1. 雨林木风GHSOT_XP_SP3装机版 V0912 【雪豹】
  2. python删除图片文字_文字隐藏到图片的python脚本
  3. 极致清新论文答辩PPT模板
  4. 移动端、PC端、Web端的各自优劣分析
  5. 搜索技术之--以图搜图
  6. 倩女幽魂2稳定的服务器,《倩女幽魂2》服务器帮会形势分析之点将台篇
  7. java毕业设计开题报告javaweb敬老院管理系统的设计和实现|养老院
  8. 基于深度学习的鸟类检测识别系统(含UI界面,Python代码)
  9. Mybatis xml中配置一对一关系association一对多关系collection
  10. php在pdf文件上写字