目录

  • 需求分析
  • 数据原型
  • 设计思路
  • 数据模型及演化过程
  • 实施过程

需求分析

我们来根据移动设备唯一标识deviceID来计算来自客户端用户访问日志请求和响应的上行流量、下行流量的记录。

  • 上行流量:指的是手机app向服务器发送的请求数据的流量
  • 下行流量:指的是服务器端给手机app返回的数据(比如说图片、文字、json)的流量

1.计算每个设备(deviceID)总上行流量之和与下行流量之和(取时间戳取最小的deviceID)

eg: 时间戳  设备号 上行流量    下行流量
1       001     79976       11496
2       001     95291       89092
3       002     57029       93467       -> LogInfo(1, 001, 79976+95291+20428, 11496+89092+57706)
4       001     20428       57706
5       003     5291        9092

2.根据上行流量和下行流量进行排序
优先根据上行流量进行排序,如果上行流量相等,那么根据下行流量排序。如果上行流量和下行流量都相当,那么就根据最早时间戳类排序,即需要二次排序)

3.获取流量最大的前10个设备

数据原型

时间戳(timeStamp)   设备号(deviceID)                       上行流量  下行流量
1454307391161   77e3c9e1811d4fb291d0d9bbd456bb4b    79976   11496
1454315971161   f92ecf8e076d44b89f2d070fb1df7197    95291   89092
1454304331161   3de7d6514f1d4ac790c630fa63d8d0be    57029   50228
1454303131161   dd382d2a20464a74bbb7414e429ae452    20428   93467
1454319991161   bb2956150d6741df875fbcca76ae9e7c    51994   57706
...

设计思路

  • 1.自定义数据类型LogInfo(timeStamp,upTraffic,downTraffic)
  • 2.将rdd映射成key-value方式<diviceId,LogInfo>
  • 3.根据diviceId进行聚合,timeStamp取最小值,upTraffic为上行流量总和,downTraffic为下行流量总和
  • 4.自定义一个键值对的比较类来实现比较,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法。
  • 5.将<diviceId, LogInfo(timeStamp,upTraffic,downTraffic)>映射成<LogSort(timeStamp,upTraffic,downTraffic),diviceId>
  • 6.使用sortByKey算子按照自定义的key进行排序
  • 7.使用take算子取出前n名
  • 8.将排序过的value值打印输出

数据模型及演化过程

时间戳  设备号 上行流量    下行流量    <diviceId, LogInfo(timeStamp,upTraffic,downTraffic)>  <diviceId, LogInfo(timeStamp,upTraffic,downTraffic)>  <LogSort(timeStamp,upTraffic,downTraffic),diviceId>
1       001     10          20               <001,LogInfo(1,10,20)>
2       001     20          15               <001,LogInfo(2,20,15)>                                        <001,LogInfo(1,70,55)>                                <LogSort(1,70,55),001>
3       002     25          10      map() -> <002,LogInfo(3,25,10)>                        reduceByKey() -> <002,LogInfo(3,25,10)>                        map() -> <LogSort(3,25,10),002>                    sortByKey(false) -> take(n)
4       001     30          20               <001,LogInfo(4,30,20)>                                        <003,LogInfo(5,10,20)>                                <LogSort(5,10,20),003>
5       003     10          20               <003,LogInfo(5,10,20)>

实施过程

首先将SparkConf分装在一个类中

package com.kfk.spark.common;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;/*** @author : 蔡政洁* @email :caizhengjie888@icloud.com* @date : 2020/11/28* @time : 6:18 下午*/
public class CommSparkContext {public static JavaSparkContext getsc(){SparkConf sparkConf = new SparkConf().setAppName("CommSparkContext").setMaster("local");return new JavaSparkContext(sparkConf);}
}

自定义数据类型LogInfo

package com.kfk.spark.traffic_analysis_project;import java.io.Serializable;/*** @author : 蔡政洁* @email :caizhengjie888@icloud.com* @date : 2020/11/30* @time : 6:40 下午*/
public class LogInfo implements Serializable {private long timeStamp;private long upTraffic;private long downTraffic;public long getTimeStamp() {return timeStamp;}public void setTimeStame(long timeStame) {this.timeStamp = timeStame;}public long getUpTraffic() {return upTraffic;}public void setUpTraffic(long upTraffic) {this.upTraffic = upTraffic;}public long getDownTraffic() {return downTraffic;}public void setDownTraffic(long downTraffic) {this.downTraffic = downTraffic;}public LogInfo(){}public LogInfo(long timeStame, long upTraffic, long downTraffic) {this.timeStamp = timeStame;this.upTraffic = upTraffic;this.downTraffic = downTraffic;}
}

