spark(day06-spark算法、Spark Sql)
案例
处理u.data文件用户id 电影id 用户打分要求基于u.data文件,建立推荐系统模型,为789号用户推荐10部电影建模时,k的取值10~50之间,迭代次数:5~20次之间 λ:0.01~0.05之间额外要求:模型返回的10部电影id,处理之后,把电影id对应的电影名获 取到电影信息在u.item文件存储着电影id|电影名
原始的偏好矩阵往往是稀疏的,而用户因子矩阵和物品因子矩阵是稠密的。
所以我们可以通过用户因子矩阵计算出用户-用户之间的相似度。我们可以通过物品因子矩阵,计算出物品-物品之间的相似度
补充:对于k的取值,在生产环境下,建议:10~50之间,不易过多,因为k越大,计算代价越大。
过拟合(Overfitting)
当收集到样本数据之后,我们会建立自标方程去拟合样本数据。一般的,我们希望模型能够尽可能的去拟合样本数据。但如果拟合的过好,就会产生过拟合现象。
如果产生过拟合,会导致:模型在实验环境表现良好,但是模型在生产环境表现很差,失去实用价值I
解决过拟谷的于段:
1.引入正则化参数
2.更换模型,比如集成模型(比如 Radom Forest)等
部分数据
u.data
60 68 8 16637622
97 14 2 16637622
32 61 8 16637622
59 31 3 16637622
u.item
2|The Outlander Series Bundle (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
代码
package cn.com.alsmovieimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS/** 处理u.data文件* 用户id 电影id 用户打分* 要求基于u.data文件,建立推荐系统模型,为789号用户推荐10部电影* 建模时,k的取值10~50之间,迭代次数:5~20次之间 λ:0.01~0.05之间* * 额外要求:模型返回的10部电影id,处理之后,把电影id对应的电影名获取到* 电影信息在u.item文件存储着* 电影id|电影名* * */object Driver {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("alsmovie")val sc=new SparkContext(conf)val data=sc.textFile("D://data/ml/u.data",4)val movieData=sc.textFile("D://data/ml/u.item",4)//movieData : RDD[String]->RDD[(movieId ,movieName) ]//->collectAsMap->Map<movieId,movieName>val movieMap=movieData.map { line =>val info=line.split("\\|")val movieId=info(0).toIntval movieName=info(1)(movieId,movieName)}.collectAsMap//第一步:RDD[String]->RDD[Rating]val ratings=data.map { line =>val info=line.split(" ")val userId=info(0).toIntval movieId=info(1).toIntval score=info(2).toIntRating(userId,movieId,score)}//第二步:建立推荐系统模型//隐藏因子 k:50//迭代次数 10//过敏和 越大,误差越大val model=ALS.train(ratings, 50, 10, 0.01)//第三步:未指定用户推荐商品val u56=model.recommendProducts(56, 10).map { x =>val userId=x.userval movieId=x.productval movieName=movieMap.get(movieId)val score=x.rating(userId,movieName,score)}// u56.foreach{println}//第四步:检验结果的质量,本例采用直观检验法//实现思路//1.先获取789号用户看过的所有电影,比如他看过了30部电影//2.再找出789号用户最喜爱的前10部电影//3.最后比较推荐的电影和他喜爱的电影是否有类似的//keyBy函数:根据匿名函数规则,指定以什么属性为key来查找//下面的代码表是以用户id属性为key来查找//lookup:具体查找的key值val u56Movies=ratings.keyBy { x => x.user }.lookup(56)val u56Top10=u56Movies.sortBy { x => -x.rating }.take(10).map { x => (x.user,movieMap(x.product),x.rating) }
// u56Top10.foreach{println}//第五步:推荐系统模型的存储。避免每次推荐时导致重新建模//路径可以是本地文件系统,也可以是HDFS(生产环境)model.save(sc, "D://data/ml/rec-result")
// model.save(sc, "hdfs://hadoop01:9000/rec-result")}
}
package cn.com.alsmovieimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
/** 从指定的目录下,加载推荐系统模型并使用* */object LoadDriver {def cosArray(a1:Array[Double],a2:Array[Double])={val a1a2=a1 zip a2val a1a2FenZi=a1a2.map{x=>x._1*x._2}.sumval a1FenMu=Math.sqrt(a1.map{x=>x*x}.sum)val a2FenMu=Math.sqrt(a2.map{x=>x*x}.sum)val a1a2Cos=a1a2FenZi/(a1FenMu*a2FenMu)a1a2Cos}def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("load")val sc=new SparkContext(conf)//加载模型val model=MatrixFactorizationModel.load(sc, "d://data/ml/rec-result")val movieData=sc.textFile("D://data/ml/u.item",4)//movieData : RDD[String]->RDD[(movieId ,movieName) ]//->collectAsMap->Map<movieId,movieName>val movieMap=movieData.map { line =>val info=line.split("\\|")val movieId=info(0).toIntval movieName=info(1)(movieId,movieName)}.collectAsMapval u56=model.recommendProducts(56, 10)u56.foreach { println }//--推荐系统的推荐方式有两种://--①基于用户的推荐,Spark的ALS模型只提供了基于用户的推荐//-②基于物品的推荐,比如某个用户看过了123号这部电影,然后腰求推荐和123号电影相关的电影//这种基于物品来推荐物品的方式,需要程序员自己实现(Spark的ALS模型没有提供)//--实现思路∶//--1.核心的需要计算出物品和物品之间的相似度//由此引出:需要获取物品因子矩阵//获取用户的因子矩阵val userFactors=model.userFeatures//获取物品的因子矩阵(主要用这个)//-RDD[( itemId,item的因子数组)]val itemFactors=model.productFeatures//2.比如我们想基于123号这部电影做推荐//所以先获取123号这部电影的因子值//下面的代码表示以电影id属性为key,具体找123号的数据val movie56Factor=itemFactors.keyBy{x=>x._1}.lookup(56).head._2//3.基于56号电影推荐10部电影//计算出其他电影和56号电影的相似度(向量之间的夹角余弦来计算)//然后按相似度做降序排序,取出前10部推荐val result=itemFactors.map{case(movieId,factor)=>//factor 和 movie56Factor 的余弦距离//计算当前电影和123号电影的夹角余弦val cos=cosArray(movie56Factor, factor)//返回的结果:(当前电影id,当前电影与123号电影的夹角余弦)(movieId,cos)}val movie56Top10=result.sortBy{x=> -x._2}.take(11).drop(1).map{x=> val movieName=movieMap(x._1)val cos=x._2(movieName,cos)}movie56Top10.foreach{println}}
}
展示:
(The Outlander Series Bundle (1995),0.7710896700540864)
(Around the World in 80 Days (1995),0.7679128162094432)
(Thirty Million Words (1995),0.7631992509153498)
(Gulliver‘s Travels (1995),0.7580377838881895)
(The Civil War Trilogy 3-Book Boxset (1995),0.750909684309467)
(伊索寓言:THE AESOP FOR CHILDREN (1995),0.7499406002999569)
(A Tale of Two Cities (1995),0.7440397407261276)
(Wonder (1995),0.7411365835679982)
(Emma (1995),0.733170332968417)
(中医儿科学 (1995),0.7278338152258704)
推荐系统的冷启动问题
概述
推荐系统需要根据用户的历史行为和兴趣预测用户未来的行为和兴趣,因此大量的用户行为数据就成为推荐系统的重要组成部分和先决条件。对于很多像百度、当当这样的网站来说,这或许不是个问题,因为它们目前已经积累了大量的用户数据。但是对于很多在开始阶段就希望有个性化推荐应用的网站来说,如何在没有大量用户数据的情况下设计个性化推荐系统并且让用户对推荐结果满意从而愿意使用推
荐系统,就是冷启动的问题。
总结:
冷启动指的是建立推荐系统模型时,缺少用户偏好数据或者对于一个新用户而言(此时后台数据中并没有此用户的偏好数据),对于出现的这种问题,称为推荐系统的冷启动。
如何解决:
共性推荐:
解决方案1:
新用户在注册时,根据选择的感兴趣方向做相关内容的推荐
解决方案2:
根据用户的注册信息,做推荐
解决方案3:
根据热搜排行榜推荐
当新用户的偏好数据有一定积累后,可以切换到个性推荐(基于用户推荐)
解决方案4:
根据用户浏览的商品,加入购物车的商品,购买的商品做物品推荐
物品冷启动:
当一个新的商品上架,一般都会在主页上推荐此商品,目的是让更多的用户浏览和购买此商品,从而积累商品的评分数据。
此外,对于新商品,最初可以人为打分。最初是不准的,但是随着打分(使用)人数增多,评分会趋近于真实值。
系统冷启动:
首要要解决数据源问题。
1.通过爬虫技术爬取数据
2.通过数据网站购买数据
3.自身平台的运营积累数据
总结:综上,推荐系统的冷启动分为:
①用户冷启动
②物品冷启动
③系统冷启动
Hadoop集群的搭建,管理和监控
CDH
Ambari
Kettle–ETL工具
数据的提取,转换和加载
Ozziz
任务调度框架
算法模型:
1.回归模型
2.聚类模型
3.分类模型
4.决策树模型
5.支持向量机模型
6.集成模型
7.降维模型
8.贝叶斯模型
9.推荐模型模型
10.神经网络模型
Spark Sql
概述
Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。
SparkSql模块前身是Shark。
从上图可以看出Shark是基于Hive做的优化和改进。更换了底层的计算模块,并引入缓存机制。从性能和速度上要优于Hive。
但是Shark框架过度依赖于Hive。导致Shark和Spark其他模块的兼容并不是很好,所有后来,Spark舍弃了Shark,重新开发了SparkSql模块。SparkSql模块引入了DataFrame(数据框)的结构,可以把一个DataFrame就当做一张表,然后通过sql去操作这张表结构。DataFrame底层仍然是RDD。
此外,SparkSql也吸取了Shark的一些优点,比如内存列存储机制。
列存储的优点:
1.在查询时不存在冗余列问题。而行存储存在冗余列,而消除冗余列的过程是在内存里发生。这个优势在数据量大时被放大。
2.每一列数据都是同类型,可以避免类型之间的频繁转换,节省cpu
3.每一列数据都是同类型,所以可以采用更高效的压缩算法压缩数据比如存储性别列数据:(类型为比特)
男 | 女 | 男 | 女 |
---|---|---|---|
1 | 0 | 1 | 0 |
scala> val r1=sc.makeRDD(List((1,"tom"),(2,"rose"),(3,"jim")),2)
val df1=r1.toDF("id","name")
scala> df1.show
+---+----+
| id|name|
+---+----+
| 1| tom|
| 2|rose|
| 3| jim|
+---+----+
案例(普通文本转表数据)
[root@hadoop01 home]# vim 4.txt1 tom 23
2 rose 18
3 jim 25
scala> val r2=sc.textFile("file:///home/4.txt")
scala> val df2=r2.map{line=>line.split(" ")}.map{arr=>(arr(0).toInt,arr(1),arr(2).toInt)}
scala> val data=df2.toDF("id","name","age")
scala> data.show
+---+----+---+
| id|name|age|
+---+----+---+
| 1| tom| 23|
| 2|rose| 18|
| 3| jim| 25|
+---+----+---+
案例(json转表数据)
[root@hadoop01 home]# vim 5.txt{"id":1,"name":"tom"}
{"id":2,"name":"ros"}
{"id":3,"name":"jim"}
scala> import org.apache.spark.sql._
import org.apache.spark.sql._scala> val sqc=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqc: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@74453b6cscala> val df3=sqc.read.json("file:///home/5.txt")
df3: org.apache.spark.sql.DataFrame = [id: bigint, name: string]scala> df3.show
+---+----+
| id|name|
+---+----+
| 1| tom|
| 2| ros|
| 3| jim|
+---+----+
案例3(读取数据库)
首先拷贝sql的jar包到spark中jar目录
[root@hadoop01 lib]# cp mysql-connector-java-5.1.39-bin.jar /home/presoftware/spark-2.0.1-bin-hadoop2.7/jars
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> val sqc=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqc: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@ad0bb4escala> val prop=new java.util.Properties
prop: java.util.Properties = {}scala> prop.put("user","root")
res0: Object = nullscala> prop.put("password","root")
res1: Object = nullscala> val df1=sqc.read.jdbc("jdbc:mysql://hadoop01:3306/demo","score",prop)
df1: org.apache.spark.sql.DataFrame = [name: string, chinese: int ... 2 more fields]scala> df1.show
+------+-------+----+-------+
| name|chinese|math|english|
+------+-------+----+-------+
| Adair| 88| 82| 64|
| Alex| 64| 63| 68|
| Bob| 90| 64| 92|
| Chad| 66| 74| 37|
| Colin| 64| 86| 74|
| Eden| 71| 85| 43|
| Grace| 57| 86| 24|
|Grover| 99| 86| 43|
| Henry| 39| 79| 78|
+------+-------+----+-------+
查询中文成绩
scala> df1.select("chinese")
res3: org.apache.spark.sql.DataFrame = [chinese: int]scala> res3.show
+-------+
|chinese|
+-------+
| 88|
| 64|
| 90|
| 66|
| 64|
| 71|
| 57|
| 99|
| 39|
+-------+
带条件查询
scala> df1.select("name","chinese").where($"name"==="Bob").show
+----+-------+
|name|chinese|
+----+-------+
| Bob| 90|
+----+-------+
中文成绩升序
scala> df1.select("name","chinese").orderBy("chinese").show
+------+-------+
| name|chinese|
+------+-------+
| Henry| 39|
| Grace| 57|
| Colin| 64|
| Alex| 64|
| Chad| 66|
| Eden| 71|
| Adair| 88|
| Bob| 90|
|Grover| 99|
+------+-------+
降序
scala> df1.select("name","chinese").orderBy($"chinese".desc).show
+------+-------+
| name|chinese|
+------+-------+
|Grover| 99|
| Bob| 90|
| Adair| 88|
| Eden| 71|
| Chad| 66|
| Colin| 64|
| Alex| 64|
| Grace| 57|
| Henry| 39|
+------+-------+
案例(月份升序,利润降序)
scala> val r2=sc.makeRDD(List((2,150),(1,200),(2,300),(3,80),(1,120),(3,300)),2)
r2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[17] at makeRDD at <console>:27scala> val df2=r2.toDF("month","profit")
df2: org.apache.spark.sql.DataFrame = [month: int, profit: int]
scala> df2.select("month","profit").orderBy($"month",$"profit".desc).show
+-----+------+
|month|profit|
+-----+------+
| 1| 200|
| 1| 120|
| 2| 300|
| 2| 150|
| 3| 300|
| 3| 80|
+-----+------+
按月份分组,求最大利润
scala> df2.select("month","profit").groupBy("month").max("profit").show
+-----+-----------+
|month|max(profit)|
+-----+-----------+
| 1| 200|
| 3| 300|
| 2| 300|
+-----+-----------+
平均值
scala> df2.select("month","profit").groupBy("month").avg("profit").show
+-----+-----------+
|month|avg(profit)|
+-----+-----------+
| 1| 160.0|
| 3| 190.0|
| 2| 225.0|
+-----+-----------+
组合查询(最大值,最小值,平均值)
scala> df2.select("month","profit").groupBy("month").agg(max("profit"),min("profit"),avg("profit")).show
+-----+-----------+-----------+-----------+
|month|max(profit)|min(profit)|avg(profit)|
+-----+-----------+-----------+-----------+
| 1| 200| 120| 160.0|
| 3| 300| 80| 190.0|
| 2| 300| 150| 225.0|
+-----+-----------+-----------+-----------+
连接查询
scala> val dept=sc.parallelize(List((100,"caiwubu"),(200,"yanfabu"))).toDF("deptid","deptname")
dept: org.apache.spark.sql.DataFrame = [deptid: int, deptname: string]scala> val emp=sc.parallelize(List((1,100,"zhang"),(2,200,"li"),(3,300,"wang"))).toDF("id","did","name")
emp: org.apache.spark.sql.DataFrame = [id: int, did: int ... 1 more field]
scala> dept.join(emp,$"deptid"===$"did").show
+------+--------+---+---+-----+
|deptid|deptname| id|did| name|
+------+--------+---+---+-----+
| 100| caiwubu| 1|100|zhang|
| 200| yanfabu| 2|200| li|
+------+--------+---+---+-----+
运算
val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num");
df.select($"num" * 100).show
使用列表
val df = sc.makeRDD(List(("zhang",Array("bj" ,"sh").("li",Array("sz","gz")).toDF("name","addrs")
df.selectExpr("name" ,"addrs[o]").show
使用结构体
{"name":"陈晨" ,"address":{"city":"西安" ,"street":"南二环甲字1号"}}
{"name":"娜娜" ,"address":{"city":"西安" ,"street":"南二环甲字2号"}}
val df = sqlContext.read.json("file:///root/work/users.json")
dfs.select("name" ,"address.street").show
使用sql操作
//设置表对象对应的表名
df1.registerTempTable("tb1")
scala> sqc.sql("select *from tb1").show
+------+-------+----+-------+
| name|chinese|math|english|
+------+-------+----+-------+
| Adair| 88| 82| 64|
| Alex| 64| 63| 68|
| Bob| 90| 64| 92|
| Chad| 66| 74| 37|
| Colin| 64| 86| 74|
| Eden| 71| 85| 43|
| Grace| 57| 86| 24|
|Grover| 99| 86| 43|
| Henry| 39| 79| 78|
+------+-------+----+-------+
scala> sqc.sql("select *from tb1 where chinese>70").show
+------+-------+----+-------+
| name|chinese|math|english|
+------+-------+----+-------+
| Adair| 88| 82| 64|
| Bob| 90| 64| 92|
| Eden| 71| 85| 43|
|Grover| 99| 86| 43|
+------+-------+----+-------+
scala> sqc.sql("select *from tb1 where name='Bob'").show
+----+-------+----+-------+
|name|chinese|math|english|
+----+-------+----+-------+
| Bob| 90| 64| 92|
+----+-------+----+-------+
中文成绩降序排序
scala> sqc.sql("select *from tb1 order by chinese desc").show
+------+-------+----+-------+
| name|chinese|math|english|
+------+-------+----+-------+
|Grover| 99| 86| 43|
| Bob| 90| 64| 92|
| Adair| 88| 82| 64|
| Eden| 71| 85| 43|
| Chad| 66| 74| 37|
| Colin| 64| 86| 74|
| Alex| 64| 63| 68|
| Grace| 57| 86| 24|
| Henry| 39| 79| 78|
+------+-------+----+-------+
查询月最大值
scala> sqc.sql("select * from tb2 ").show
+-----+------+
|month|profit|
+-----+------+
| 2| 150|
| 1| 200|
| 2| 300|
| 3| 80|
| 1| 120|
| 3| 300|
+-----+------+scala> sqc.sql("select month,max(profit) from tb2 group by month").show
+-----+-----------+
|month|max(profit)|
+-----+-----------+
| 1| 200|
| 3| 300|
| 2| 300|
+-----+-----------+
查看表
sqc.sql("show tables").showscala> sqc.sql("show tables").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| tb1| true|
| tb2| true|
+---------+-----------+
eclipse操作
package cn.com.sqlimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext/** 对于Sparksql模块需要掌握的:* 1.会将一个RDD转变为一个DataFrame* 2.掌握DataFrame一些基本的查询,df.select() df.select().where()* 3.掌握通过sql方式查询一个DataFrame* * * */
object Driver {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("sql")val sc=new SparkContext(conf)val sqc=new SQLContext(sc)val data=sc.textFile("d://data/ml/logistic.txt")//RDD[String]- >RDD[(,.,)]val parseData=data.map { line =>val info=line.split(" ")val age=info(0).toIntval vision=info(1).toIntval driver=info(2).toIntval accident=info(3).toInt(age,vision,driver,accident)}//val df=sqc.createDataFrame(parseData).toDF("age","vision","driver","accident")//导入sqc对象的隐式转换包,可以隐式的将RDD转变为DataFrame,相当于省略createDataFrame方法import sqc.implicits._val df=parseData.toDF("age","vision","driver","accident")
// df.show()df.registerTempTable("tb2")val r1=sqc.sql("select * from tb2 where age>18")r1.show//如果出现了结果数过多的bug,可以通过coalesce()方法来调整分区数val resultRDD=r1.javaRDD.coalesce(1)//结果存存储resultRDD.saveAsTextFile("d://data/sql-result")}
}
=============================
SparkStreaming
Spark提供了SparkStreaming模块,用于实时流数据处理。即随着数据的实时到达,进行实时计算。目前实时计算框架:
1.Storm
2.SparkStreaming
3.Flink
从上图可以看到,SparkStreaming可以接受多种数据源的数据,然后经过处理产生的结果可以存储到HDFS,HBase,Mysql等
SparkStreaming的工作原理图
详细说明
Spark Streaming是将流式计算分解成一系列短小的批处理作业,也就是把Spark Streaming的输入数据按照batch size (如1秒)分成一段一段的数据DStream ( Discretized-离散化 Stream ),每一段数据都转换成Spark中的RDD ( Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformations操作变为针对Spark中对RDD的Transformations操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。
总结
1.SparkStreaming监听指定的数据源(比如HDFS,Kafka等),数据连续不断的传输给SparkStreaming
2.SparkStreaming将接收的数据源根据指定的批大小(batch size) ,离散(divide)化成一段一段的数据
补充:批大小是时间单位,比如几秒,几十秒等
3.这一段一段的数据,会被SparkStreaming封装成一个一个的DStream(离散化的数据流),每一个DStream底层就是一个RDD。
4.实时计算处理的就是一个一个的DStream,所以本质上就是处理一个个的RDD。
操作
scala> import org.apache.spark.streaming._
//5秒为一个批次
scala> val ssc=new StreamingContext(sc,Seconds(5))
scala> val streamData=ssc.textFileStream("file:///home/data")
scala> streamData.print
scala> ssc.start
拷贝一个文件观察控制台
[root@hadoop01 data]# cp ../1.txt ./
Time: 1664019995000 ms
-------------------------------------------
hello world
hello hadoop
hello spark
设置本地线程数量启动
案例(实统计分布式文件系统的文件----需要启动分布式文件系统)
SparkStreaming实现
WordCount的实时频次统计
[root@hadoop01 bin]# sh spark-shell --master=local[3]
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._scala> val ssc=new StreamingContext(sc,Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@6eed46e9scala> val streamData=ssc.textFileStream("hdfs://hadoop01:9000/data")
streamData: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@4a47bc9cscala> val result=streamData.flatMap{_.split(" ")}.map{(_,1)}.reduceByKey{_+_}
result: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@563bd6a4scala> result.printscala> ssc.start
[root@hadoop01 home]# cat 1.txt
hello world
hello hadoop
hello spark//上传文件
[root@hadoop01 home]# hadoop fs -put 1.txt /data
展示
Time: 1664070355000 ms
-------------------------------------------
(world,1)
(hadoop,1)
(hello,3)
(spark,1)
===========================
实现WordCount历史的频次累加统计
updateStateByKey{(seq, op:Option[Int]) => { Some(seq.sum +op.get0rElse(0))}}
代码
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._scala> val ssc=new StreamingContext(sc,Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@6ad6ae45scala> val streamData=ssc.textFileStream("hdfs://hadoop01:9000/data")
streamData: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@d949bc4scala> ssc.checkpoint("file:///home/check")scala> val r1=streamData.flatMap{_.split(" ")}.map{(_,1)}
r1: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@3f030217scala> val r2=r1.updateStateByKey{(seq,op:Option[Int])=>Some(seq.sum+op.getOrElse(0))}
r2: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.StateDStream@7c48ea9escala> r2.printscala> ssc.startscala> -------------------------------------------
Time: 1664071660000 ms
=================
每隔一段时间重新计算下一段时间的数据
SparkStreaming提供了滑动窗口机制
案例(窗口长度10s,批大小5s)
//删除Hadoop目录。再新增
[root@hadoop01 home]# hadoop fs -put 1.txt /data
[root@hadoop01 home]# hadoop fs -mkdir /data
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._scala> val ssc=new StreamingContext(sc,Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@6eed46e9scala> ssc.checkpoint("file:///home/windowcheck")scala> val streamData=ssc.textFileStream("hdfs://hadoop01:9000/data")
streamData: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@47ffdbbascala> val r1=streamData.flatMap{_.split(" ")}.map{(_,1)}
r1: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1cfd6f63scala> val r2=r1.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(10))
r2: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@7be9e9fcscala> r2.printscala> ssc.start
案例
eclipse操作(eclipse监听hdfs上传文件内容—不好使—光打印时间戳,内容监听不到)
原因:物理机器(eclipse所在的windows)的时间戳和虚拟机器(hdfs所在的Linux)的时间戳不一致导致
解决:更改Linux的时间
date -s"20210712 18:30:50"
代码
package cn.com.streamingimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Secondsobject Driver {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("streaming")val sc=new SparkContext(conf)//创建SparkStreaming的上下文件对象,用于监听数据源,并将数据源封装成Dtream操作//设置batch size 大小,并不是越小越好,尽量达到上一批次刚好处理完,下一个批次刚好达到val ssc=new StreamingContext(sc,Seconds(5))// val streamData=ssc.textFileStream("d://")val streamData=ssc.textFileStream("hdfs://hadoop01:9000/data")val result=streamData.flatMap{_.split(" ")}.map{(_,1)}.reduceByKey{_+_}result.print()ssc.start()//保持SparkStreaming一直开启,直到用户手动终止退出ssc.awaitTermination()}
}
案例
windows监听本地文件内容(复制粘贴文件方式监听不到-通过代码写一个流文件可以监听到)
代码
流文件
package cn.tedu.io.buffer;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;/**
*@author 作者:
*@version 创建时间:2020年10月29日下午8:58:49
*@description 描述:
*/
public class BufferWriterDemo1 {public static void main(String[] args) throws IOException {//BufferedWriter bw=new BufferedWriter(new FileWriter("D:\\b.txt"));//写出数据bw.write("abc aaa");//换行//不同操作系统下的换行符不一样//window-----\r\n linux----\nbw.newLine();bw.write("123 111");//关流---冲刷缓冲区bw.close();}
}
sparkstreaming监听代码
package cn.com.streamingimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Secondsobject Driver {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("streaming")val sc=new SparkContext(conf)//创建SparkStreaming的上下文件对象,用于监听数据源,并将数据源封装成Dtream操作//设置batch size 大小,并不是越小越好,尽量达到上一批次刚好处理完,下一个批次刚好达到val ssc=new StreamingContext(sc,Seconds(5))val streamData=ssc.textFileStream("d://")
// val streamData=ssc.textFileStream("hdfs://hadoop01:9000/data")val result=streamData.flatMap{_.split(" ")}.map{(_,1)}.reduceByKey{_+_}result.print()ssc.start()//保持SparkStreaming一直开启,直到用户手动终止退出ssc.awaitTermination()}
}
效果展示(成功监听)
22/09/25 22:19:10 INFO JobScheduler: Finished job streaming job 1664115550000 ms.0 from job set of time 1664115550000 ms
22/09/25 22:19:10 INFO JobScheduler: Total delay: 0.244 s for time 1664115550000
ms (execution: 0.155 s)
-------------------------------------------
Time: 1664115550000 ms
-------------------------------------------
(123,1)
(abc,1)
(111,1)
(aaa,1)
spark(day06-spark算法、Spark Sql)相关推荐
- 学习笔记Spark(六)—— Spark SQL应用(1)—— Spark SQL简介、环境配置
一.Spark SQL简介 1.1.Spark SQL特性 Spark SQL是Spark Core之上的一个组件,它引入了一个称为SchemaRDD的新- 数据抽象,它为结构化和半结构化数据提供支持 ...
- 基于Spark的Als算法+自迭代+Spark2.0新写法
主要介绍了一下几点: 1矩阵分解的几种算法 2spark使用矩阵分解的几种方式,1ml 包中使用,2mllib包中的使用,其实有不调用包自己写的案列(可以去看看哈,就在example目录) 3使 ...
- Spark四大组件包括Spark Streaming、Spark SQL、Spark MLlib和Spark GraphX。
Spark四大组件包括Spark Streaming.Spark SQL.Spark MLlib和Spark GraphX.它们的主要应用场景是: Spark Streaming: Spark Str ...
- Spark MLlib机器学习 | 算法综合实战(一)(史上最详细)
========== ========= 8.1.1 什么是机器学习 机器学习可以看做是一门人工智能的科学,该领域的主要研究对象是人工智能.机器学习利用 ...
- spark day06 + day07 + day08
目录 1.spark SQL 基本信息 1.什么是sparksql 2.strucrured data 3.sparksql特征 4.概述 1.sparksql性能比spark rdd高 2.Spar ...
- Spark MLlib回归算法------线性回归、逻辑回归、SVM和ALS
Spark MLlib回归算法------线性回归.逻辑回归.SVM和ALS 1.线性回归: (1)模型的建立: 回归正则化方法(Lasso,Ridge和ElasticNet)在高维和数据集变量之间多 ...
- Spark ML - 聚类算法
http://ihoge.cn/2018/ML2.html Spark ML - 聚类算法 1.KMeans快速聚类 首先到UR需要的包: import org.apache.spark.ml.clu ...
- 摄像头大数据分析跟踪均值漂移算法-spark和python
非结构化数据的大数据处理 数据有文字,图片,音频,视频,这些都属于非结构化数据,计算机不能直接识别,摄像头信息需要进行预处理,解压,解码,去重,合并,提取,清洗,分词nlp,将图片,音频,视频等媒体信 ...
- spark 实现K-means算法
spark 实现K-means算法 package kmeans; import java.io.BufferedReader; import java.io.File; import java.io ...
- spark python_Python、流、SQL 有更新!耗时两年,Spark 3.0 重磅发布!
2020 年 6 月 19 日,经过近两年的开发之后,Apache Spark TM 3.0.0 版本终于面世了.据官方介绍,此次 Spark 3.0.0 版本更新了 3,400 多个补丁程序,将使 ...
最新文章
- 单片机学习从入门到入土?这3个关键点导致!
- 对付审稿人“强迫引用”,新方法来了,Nature都说好
- springboot Field userInfoInter in com.**.** required a bean of type ‘***.**‘
- Python+Selenium操作select下拉框
- 纽瓦克市政厅电脑遭勒索软件劫持,部分公共服务被迫瘫痪
- mysql全局变量 error_记录——node-mysql连接池遇到的全局变量问题
- React开发(128):ant design学习指南之input中addonBefore
- 基于hadoop架构的企业数字化转型,阿里数据中台实战案例
- SharePoint Designer 2010中的外部内容类型-SQL Server
- 内存占用_一文教你节省 90% 的内存占用
- 深入理解操作系统原理之操作系统概述
- 哈理工OJ 1151 追求(斐波那契变形【思维题目】)
- mysql sniffer 安装_ubunt 安装mysql-sniffer
- 计算机 桌面 休眠,电脑自动休眠在哪设置的_教你让电脑自动休眠状态的方法-系统城...
- ftp服务器软件,推荐几款免费的ftp服务器软件,ftp客户端下载
- 洛谷---三角形的分类C语言详解
- Centos上卸载阿里云盾
- 老照片修复清晰?父母以前的老照片还能修复吗?
- 玛格丽特·米德2019下半年EI会议与人格理论初探
- 计算机网络思科平台第五章测验答案