Spark blacklist filtering — ad-click data statistics
Assignment: ad-click data statistics. Filter clicks against a dynamically generated blacklist and report, in real time, the top three ads by click volume.
Background:
In an ad-click billing system, clicks from blacklisted users are filtered out online so that only valid clicks are billed and the advertisers' interests are protected. The same idea applies to anti-fraud voting (or rating, or traffic) systems, which filter out invalid votes, ratings, or traffic.
Goals:
1. A real-time dynamic blacklist: any user who clicks a given ad more than N times in one day is blacklisted.
2. Filtering of invalid ad clicks based on that blacklist.
3. Real-time per-ad click counts for each day.
4. The daily Top-3 most-clicked ads.
Write the Spark program and upload the lab report. The report should include the experiment steps, the code, and screenshots of the runs.
电子信息学院专业实验报告.doc (lab-report template of the School of Electronic Information)
The first step is to install MySQL in the virtual machine and create the table there.
The sample data is as follows:
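The assignment does not give the table schema. A minimal sketch for the first version of the program, which appends one `userId` column to a table named `lists` in the `studentInfo` database (names taken from the code below; column type is an assumption), might be:

```sql
-- Sketch only: schema is not specified by the assignment.
CREATE DATABASE IF NOT EXISTS studentInfo;
USE studentInfo;
CREATE TABLE lists (
  userId VARCHAR(32)   -- one row per blacklisted user id
);
```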
3333 flume
4444 ooize
5555 flume
4444 ooize
5555 flume
2222 hive
3333 hadoop
4444 hbase
3333 flume
4444 ooize
5555 flume
flume 1
hadoop 2
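The blacklist rule (a user who clicks the same ad more than N times is blacklisted) can be checked offline against the sample above, independent of Spark. A minimal plain-Java sketch — the class and method names here are illustrative, not part of the assignment:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class BlacklistSketch {

    // Lines are "userId adName", as in the sample data. A user is
    // blacklisted when one of their (user, ad) lines appears more than n times.
    static Set<String> blacklist(List<String> clicks, int n) {
        // Count identical (user, ad) lines...
        Map<String, Long> counts = clicks.stream()
                .collect(Collectors.groupingBy(line -> line, Collectors.counting()));
        // ...and keep the user id of every pair seen more than n times.
        return counts.entrySet().stream()
                .filter(e -> e.getValue() > n)
                .map(e -> e.getKey().split(" ")[0])
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<String> clicks = Arrays.asList(
                "3333 flume", "4444 ooize", "5555 flume", "4444 ooize", "5555 flume",
                "2222 hive", "3333 hadoop", "4444 hbase",
                "3333 flume", "4444 ooize", "5555 flume");
        System.out.println(blacklist(clicks, 2)); // [3333, 4444, 5555]
    }
}
```

With N = 2, users 3333 ("flume" three times), 4444 ("ooize" three times), and 5555 ("flume" four times) are blacklisted, which is exactly the `having count(*) > 2` condition in the Spark SQL below.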
import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class finaltest {

    public static void main(String[] args) throws InterruptedException {
        // 1. Receive the live click stream ("userId,advertId" lines) from a socket.
        SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

        // 2. Find users who clicked one ad more than N (= 2) times.
        JavaPairDStream<String, String> data =
                lines.mapToPair(f -> new Tuple2<>(f.split(",")[0], f.split(",")[1]));

        data.foreachRDD(rdd -> {
            JavaRDD<Advert> adRDD = rdd.map(f -> {
                Advert ad = new Advert();
                ad.setUserId(f._1);
                ad.setAdvertId(f._2);
                return ad;
            });
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> words = spark.createDataFrame(adRDD, Advert.class);
            // A plain temp view, so the SQL below can refer to it as "words"
            // (a global temp view would live under the global_temp database).
            words.createOrReplaceTempView("words");
            // The subquery was missing its closing parenthesis in the original.
            Dataset<Row> result = spark.sql(
                    "select userId from (select userId, advertId, count(*) from words"
                    + " group by userId, advertId having count(*) > 2) a");

            // 3. Append the newly blacklisted users to MySQL.
            result.write().format("jdbc")
                  .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                  .option("driver", "com.mysql.jdbc.Driver")
                  .option("dbtable", "lists")
                  .option("user", "debian-sys-maint")
                  .option("password", "6KCiLZuGt5t8EuZU")
                  .mode("append").save();
        });

        // 4. Read the blacklist back from MySQL on every batch.
        JavaPairDStream<String, Integer> data2 = data.transformToPair(rdd -> {
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> jdbcDF = spark.read().format("jdbc")
                    .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .option("dbtable", "lists")
                    .option("user", "debian-sys-maint")
                    .option("password", "6KCiLZuGt5t8EuZU")
                    .load();
            JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
            // Take the userId column itself, not Row.toString() (which yields
            // "[1234]"), so the contains() check below matches real user ids.
            JavaRDD<String> rdd1 = stu.map(f -> f.getString(0));
            List<String> rdd2 = rdd1.distinct().collect();

            // 5. Drop clicks from blacklisted users.
            JavaPairRDD<String, String> rdd3 = rdd.filter(f -> !rdd2.contains(f._1));

            // 6. Count clicks per ad, and 7. keep the top-3 ads.
            JavaPairRDD<String, Integer> rdd4 = rdd3
                    .mapToPair(f -> new Tuple2<>(f._2, 1))
                    .reduceByKey((x, y) -> x + y);
            JavaPairRDD<String, Integer> rdd5 = rdd4
                    .mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
            return ssc.sparkContext().parallelizePairs(rdd5.take(3));
        });

        data2.dstream().repartition(1).saveAsTextFiles("/home/yyl/data/top3", "spark");
        ssc.start();
        ssc.awaitTermination();
    }

    // Lazily creates one SparkSession per JVM, reused by every micro-batch.
    public static class JavaSparkSessionSingleton {
        private static transient SparkSession instance = null;
        public static SparkSession getInstance(SparkConf sparkConf) {
            if (instance == null) {
                instance = SparkSession.builder().config(sparkConf).getOrCreate();
            }
            return instance;
        }
    }

    // JavaBean for the click records.
    public static class Advert implements Serializable {
        private String userId;
        private String advertId;
        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }
        public String getAdvertId() { return advertId; }
        public void setAdvertId(String advertId) { this.advertId = advertId; }
    }
}
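The ranking step at the end of the job (count clicks per ad, sort descending, take three) can also be sketched without Spark. The names below (Top3Sketch, top3) are illustrative only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Top3Sketch {

    // Mirror of rdd4 / rdd5 / take(3): count clicks per ad, sort by count
    // descending, and keep the first three (ad, count) pairs.
    static List<Map.Entry<String, Long>> top3(List<String> ads) {
        Map<String, Long> counts = ads.stream()
                .collect(Collectors.groupingBy(a -> a, Collectors.counting()));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(3)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Ad column of the sample data; flume leads with 5 clicks, ooize has 3.
        List<String> ads = Arrays.asList("flume", "ooize", "flume", "ooize", "flume",
                "hive", "hadoop", "hbase", "flume", "ooize", "flume");
        top3(ads).forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
    }
}
```

Note that ties (here hive, hadoop, and hbase all have one click) come out in an unspecified order, just as `sortByKey(false)` leaves tied counts unordered in the Spark version.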
A second implementation (class filterblock), which stores the blacklist through an explicit schema and JDBC Properties:

package thisterm;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class filterblock {

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: <hostname> <port>");
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetWorkWordCount").setMaster("local[2]");
        JavaStreamingContext scc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        SQLContext sqlContext = new SQLContext(scc.sparkContext());

        String url = "jdbc:mysql://localhost:3306/name";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "123456");
        connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");

        JavaReceiverInputDStream<String> lines = scc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

        // Each input line is "userId adName"; lines are kept whole and
        // identical lines are counted per batch.
        JavaDStream<String> words = lines.flatMap(f -> Arrays.asList(f).iterator());
        JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(f -> new Tuple2<>(f, 1)).reduceByKey((x, y) -> x + y);

        // A (user, ad) line seen more than twice, e.g. "3333 flume" 3 times,
        // puts the user on the blacklist.
        JavaPairDStream<String, Integer> wordCountfiltter = wordCounts.filter(f -> f._2 > 2);

        // Keep the user id (field [0]) rather than the ad name (the original
        // used [1]); the blacklist must hold user ids for the filter below.
        JavaDStream<String> wordC = wordCountfiltter.map(f -> f._1.split(" ")[0] + " " + f._2);

        JavaDStream<Row> personsRDD = wordC.map(line -> {
            String[] splited = line.split(" ");
            return RowFactory.create(splited[0], Integer.valueOf(splited[1]));
        });

        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("bname", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("number", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // listblock lives on the driver; both foreachRDD bodies run there,
        // so it can accumulate the blacklist across batches in local mode.
        List<String> listblock = new ArrayList<>();

        personsRDD.foreachRDD(f -> {
            // Append the newly blacklisted users to MySQL...
            Dataset<Row> personsDF = sqlContext.createDataFrame(f, structType);
            personsDF.write().mode("append").jdbc(url, "blockname1", connectionProperties);

            // ...and read the full blacklist back.
            Dataset<Row> readfile = sqlContext.read().jdbc(url, "blockname1", connectionProperties);
            JavaRDD<Row> stu = scc.sparkContext().parallelize(readfile.collectAsList());
            JavaRDD<String> rdd1 = stu.map(f1 -> f1.getString(0));
            List<String> list = rdd1.distinct().collect();
            listblock.addAll(list);
        });

        // 5. Drop clicks from blacklisted users,
        // 6. count clicks per ad, 7. keep the top three.
        words.foreachRDD(f -> {
            JavaPairRDD<String, String> rdd = f.mapToPair(
                    s -> new Tuple2<>(s.split(" ")[0], s.split(" ")[1]));
            JavaPairRDD<String, String> rdd3 = rdd.filter(ff -> !listblock.contains(ff._1));
            JavaPairRDD<String, Integer> rdd4 = rdd3
                    .mapToPair(f3 -> new Tuple2<>(f3._2, 1)).reduceByKey((x, y) -> x + y);
            JavaPairRDD<String, Integer> rdd5 = rdd4
                    .mapToPair(f4 -> f4.swap()).sortByKey(false).mapToPair(f4 -> f4.swap());
            JavaPairRDD<String, Integer> rdd6 = scc.sparkContext().parallelizePairs(rdd5.take(3));
            // The original computed rdd6 but never output it; print the top 3.
            rdd6.collect().forEach(t -> System.out.println(t._1 + " " + t._2));
        });

        wordCountfiltter.print();
        scc.start();
        scc.awaitTermination();
    }
}
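Step 5 in both versions, dropping clicks whose user id appears in the blacklist, is a plain set-membership filter. A Spark-free sketch (class and method names assumed):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterSketch {

    // Keep only clicks ("userId adId") whose user is not blacklisted,
    // mirroring rdd.filter(f -> !blacklist.contains(f._1)).
    static List<String> dropBlacklisted(List<String> clicks, Set<String> blacklist) {
        return clicks.stream()
                .filter(line -> !blacklist.contains(line.split(" ")[0]))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> clicks = Arrays.asList("3333 flume", "2222 hive", "5555 flume");
        Set<String> blacklist = new HashSet<>(Arrays.asList("3333", "5555"));
        System.out.println(dropBlacklisted(clicks, blacklist)); // [2222 hive]
    }
}
```

This is also why the blacklist must contain bare user ids: a list of Row.toString() values like "[3333]" (or of ad names) would never match the first field of a click line.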
The code of a third variant is as follows.
First class:
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class heimingdan {

    // JavaBean for the click records.
    public static class Advert implements Serializable {
        private String userid;
        private String advertid;
        public String getUserid() { return userid; }
        public void setUserid(String userid) { this.userid = userid; }
        public String getAdvertid() { return advertid; }
        public void setAdvertid(String advertid) { this.advertid = advertid; }
    }

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

        // Receive the live data ("userid,advertid" lines).
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
        JavaPairDStream<String, String> data =
                lines.mapToPair(f -> new Tuple2<>(f.split(",")[0], f.split(",")[1]));

        data.foreachRDD(rdd -> {
            JavaRDD<Advert> adRDD = rdd.map(f -> {
                Advert ad = new Advert();
                ad.setUserid(f._1);
                ad.setAdvertid(f._2);
                return ad;
            });
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> words = spark.createDataFrame(adRDD, Advert.class);
            words.createOrReplaceTempView("words");
            // Find users who clicked one ad more than 2 times...
            Dataset<Row> result = spark.sql(
                    "select userid from (select userid, advertid, count(*) from words"
                    + " group by userid, advertid having count(*) > 2) a");
            // ...and append the blacklist to MySQL.
            result.write().format("jdbc")
                  .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                  .option("driver", "com.mysql.jdbc.Driver")
                  .option("dbtable", "hmd")
                  .option("user", "root")
                  .option("password", "123456")
                  .mode("append").save();
        });

        // Read the blacklist back from MySQL on every batch.
        JavaPairDStream<String, Integer> data2 = data.transformToPair(rdd -> {
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> jdbcDF = spark.read().format("jdbc")
                    .option("url", "jdbc:mysql://localhost:3306/studentInfo")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .option("dbtable", "hmd")
                    .option("user", "root")
                    .option("password", "123456")
                    .load();
            JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
            // Take the userid column itself so contains() matches real ids.
            JavaRDD<String> rdd1 = stu.map(f -> f.getString(0));
            List<String> list = rdd1.distinct().collect();

            // Filter out blacklisted users (the original compared the whole
            // tuple against the list; the user id f._1 is what must match).
            JavaPairRDD<String, String> rdd3 = rdd.filter(f -> !list.contains(f._1));

            // Count clicks per ad in real time.
            JavaPairRDD<String, Integer> rdd4 = rdd3
                    .mapToPair(f -> new Tuple2<>(f._2, 1)).reduceByKey((x, y) -> x + y);
            JavaPairRDD<String, Integer> rdd5 = rdd4
                    .mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
            // Keep the top-3 ads by click count.
            return ssc.sparkContext().parallelizePairs(rdd5.take(3));
        });

        // Write the top 3 to a file.
        data2.dstream().repartition(1).saveAsTextFiles("/home/gyq/桌面/blacklist", "spark");
        data2.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
Second class:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Lazily creates a single SparkSession per JVM, shared by all micro-batches.
public class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;
    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf).getOrCreate();
        }
        return instance;
    }
}
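The singleton exists so that each micro-batch reuses one SparkSession instead of building a new one. Stripped of Spark, the pattern is the following (the Session class here is a stand-in so the sketch runs without Spark):

```java
public class SingletonSketch {

    // Stand-in for SparkSession, to keep the sketch self-contained.
    static class Session {}

    private static Session instance = null;

    // Same shape as JavaSparkSessionSingleton.getInstance:
    // build the instance on first use, return the cached one afterwards.
    public static Session getInstance() {
        if (instance == null) {
            instance = new Session();
        }
        return instance;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true
    }
}
```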