自定义key排序类LogSort

package com.kfk.spark.traffic_analysis_project;import scala.Serializable;
import scala.math.Ordered;/*** @author : 蔡政洁* @email :caizhengjie888@icloud.com* @date : 2020/11/30* @time : 7:39 下午*/
public class LogSort extends LogInfo implements Ordered<LogSort> , Serializable {private long timeStamp;private long upTraffic;private long downTraffic;@Overridepublic long getTimeStamp() {return timeStamp;}public void setTimeStamp(long timeStamp) {this.timeStamp = timeStamp;}@Overridepublic long getUpTraffic() {return upTraffic;}@Overridepublic void setUpTraffic(long upTraffic) {this.upTraffic = upTraffic;}@Overridepublic long getDownTraffic() {return downTraffic;}@Overridepublic void setDownTraffic(long downTraffic) {this.downTraffic = downTraffic;}public LogSort(){}public LogSort(long timeStamp, long upTraffic, long downTraffic) {this.timeStamp = timeStamp;this.upTraffic = upTraffic;this.downTraffic = downTraffic;}public int compare(LogSort that) {int comp = Long.valueOf(this.getUpTraffic()).compareTo(that.getUpTraffic());if (comp == 0){comp = Long.valueOf(this.getDownTraffic()).compareTo(that.getDownTraffic());}if (comp == 0){comp = Long.valueOf(this.getTimeStamp()).compareTo(that.getTimeStamp());}return comp;}public boolean $less(LogSort that) {return false;}public boolean $greater(LogSort that) {return false;}public boolean $less$eq(LogSort that) {return false;}public boolean $greater$eq(LogSort that) {return false;}public int compareTo(LogSort that) {int comp = Long.valueOf(this.getUpTraffic()).compareTo(that.getUpTraffic());if (comp == 0){comp = Long.valueOf(this.getDownTraffic()).compareTo(that.getDownTraffic());}if (comp == 0){comp = Long.valueOf(this.getTimeStamp()).compareTo(that.getTimeStamp());}return comp;}
}

编写主类LogApp

