贵州烟草大数据之一:零售户聚类
1 思路
考虑零售户的总销售量,总库存量两个特征,用这两个特征对零售户进行聚类,找出哪些零售户供不应求、哪些零售户供过于求。合理安排配货。
2 数据存准备
在hive中创建两张表,之前已经导入了销售数据,现在创建库存数据
hive -e "create table hhstore_data(LICENSE_CODE BIGINT,ITEM_CODE INT,QTY_ORD DECIMAL(8,6),DATE1 INT,
TIME1 varchar(10),COM_NAME VARCHAR(10))row format delimited
fields terminated by ',';"
导入库存数据
hive -e "load data local inpath '/usr/local/tobacco_data/红华库存数据002.csv
' overwrite into table hhstore_data;"
这里顺便测试了一下hive的查询速度:
查出每个零售户的总库存和总销量
select COM_NAME, sum(a.QTY_ORD) store_qty, sum(b.QTY_ORD) sale_qty from hhstore_data a left join hhsale_data b on a.COM_NAME=b.COM_NAME group by a.COM_NAME limit 10;
好啊!这家伙竟然花了7105s 近两个小时!无法忍受
使用hive on spark试试:
package class6import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}/*** Created by root on 16-1-4.* Spark+Hive整合我们知道,在使用Hive进行查询的时候,到底层MapReduce计算层会将HiveQL翻译成MapReduce程序,
在Hadoop平台上执行计算,这使得计算的延迟比较大。我们整合Spark和Hive,就是通过Spark平台来计算Hive查询,
也就是Hive不再使用它默认的MapReduce计算引擎,Spark会直接读取Hive的元数据存储,
将Hive数据转换成Spark RDD数据,通过Spark提供的计算操作来实现(Transformation和Action)。实验结果:下面那个查询hive用了7105s而hiveonspark只用了178s快了39.9倍*/
object HiveOnSpark {case class Record(key:Int,value:String)def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("HiveOnSpark")val sc = new SparkContext(sparkConf)val HiveContext = new HiveContext(sc)import HiveContext._
// sql("use hive")
// sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a " +
// "join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on " +
// "a.dateid=c.dateid group by c.theyear order by c.theyear")
// .collect().foreach(println)sql("select a.COM_NAME, sum(a.QTY_ORD) store_qty, sum(b.QTY_ORD) sale_qty from hhstore_data a left join hhsale_data b " +"on a.COM_NAME=b.COM_NAME group by a.COM_NAME limit 10").collect().foreach(println)sc.stop()}}
嗯、花了178s,毕竟查询逻辑在那。快了近40倍。
3进行kmeans聚类
package class6import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}/*** Created by root on 16-1-18.*/
object tobacco_in_out_km {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("tobacco_in_out_km")val sc = new SparkContext(sparkConf)val HiveContext = new HiveContext(sc)import HiveContext._val sqldata = sql("select a.COM_NAME, sum(a.QTY_ORD) store_qty, sum(b.QTY_ORD) sale_qty from hhstore_data a left join hhsale_data b " +"on a.COM_NAME=b.COM_NAME group by a.COM_NAME")//将查询数据转换成向量val parsedData = sqldata.map{case Row(_,store_qty,sale_qty)=>val features = Array[Double](store_qty.toString.toDouble,sale_qty.toString.toDouble)Vectors.dense(features)}.cache()//注意使用cache提高效率。//对数据集聚类,3 个类,20 次迭代,形成数据模型//注意这里会使用设置的 partition 数 20val numClusters = 3val numIterations = 20val model = KMeans.train(parsedData,numClusters,numIterations)//打印数据模型的中心点println("---------------------------------------------------------------" +"Cluster centers:" +"---------------------------------------------------------------------")for(c <-model.clusterCenters){println(" "+c.toString)}//使用误差平方之和来评估数据模型,--------------------------------------模型在训练集上计算损失val cost=model.computeCost(parsedData)println("--------------------------------------------------------------------" +"Within Set Sum of Squared Errors=-----------------------------------------"+cost)用模型对读入的数据进行分类,并输出//由于 partition 没设置,输出为 200 个小文件,可以使用 bin/hdfs dfs -getmerge 合并//下载到本地val result2 = sqldata.map{case Row(com_name,store_qty,sale_qty)=>val features =Array[Double](store_qty.toString.toDouble,sale_qty.toString.toDouble)val linevectore = Vectors.dense(features)val prediction = model.predict(linevectore)com_name+" "+store_qty+" "+sale_qty+" "+prediction}.saveAsTextFile(args(0))}}
4提交任务
启动hive服务
$nohup hive --service metastore > metastore.log 2>&1 &
spark-submit --master spark://moon:7077 --class class6.tobacco_in_out_km --executor-memory 2g --total-executor-cores 4 tobacco_in_out.jar /class6/tobacco_in_out_km1spark-submit --master spark://moon:7077 --class class6.tobacco_in_out_km --executor-memory 2g --total-executor-cores 4 tobacco_in_out.jar /class6/tobacco_in_out_km_chach > /usr/local/spark/tobacco_km.log
程序在一台笔记本上跑了7个多小时。
对数据进行cache后只用了1个小时!
输出的评估内容:
1 ---------------------------------------------------------------Cluster centers:------------------------------------------------------------- -------- 2 [1.1661534191111112E9,1.9860203797777778E8]3 [2.0781018788888888E9,3.346425471111111E8]4 [5.471160729130435E8,9.399341230434783E7]5 --------------------------------------------------------------------Within Set Sum of Squared Errors=--------------------------------------- --7.5440041898105969E18
5获取结果
hdfs dfs -getmerge /class6/tobacco_in_out_km1 /usr/local/spark/tobacco_in_out_km_result.txt
聚类结果如下(部分,嘿嘿):
[094]兴关店 559219155.000000 51497618 0
[012]贵钢店 1081772705.000000 206043280 0
[056]观水店 1178053590.000000 122014678 1
[043]云阳店 1028635992.000000 168659280 0
[027]湘雅店 1744723578.000000 398014317 1
[077]凤凰翠堤 1045614528.000000 162392859 0
[054]O六一店 1123851861.000000 192251440 0
[073]和平店 1031278143.000000 157066644 0
6优化部分
1 去除脏数据
数据的第一条是字段名,值为null,不去掉的话会影响结果
val sqldata = sql("select a.COM_NAME, sum(a.QTY_ORD) store_qty, sum(b.QTY_ORD) sale_qty from (select * from hhstore_data where item_code is not null)a left join " +"hhsale_data b on a.COM_NAME=b.COM_NAME group by a.COM_NAME")
2 聚类数K选取
用程序选择K
computeCost 方法,该方法通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果。一般来说,
同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好。
但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使 computeCost 结果值最小的那个 K
package class6import org.apache.spark.mllib.clustering.{KMeansModel, KMeans}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}/*** Created by root on 16-1-18.*/
object tobacco_in_out_km {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("tobacco_in_out_km")val sc = new SparkContext(sparkConf)val HiveContext = new HiveContext(sc)import HiveContext._val sqldata = sql("select a.COM_NAME, sum(a.QTY_ORD) store_qty, sum(b.QTY_ORD) sale_qty from (select * from hhstore_data where item_code is not null)a left join " +"hhsale_data b on a.COM_NAME=b.COM_NAME group by a.COM_NAME")//将查询数据转换成向量val parsedData = sqldata.map{case Row(_,store_qty,sale_qty)=>val features = Array[Double](store_qty.toString.toDouble,sale_qty.toString.toDouble)Vectors.dense(features)}.cache()val ks:Array[Int] = Array(3,4,5,6,7,8,9)ks.foreach(cluster => {val model:KMeansModel = KMeans.train(parsedData, cluster,20,1)val ssd = model.computeCost(parsedData)println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)})//需要重复使用的模型可以保存下来的。。。。// //对数据集聚类,3 个类,20 次迭代,形成数据模型
// //注意这里会使用设置的 partition 数 20
// val numClusters = 3
// val numIterations = 20
// val model = KMeans.train(parsedData,numClusters,numIterations)
// //打印数据模型的中心点
// println("---------------------------------------------------------------" +
// "Cluster centers:" +
// "---------------------------------------------------------------------")
// for(c <-model.clusterCenters){// println(" "+c.toString)
// }
//
// //使用误差平方之和来评估数据模型,--------------------------------------模型在训练集上计算损失
//
// val cost=model.computeCost(parsedData)
// println("--------------------------------------------------------------------" +
// "Within Set Sum of Squared Errors=-----------------------------------------"+cost)
// 用模型对读入的数据进行分类,并输出
// //由于 partition 没设置,输出为 200 个小文件,可以使用 bin/hdfs dfs -getmerge 合并
// //下载到本地
// val result2 = sqldata.map{// case Row(com_name,store_qty,sale_qty)=>
// val features =Array[Double](store_qty.toString.toDouble,
// sale_qty.toString.toDouble)
// val linevectore = Vectors.dense(features)
// val prediction = model.predict(linevectore)
// com_name+" "+store_qty+" "+sale_qty+" "+prediction
// }.saveAsTextFile(args(0))}}
3 使用5G内存运行程序
hadoop@moon:/usr/local/spark/idea/LearnSpark/out/artifacts/tobacco_in_out$ spark-submit --master spark://moon:7077 --class class6.tobacco_in_out_km --executor-memory 5g --total-executor-cores 4 tobacco_in_out.jar >/usr/local/spark/tobacco_in_out_km_result3456789.txt
运行完毕用时26分钟,注意这里其实运行了7次KMeans算法
4 输出评估不同k的结果
1 sum of squared distances of points to their nearest center when k=3 -> 6.4890327862785249E18 2 sum of squared distances of points to their nearest center when k=4 -> 3.209487674100267E183 sum of squared distances of points to their nearest center when k=5 -> 2.1814111396728471E184 sum of squared distances of points to their nearest center when k=6 -> 1.30515483214681062E185 sum of squared distances of points to their nearest center when k=7 -> 1.18605067864590848E186 sum of squared distances of points to their nearest center when k=8 -> 7.1604954233549261E177 sum of squared distances of points to their nearest center when k=9 -> 6.0889529116193229E17
看来K=4是最合理的
package class6import org.apache.spark.mllib.clustering.{KMeansModel, KMeans}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}/*** Created by root on 16-1-18.*/
object tobacco_in_out_km {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("tobacco_in_out_km")val sc = new SparkContext(sparkConf)val HiveContext = new HiveContext(sc)import HiveContext._val sqldata = sql("select a.COM_NAME, sum(a.QTY_ORD) store_qty, sum(b.QTY_ORD) sale_qty from (select * from hhstore_data where item_code is not null)a left join " +"hhsale_data b on a.COM_NAME=b.COM_NAME group by a.COM_NAME")//将查询数据转换成向量val parsedData = sqldata.map{case Row(_,store_qty,sale_qty)=>val features = Array[Double](store_qty.toString.toDouble,sale_qty.toString.toDouble)Vectors.dense(features)}.cache()
// val ks:Array[Int] = Array(3,4,5,6,7,8,9)
// ks.foreach(cluster => {// val model:KMeansModel = KMeans.train(parsedData, cluster,20,1)
// val ssd = model.computeCost(parsedData)
// println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
// })//需要重复使用的模型可以保存下来的。。。。//对数据集聚类,3 个类,20 次迭代,形成数据模型//注意这里会使用设置的 partition 数 20val numClusters = 4val numIterations = 20val model = KMeans.train(parsedData,numClusters,numIterations)//打印数据模型的中心点println("---------------------------------------------------------------" +"Cluster centers:" +"---------------------------------------------------------------------")for(c <-model.clusterCenters){println(" "+c.toString)}//使用误差平方之和来评估数据模型,--------------------------------------模型在训练集上计算损失val cost=model.computeCost(parsedData)println("--------------------------------------------------------------------" +"Within Set Sum of Squared Errors=-----------------------------------------"+cost)用模型对读入的数据进行分类,并输出//由于 partition 没设置,输出为 200 个小文件,可以使用 bin/hdfs dfs -getmerge 合并//下载到本地val result2 = sqldata.map{case Row(com_name,store_qty,sale_qty)=>val features =Array[Double](store_qty.toString.toDouble,sale_qty.toString.toDouble)val linevectore = Vectors.dense(features)val prediction = model.predict(linevectore)com_name+" "+store_qty+" "+sale_qty+" "+prediction}.saveAsTextFile(args(0))}}
提交任务
hadoop@moon:/usr/local/spark/idea/LearnSpark/out/artifacts/tobacco_in_out$ spark-submit --master spark://moon:7077 --class class6.tobacco_in_out_km --executor-memory 5g --total-executor-cores 4 tobacco_in_out.jar /class6/tobacco_in_out_4
--------------------------------------------------------------------Within Set Sum of Squared Errors=-----------------------------------------3.2094876741002665E18
获取结果
hdfs dfs -getmerge /class6/tobacco_in_out_4 /usr/local/spark/tobacco_in_out_4_result.txt
1 [094]兴关店 559219155.000000 51497618 0 2 [012]贵钢店 1081772705.000000 206043280 23 [056]观水店 1178053590.000000 122014678 24 [043]云阳店 1028635992.000000 168659280 25 [027]湘雅店 1744723578.000000 398014317 16 [077]凤凰翠堤 1045614528.000000 162392859 27 [054]O六一店 1123851861.000000 192251440 2
.....
7修改
销售量跟库存量没有直接关系,销售量会小于进货量。
把库存为负的数据过滤掉
package class6import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}/*** Created by root on 16-1-22.* 零售户按年库存量、销售量进行聚类* 两表join出现了数据重叠,考虑每次读一张表,利用RDD的join方法* 得到特征矩阵。。。*--------------------------------------------------------------------Within Set Sum of Squared Errors=-----------------------------------------2.6105260195375473E10**/
object tobacco_kmeans {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("tobacco_kmeans")//.setMaster("local[4]")val sc = new SparkContext(sparkConf)val HiveContext = new HiveContext(sc)import HiveContext._/*销售数据*/val saledata = sql("select com_name ,sum(qty_ord) sale_qty from hhsale_data where puh_time is " +"not null group by com_name")/*库存数据*/val storedata = sql("select com_name ,sum(qty_ord) store_qty from hhstore_data where item_code is not " +"null and qty_ord >0 group by com_name")val data=saledata.join(storedata,"com_name")val parsedData = data.map{case Row(_, sale_qty, store_qty) =>val features = Array[Double](sale_qty.toString.toDouble,store_qty.toString.toDouble)Vectors.dense(features)}.cache()//.saveAsTextFile("/class6/data")/*标准化*/
// val summary = Statistics.colStats(parsedData)
// parsedData.map{
// case Row(_,sale,store) =>
//
// }// val dataAsArray = parsedData.map(_.toArray)
//val numCols = dataAsArray.first().length
//val n = dataAsArray.count()
//val sums = dataAsArray.reduce(
//(a,b) => a.zip(b).map(t => t._1 + t._2))
//val sumSquares = dataAsArray.fold(
//new Array[Double](numCols)
//)(
//(a,b) => a.zip(b).map(t => t._1 + t._2 * t._2)
//)
//val stdevs = sumSquares.zip(sums).map {
//case(sumSq,sum) => math.sqrt(n*sumSq - sum*sum)/n
//}
//val means = sums.map(_ / n)
// def normalize(datum: Vector) = {
//val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
//(value, mean, stdev) =>
//if (stdev <= 0) (value - mean) else (value - mean) / stdev
//)
//Vectors.dense(normalizedArray)
//}val numClusters = 3val numIterations = 20val model = KMeans.train(parsedData,numClusters,numIterations)//打印数据模型的中心点println("---------------------------------------------------------------" +"Cluster centers:" +"---------------------------------------------------------------------")for(c <-model.clusterCenters){println(" "+c.toString)}//使用误差平方之和来评估数据模型,--------------------------------------模型在训练集上计算损失val cost=model.computeCost(parsedData)println("--------------------------------------------------------------------" +"Within Set Sum of Squared Errors=-----------------------------------------"+cost)用模型对读入的数据进行分类,并输出//由于 partition 没设置,输出为 200 个小文件,可以使用 bin/hdfs dfs -getmerge 合并//下载到本地val result = data.map{case Row(com_name, sale_qty, store_qty) =>val features = Array[Double](sale_qty.toString.toDouble,store_qty.toString.toDouble)val linevectore = Vectors.dense(features)val prediction = model.predict(linevectore)com_name+" "+sale_qty+" "+store_qty+" "+prediction+"\n"}.saveAsTextFile(args(0))// val result2 = sqldata.map{
// case Row(com_name,store_qty,sale_qty)=>
// val features =Array[Double](store_qty.toString.toDouble,
// sale_qty.toString.toDouble)
// val linevectore = Vectors.dense(features)
// val prediction = model.predict(linevectore)
// com_name+" "+store_qty+" "+sale_qty+" "+prediction
// }.saveAsTextFile(args(0))System.out.println("-----------------------------")sc.stop()}}
提交
hadoop@moon:/usr/local/spark/idea/LearnSpark/out/artifacts/tobacco_kmeans$ spark-submit --master spark://moon:7077 --class class6.tobacco_kmeans --executor-memory 3g --total-executor-cores 4 tobacco_kmeans.jar /class6/tobacco_kmeans_new
-------------------------------------------------------------------Within Set Sum of Squared Errors=-----------------------------------------2.9486673754847862E10
获取结果
hdfs dfs -getmerge /class6/tobacco_kmeans_new /usr/local/spark/tobacco_kmeans_new.txt
Spark 实战,第 4 部分: 使用 Spark MLlib 做 K-means 聚类分析
Spark 实战,第 5 部分: 使用 ML Pipeline 构建机器学习工作流
贵州烟草大数据之一:零售户聚类相关推荐
- 贵州发布大数据发展十方面成果
5月26日,2017数博会主题发布会在数博会新闻发布厅举行,贵州省发展研究中心在会上发布<贵州省大数据发展报告(2016)>白皮书(以下简称"白皮书"),总结了贵州大数 ...
- 加快打造“云上贵州”,靠大数据实现“后发赶超”
"把大数据作为全省弯道取直.后发赶超的战略引擎,充分挖掘利用其商业价值.管理价值.社会价值,以大数据提升政府治理能力,引领产业转型升级,服务广大社会民生,带动大众创业.万众创新." ...
- 中国大数据金融中心崛起:贵州落子大数据
中国大数据金融中心崛起 如今,已然抢跑两年多的"大数据",成为贵州金融发展的基石,成为撬动大西南经济发展的杠杆. 对贵州而言,贵州落子大数据,大数据引导金融发展,大数据建立信用体系 ...
- 贵州移动大数据掘金有道
"大数据将是下一个社会发展阶段的石油和金矿",在大数据的发展中,贵州移动正迎着大势迈着大步朝前走.2015年12月15日,贵州移动来到国际金融中心上海隆重推介中国移动(贵州)大数据 ...
- “云上贵州”建立大数据灾备中心
12月1日,贵州省大数据发展管理局将云上贵州大数据灾备服务中心授牌给中国电信贵州公司. 今年8月,由中国电信承建的云上贵州系统平台贵安节点提前开通,为促进我省政府数据的聚集.融通.应用,打赢省委省政府 ...
- 贵州开采大数据“钻石矿”
14个政府开放数据平台,9000多个数据集,1600多个数据接口,涉及旅游住宿.交通服务.医疗健康.文体娱乐等多个领域.3月1日,2017中国国际大数据挖掘大赛启动.贵州开放部分政府数据,向全球发出项 ...
- 贵州:大数据“国家靶场”是如何炼成的?
"也许发达地区并不会有这种感觉.但是对于贵阳.对于贵州而言,这种感觉特别强烈.大数据时代的到来,让贵州与发达地区真正站到了同一起跑线上." 贵州先行试点大数据战略之后,就一直在探索 ...
- 大数据在零售供应链管理方面的应用
大数据在零售供应链管理方面的应用,零售商可通过多种方式使用大量信息来改善其零售供应链,分析解决方案将供应商的实际绩效与其关键绩效指标进行比较,帮助供应商在按时交货.提升客户满意度等. 随着客户转向在线 ...
- 贵州:大数据先行者地位进一步巩固
7月25日-26日,工信部在山东威海组织召开全国信息化和软件服务业工作座谈会.会上,贵州省经济和信息化委员会总经济师姚轶汇报了贵州省信息化和软件服务业发展情况.姚轶指出,近年来,贵州省把大数据作为经济 ...
最新文章
- my batis的理解
- 针对十类数据从业人员,最好的工具推荐
- Java Thread类的静态void sleep(long time_in_ms)方法,带示例
- vlan划分不能上网_VLAN工作原理
- mcrypt拓展_【PHP】Mcrypt 扩展模块安装及使用
- photoshop改变图片大小,不改变像素
- 解决 screen 连接不上,提示“There is no screen to be resumed matching 18352.” 的问题
- Google 投资 Lyft 背后、AlphaGo Zero 人工智能威胁论?
- NPDP产品经理认证:精益画布使用六步法
- 两化融合管理体系评定申请表概况
- 写出调试c语言程序的基本操作步骤,C语言程序设计基本步骤
- 谷歌网页存储为pdf或图片
- vs2013 CodeLens
- Android开发之高德地图导航
- java算法——通过身份证号获取出生的年月日
- 一本书一句话:真北敏捷的微信读书2021
- 十七、面相对象的进阶
- Git项目管理修改项目名称
- PM创造营——从粗放化管理转型到精细化管理,PMO如何将流程正规化落地企业?
- 论文学习笔记《SWA Object Detection》