题目:广告点击数据统计。

根据动态生成的黑名单进行过滤,实时统计广告点击流量前三。

背景:

在广告点击计费系统中,我们在线过滤掉黑名单的点击,进而保护广告商的利益,只进行有效的广告点击计费 。或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量。

实现目标:

1、实现实时动态黑名单机制:将每天对某个广告点击超过N次的用户拉黑。

2、基于黑名单的非法广告点击流量过滤机制。

3、每天各广告的点击流量实时统计。

4、统计每天Top3热门广告。

请完成Spark程序的编写,并完成实验报告上传。实验报告中应该包含实验步骤,代码,运行截图等。

电子信息学院专业实验报告.doc

第一步就是要安装mysql到虚拟机里面,然后在mysql里面建表

数据如下:

3333 flume
4444 ooize
5555 flume
4444 ooize
5555 flume
2222 hive
3333 hadoop
4444 hbase
3333 flume
4444 ooize
5555 flume
flume 1
hadoop 2

import java.io.Serializable;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import org.apache.spark.streaming.Durations;public class finaltest {public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stub//1.获取实时数据SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(60));JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);//2.处理数据,获得每天对某个广告点击超过N次的用户JavaPairDStream<String,String> data = lines.mapToPair(f -> new Tuple2<>(f.split(",")[0],f.split(",")[1]));data.foreachRDD(rdd -> {JavaRDD<Advert> adRDD = rdd.map(f -> {Advert ad = new Advert();ad.setUserId(f._1);ad.setAdvertId(f._2);return ad;});SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());Dataset<Row> words = spark.createDataFrame(adRDD,Advert.class);words.createOrReplaceGlobalTempView("words");Dataset<Row> result = spark.sql("select userId from (select userId,advertId,count(*) from words group by userId,advertId having count(*) > 2 a");//3.将实时产生的黑名单存入MYSQL数据库  result.write().format("jdbc").option("url","jdbc:mysql://localhost:3306/studentInfo").option("driver","com.mysql.jdbc.Driver").option("dbtable","lists").option("user","debian-sys-maint").option("password","6KCiLZuGt5t8EuZU").mode("append").save();});//4.实时从MYSQL中读取黑名单JavaPairDStream<String,Integer> data2 = data.transformToPair(rdd -> {SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());Dataset<Row> jdbcDF = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/studentInfo").option("driver","com.mysql.jdbc.Driver").option("dbtable","lists").option("user","debian-sys-maint").option("password","6KCiLZuGt5t8EuZU").load();JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());JavaRDD<String> rdd1 = stu.map(f -> f.toString());List<String> rdd2 = rdd1.distinct().collect();//5.根据黑名单内容对获得的广告点击数据进行过滤,去掉黑名单中的用户JavaPairRDD<String,String> rdd3 = rdd.filter(f -> !(rdd2.contains(f._1)));//6.实时统计广告点击数 7.输出前三点击量的广告到文件JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f -> new Tuple2<String,Integer>(f._2,1)).reduceByKey((x,y) -> x+y);JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());JavaPairRDD<String,Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));return rdd6;            });data2.dstream().repartition(1).saveAsTextFiles("/home/yyl/data/top3","spark");ssc.start();ssc.awaitTermination();}   //2.处理数据,获得每天对某个广告点击超过N次的用户public static class JavaSparkSessionSingleton{private static transient SparkSession instance = null;public static SparkSession getInstance(SparkConf sparkConf) {if(instance == null) {instance = SparkSession.builder().config(sparkConf).getOrCreate();}return instance;}}  public static class Advert implements Serializable{private String userId;private String advertId;public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getAdvertId() {return advertId;}public void setAdvertId(String advertId) {this.advertId = advertId;}}}-=============================================================================================================
package thisterm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;public class filterblock {private static final Pattern SPACE = Pattern.compile(" ");public static void main(String[] args) throws IOException, InterruptedException {if (args.length < 2) {System.err.println("需要传入参数:主机名端口号");System.exit(1);}SparkConf sparkConf = new SparkConf().setAppName("JavaNetWorkWordCount").setMaster("local[2]");JavaStreamingContext scc = new JavaStreamingContext(sparkConf,Durations.seconds(10));
//       JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));SQLContext sqlContext = new SQLContext(scc.sparkContext());String url = "jdbc:mysql://localhost:3306/name";Properties connectionProperties = new Properties();connectionProperties.put("user","root");connectionProperties.put("password","123456");connectionProperties.put("driver","com.mysql.cj.jdbc.Driver");JavaReceiverInputDStream<String> lines = scc.socketTextStream(args[0],Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);JavaDStream<String> words = lines.flatMap(f -> {return Arrays.asList(f).iterator();});    JavaPairDStream<String,Integer> wordCounts = words.mapToPair(f -> new Tuple2<>(f,1)).reduceByKey((x,y) -> x + y);JavaPairDStream<String,Integer> wordCountfiltter=wordCounts.filter(f->f._2>2);//333 flume  1JavaDStream<String> wordC=wordCountfiltter.map(f->f._1.split(" ")[1]+" "+f._2);//flume  1JavaDStream<Row> personsRDD = wordC.map(new Function<String,Row>(){public Row call(String line) throws Exception {String[] splited = line.split(" ");return RowFactory.create(splited[0],Integer.valueOf(splited[1]));}});List structFields = new ArrayList();structFields.add(DataTypes.createStructField("bname",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("number",DataTypes.IntegerType,true));StructType structType = DataTypes.createStructType(structFields);personsRDD.foreachRDD(f->{Dataset personsDF = sqlContext.createDataFrame(f,structType);personsDF.write().mode("append").jdbc(url,"blockname1",connectionProperties);});List<String> listblock=new ArrayList<String>();personsRDD.foreachRDD(f->{Dataset<Row> personsDF = sqlContext.createDataFrame(f,structType);Dataset<Row> readfile= sqlContext.read().jdbc(url,"blockname1",connectionProperties);JavaRDD<Row> stu=scc.sparkContext().parallelize(readfile.collectAsList());JavaRDD<String> rdd1=stu.map(f1->f1.toString().split(",")[0].substring(1));rdd1.foreach(f2->System.err.println(f2));List<String> list = rdd1.distinct().collect();//5.根据黑名单内容对获得的广告点击数据进行过滤,去掉黑名单中的用户listblock.addAll(list);// System.out.println(stu.toString());// readfile.show();});words.foreachRDD(f->{JavaPairRDD<String,String> rdd=f.mapToPair(s->new Tuple2<String,String>(s.split(" ")[0],s.split(" ")[1]));JavaPairRDD<String,String> rdd3 = rdd.filter(ff -> !(listblock.contains(ff._1)));//6.实时统计广告点击数 7.输出前三点击量的广告到文件JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f3 -> new Tuple2<String,Integer>(f3._2,1)).reduceByKey((x,y) -> x+y);JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f4 -> f4.swap()).sortByKey(false).mapToPair(f4 -> f4.swap());JavaPairRDD<String,Integer> rdd6 = scc.sparkContext().parallelizePairs(rdd5.take(3));});wordCountfiltter.print();scc.start();scc.awaitTermination();}}

代码如下

第一个类

import java.io.Serializable;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;public class heimingdan {public static class Advert implements Serializable{private String userid;public String getUserid() {return userid;}public void setUserid(String userid) {this.userid = userid;}public String getAdvertid() {return advertid;}public void setAdvertid(String advertid) {this.advertid = advertid;}private String advertid;}public static void main(String[] args) throws InterruptedException  {SparkConf sparkConf = newSparkConf().setAppName("Streaming").setMaster("local[2]");JavaStreamingContext ssc =new JavaStreamingContext(sparkConf,Durations.seconds(60));JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);//鑾峰彇瀹炴椂鏁版嵁JavaPairDStream<String,String> data=lines.mapToPair(f->new Tuple2<>(f.split(",")[0],f.split(",")[1]));data.foreachRDD(rdd->{JavaRDD<Advert> adRDD =rdd.map(f->{Advert ad = new Advert();ad.setUserid(f._1);ad.setAdvertid(f._2);return ad;});SparkSession spark=JavaSparkSessionSingleton.getInstance(rdd.context().getConf());Dataset<Row> words = spark.createDataFrame(adRDD, Advert.class);words.createOrReplaceTempView("words");Dataset<Row> result =spark.sql("select userid from(select userid,advertid,count(*)from words group by userid,advertid having count(*)>2)a");//澶勭悊鏁版嵁锛岃幏寰楁瘡澶╁鏌愪釜骞垮憡鏁拌秴杩�2娆$殑鐢ㄦ埛result.write().format("jdbc").option("url","jdbc:mysql://localhost:3306/studentInfo").option("driver","com.mysql.jdbc.Driver").option("dbtable","hmd").option("user","root").option("password","123456").mode("append").save();});//灏嗗疄鏃朵骇鐢熺殑榛戝悕鍗曞瓨鍏ySQL鏁版嵁搴�JavaPairDStream<String,Integer> data2 = data.transformToPair(rdd->{SparkSession spark= JavaSparkSessionSingleton.getInstance(rdd.context().getConf());Dataset<Row> jdbcDF = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/studentInfo").option("driver","com.mysql.jdbc.Driver").option("dbtable","hmd").option("user","root").option("password","123456").load();JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());JavaRDD<String> rdd1 = stu.map(f->f.toString());List<String> list = rdd1.distinct().collect();//瀹炴椂浠嶮ySQL涓鍙栭粦鍚嶅崟JavaPairRDD<String,String> rdd3 =rdd.filter(f->!(list.contains(f)));//杩囨护鎺夐粦鍚嶅崟JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f->new Tuple2<>(f._2,1)).reduceByKey((x,y)->x+y);//瀹炴椂缁熻骞垮憡鐐瑰嚮鏁�JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f->f.swap()).sortByKey(false).mapToPair(f->f.swap());JavaPairRDD<String,Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));//杈撳嚭鍓嶄笁鐐瑰嚮閲忕殑骞垮憡鍒版枃浠�return rdd6;});data2.dstream().repartition(1).saveAsTextFiles("/home/gyq/妗岄潰/blacklist","spark");data2.print();ssc.start();ssc.awaitTermination();}
}

第二个类


import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;public class JavaSparkSessionSingleton {private static transient SparkSession instance = null;public static SparkSession getInstance(SparkConf sparkConf) {if(instance == null) {instance = SparkSession.builder().config(sparkConf).getOrCreate();}return instance;}
}   

spark_黑名单过滤题目:广告点击数据统计。相关推荐

  1. SparkStreaming 实现广告计费系统中在线黑名单过滤实战

    本博文内容主要包括以下内容: 1.在线黑名单过滤实现解析 2.SparkStreaming实现在线黑名单过滤 一.在线黑名单过滤实现解析: 流式处理是现代数据处理的主流,各种电子商务网站,搜索引擎等网 ...

  2. 大数据Spark入门案例5–统计广告点击数量排行Top3(scala版本)

    大数据Spark入门案例5–统计每广告点击数量排行Top3(scala版本) 1 数据准备 链接:https://pan.baidu.com/s/1afzmL-hNsAJl1_gx_dH2ag 提取码 ...

  3. 2021-08-08ctf中的上传upload题目.user.ini绕过后缀黑名单过滤(同文件夹下有php文件突破口)

    从SUCTF 2019 CheckIn 浅谈.user.ini的利用 / 2019-08-28 08:59:00 / 转自loong大佬-来自先知社区 <span class="con ...

  4. 基于Hadoop的电商广告点击数的分析与可视化(Shell脚本执行与大屏可视化设计)

    目录 摘要 大屏可视化预览 如何安装Hadoop集群 数据集介绍 项目部署流程 一键化配置环境和参数 一键化建立hive数据表 Flume配置及自动加载数据到hive中 数据分析 mysql接收数据表 ...

  5. 2345浏览器如何启用过滤弹窗广告

    2345浏览器是一款非常便捷的浏览服务软件,有很多用户都会使用手机浏览更多的信息,随时都可以享受便捷的服务,使用过的用户都会知道,浏览器的资讯会有广告,其实在看视频的时候也会有广告,那么要怎么过滤弹窗 ...

  6. SparkStreaming通过读取文件动态黑名单过滤

    SparkStreaming通过读取文件动态黑名单过滤 定时从blackName中拷贝文件到write文件夹中 public class CopyFile {public static void co ...

  7. 可过滤多种广告的“ADM(阿呆喵)广告拦截工具

    网络上的广告有时让人目不暇接,观看视频时也会有一段广告.为了去除这些广告,可以使用一些过滤规则.广告屏蔽插件等.本文提供的这个工具为卡饭论坛网友"Tick90011"制作,可以高效 ...

  8. 火狐、chrome浏览器过滤网页广告设置过程

    火狐.chrome浏览器过滤百度广告设置 一.火狐浏览器设置过程(推荐) 1.火狐浏览器访问about:addons,即附加组件页: 2.左侧点击"获取附加组件",变换到获取组件页 ...

  9. 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数

    第103课:动手实战联合使用Spark Streaming.Broadcast.Accumulator实现在线黑名单过滤和计数 /* 王家林老师授课http://weibo.com/ilovepain ...

最新文章

  1. 遥控车_vijos1458_纪中1724_水
  2. 42.虚拟内存如何设置:
  3. 如何知道react对象的值_基于react怎么获取兄弟元素的对象或它的值?
  4. 在centos上搭建svn服务器
  5. Adnroid提高效率之资源文件改名
  6. python gis 经纬度 库_入门-Python-GIS坐标转换
  7. OpenCV cv::merge用法的实例(附完整代码)
  8. JDK 14 – JEP 361从预览中切换表达式
  9. 收藏!最强行人检测算法思维导图
  10. SpringSecurity 流程图
  11. 简易修复工具_汽车划痕的简单修复法,你get了吗?
  12. Emacs正则表达式+零宽断言/环视
  13. Mysql之wait_timeout参数生效办法
  14. 第16课:郭盛华课程PHP文件打开,读取
  15. 菜鸟学MAC - mac十大使用技巧
  16. 洛谷P1004方格取数
  17. 水の三角(超级卡特兰数/大施罗德数)
  18. 10分钟教你用python如何正确把妹
  19. Jenkins 从零开始安装部署
  20. VS2008输入中文乱码

热门文章

  1. 程序员锁死服务器搞垮上家公司后,下家公司说不敢惹不敢惹!
  2. Handler源码分析(超详细的)
  3. 多媒体处理—VC图像处理
  4. 1553B基础常识篇
  5. 【NuMaker-M2354试用】MicroSD 模块测评
  6. 连麦张小龙:谈微信 8.0 背后的思考
  7. 拜占庭故障 Byzantine failures
  8. 如何设计一个“好的”测试用例?
  9. 矮油~ C++ explicit关键字详解
  10. imu相机标定_相机+imu标定