package com.kfk.spark.traffic_analysis_project;import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.List;/*** @author : 蔡政洁* @email :caizhengjie888@icloud.com* @date : 2020/11/30* @time : 6:36 下午*/
public class LogApp {/*** rdd映射成key-value方式<diviceId,LogInfo>* rdd map() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>* @param rdd* @return*/public static JavaPairRDD<String,LogInfo> mapToPairValues(JavaRDD<String> rdd){JavaPairRDD<String,LogInfo> mapToPairRdd =  rdd.mapToPair(new PairFunction<String, String, LogInfo>() {public Tuple2<String, LogInfo> call(String line) throws Exception {long timeStamp = Long.parseLong(line.split("\t")[0]);String diviceId = String.valueOf(line.split("\t")[1]);long upTraffic = Long.parseLong(line.split("\t")[2]);long downTraffic = Long.parseLong(line.split("\t")[3]);LogInfo logInfo = new LogInfo(timeStamp,upTraffic,downTraffic);return new Tuple2<String, LogInfo>(diviceId,logInfo);}});return mapToPairRdd;}/*** 根据diviceId进行聚合* mapToPairRdd reduceByKey() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>* @param mapPairRdd* @return*/public static JavaPairRDD<String,LogInfo> reduceByKeyValues(JavaPairRDD<String,LogInfo> mapPairRdd){JavaPairRDD<String,LogInfo> reduceByKeyRdd = mapPairRdd.reduceByKey(new Function2<LogInfo, LogInfo, LogInfo>() {public LogInfo call(LogInfo v1, LogInfo v2) throws Exception {long timeStamp = Math.min(v1.getTimeStamp(), v2.getTimeStamp());long upTraffic = v1.getUpTraffic() + v2.getUpTraffic();long downTraffic = v1.getDownTraffic() + v2.getDownTraffic();LogInfo logInfo = new LogInfo();logInfo.setTimeStame(timeStamp);logInfo.setUpTraffic(upTraffic);logInfo.setDownTraffic(downTraffic);return logInfo;}});return reduceByKeyRdd;}/*** reduceByKeyRdd map() -> <LogSort(timeStamp,upTraffic,downTraffic),diviceId>* @param aggregateByKeyRdd* @return*/public static JavaPairRDD<LogSort,String> mapToPairSortValues(JavaPairRDD<String,LogInfo> aggregateByKeyRdd){JavaPairRDD<LogSort,String> mapToPairSortRdd = aggregateByKeyRdd.mapToPair(new PairFunction<Tuple2<String, LogInfo>, LogSort, String>() {public Tuple2<LogSort, String> call(Tuple2<String, LogInfo> stringLogInfoTuple2) throws Exception {String diviceId = stringLogInfoTuple2._1;long timeStamp = stringLogInfoTuple2._2.getTimeStamp();long upTraffic = stringLogInfoTuple2._2.getUpTraffic();long downTraffic = stringLogInfoTuple2._2.getDownTraffic();LogSort logSort = new LogSort(timeStamp,upTraffic,downTraffic);return new Tuple2<LogSort, String>(logSort,diviceId);}});return mapToPairSortRdd;}public static void main(String[] args) {JavaSparkContext sc = CommSparkContext.getsc();JavaRDD<String> rdd = sc.textFile("/Users/caizhengjie/IdeaProjects/spark_study01/src/main/java/com/kfk/spark/datas/access.log");// rdd map() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>JavaPairRDD<String,LogInfo> mapToPairRdd = mapToPairValues(rdd);// mapToPairRdd reduceByKey() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>JavaPairRDD<String,LogInfo> reduceByKeyRdd = reduceByKeyValues(mapToPairRdd);// reduceByKeyRdd map() -> <LogSort(timeStamp,upTraffic,downTraffic),diviceId>JavaPairRDD<LogSort, String> mapToPairSortRdd = mapToPairSortValues(reduceByKeyRdd);// sortByKeyJavaPairRDD<LogSort,String> sortByKeyValues = mapToPairSortRdd.sortByKey(false);// TopNList<Tuple2<LogSort,String>> sortKeyList = sortByKeyValues.take(10);for (Tuple2<LogSort,String> logSortStringTuple2 : sortKeyList){System.out.println(logSortStringTuple2._2 + " : " + logSortStringTuple2._1.getUpTraffic() + " : " + logSortStringTuple2._1.getDownTraffic());}}
}

运行结果:

efde893d9c254e549f740d9613b3421c : 1036288 : 629025
84da30d2697042ca9a6835f6ccec6024 : 930018 : 737453
94055312e11c464d8bb16f21e4d607c6 : 827278 : 897382
c2a24d73d77d4984a1d88ea3330aa4c5 : 826817 : 943297
6e535645436f4926be1ee6e823dfd9d2 : 806761 : 613670
92f78b79738948bea0d27178bbcc5f3a : 761462 : 567899
1cca6591b6aa4033a190154db54a8087 : 750069 : 696854
f92ecf8e076d44b89f2d070fb1df7197 : 740234 : 779789
e6164ce7a908476a94502303328b26e8 : 722636 : 513737
537ec845bb4b405d9bf13975e4408b41 : 709045 : 642202

以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

基于Spark对某移动APP流量访问日志分析(Java版)相关推荐

  1. 一款非常好用的网站访问日志分析工具,360星图

    原文转载自:豫章小站 » <[分享]360星图,一款非常好用的网站访问日志分析工具> 广大站长有没有这样一个体验,网站流量异常,要通过网站日志来分析的时候非常头疼,日志文件很大,一条一条来 ...

  2. nginx 访问日志分析工具 goacess

    2019独角兽企业重金招聘Python工程师标准>>> 20150702 nginx 访问日志分析 goacess 1.安装GoAccess需要一些系统支持库 yum install ...

  3. 安卓app测试之Monkey日志分析《转载》

    安卓app测试之Monkey日志分析 链接:https://www.cnblogs.com/wuzm/p/10965762.html 转载于:https://www.cnblogs.com/bifen ...

  4. 【Spark】基于Spark的大型电商网站交互式行为分析系统项目实战

    1.项目背景 (1)Spark在美团的实践 美团是数据驱动的互联网服务,用户每天在美团上的点击.浏览.下单支付行为都会产生海量的日志,这些日志数据将被汇总处理.分析.挖掘与学习,为美团的各种推荐.搜索 ...

  5. SLB访问日志分析:基于客户端来源和HTTP状态码的实践

    摘要: 阿里云负载均衡SLB可以对多台云服务器(ECS)进行流量分发,支持TCP的四层负载均衡和基于HTTP/HTTPS的七层负载均衡.使用SLB可以降低单台ECS异常时对业务的冲击,提升系统可用性. ...

  6. 服务器网站访问日志分析,服务器日志分析与流量统计_直观快捷分析每个网站的日志...

    本帖最后由 软程科技 于 2019-9-9 19:13 编辑 应用名称:日志分析与流量统计 价格:1元 作者:软程科技 版本: 2.1.9 提示(使用前必读): 1. 此插件未在超大(超过1G)的日志 ...

  7. 50.Spark大型电商项目-用户访问session分析-top10热门品类之本地测试

    本篇文章记录用户访问session分析-top10热门品类之本地测试. 在测试的过程中,到很多问题. 问题一:二次排序需要序列化,否则会在程序运行的时候报错. public class Categor ...

  8. 43.Spark大型电商项目-用户访问session分析-top10热门品类之需求回顾以及实现思路分析

    目录 需求回顾 top10热门品类 二次排序 实现思路分析 本篇文章将记录用户访问session分析-top10热门品类之需求回顾以及实现思路分析. 需求回顾 top10热门品类 计算出来通过筛选条件 ...

  9. java tomcat 日志分析工具_设计一个Tomcat访问日志分析工具

    常使用web服务器的朋友大都了解,一般的web server有两部分日志: 一是运行中的日志,它主要记录运行的一些信息,尤其是一些异常错误日志信息 二是访问日志信息,它记录的访问的时间,IP,访问的资 ...

最新文章

  1. 解题报告(二)C、(darkBZOJ 3771)Triple(生成函数 + FFT + 容斥原理)(3)
  2. Python使用matplotlib函数subplot可视化多个不同颜色的折线图、为多个子图添加总标题(main title)、自定义设置主标题字体类型、字体大小、字体颜色等
  3. 阶段1 语言基础+高级_1-3-Java语言高级_02-继承与多态_第5节 final关键字_1_final关键字概念与四种用法...
  4. 异常信息: java.lang.ClassNotFoundException: org.aspec
  5. Mock server的实现 - run Fiori application using mock data in offline mode
  6. Entity Framework Code First模式基础知识及入门实例01
  7. 使用正则表达式从字符串中提取email
  8. Write operations are not allowed in read-only mode (FlushMode.MANUAL)
  9. Ghost8.0分区备份与恢复详细图解
  10. 本周Asp.net源码更新(6.25-6.29)
  11. 天使和恶魔差异只在一念之间
  12. 微波工程(5)——滤波器
  13. Stata因为“只读文件”不能存储的处理方法
  14. matlab改进遗传算法求解带时间窗的路径优化问题
  15. js输入银行卡号,自动查询银行名称、银行卡类型
  16. html子布局不超出父布局,flex布局子元素超出父元素
  17. Arcgis用矢量文件裁剪栅格图像
  18. php后端开发主要会哪些技术?
  19. 说大数据杀熟,这锅可不背!
  20. vs code + mingw64配置C语言环境

热门文章

  1. 字符串 intValue、floatValue、doubleValue、longLongValue 方法可以正确转换的位数或者大小
  2. 漫画:从诗词大会飞花令到ElasticSearch原理解析
  3. C++ 打印 vector
  4. c语言 =0x20,0x20(十六进制0x20等于多少)
  5. Android手机计步器中加速度传感器熄屏时的侦听
  6. android自定义窗口层级(自定义车载系统中倒车影像显示层级)
  7. UE5——创建你的第一个游戏
  8. 商超便利店如何提高销售额
  9. 酷派7728软件安装到外置SD卡上的方法,也适用于联想s850e等
  10. 债务人确无还款能力,可申请退出老赖黑名单