Spark机器学习管道 - Estimator
Spark机器学习管道 - Estimator
- 一、实验目的
- 二、实验内容
- 三、实验原理
- 四、实验环境
- 五、实验步骤
- 5.1 启动Spark集群和Zeppelin服务器。
- 5.2 使用IDF estimator,计算每个单词的重要性。
- 5.3 使用StringIndexer estimator来对电影类型进行编码。
- 5.4 使用OneHotEncoderEstimator estimator将分类值的索引编码为二元向量。
- 5.5 使用MinMaxScaler estimator对数值数据进行规范化。
- 5.6 使用MinMaxScaler estimator对数值数据进行标准化。
- 结束语
未经许可,禁止以任何形式转载,若要引用,请标注链接地址
全文共计6161字,阅读大概需要3分钟
一、实验目的
掌握Spark机器学习管道中常用Estimator的使用。
二、实验内容
1、使用IDF estimator,计算每个单词的重要性。
2、使用StringIndexer estimator来对电影类型进行编码。
3、使用OneHotEncoderEstimator estimator将分类值的索引编码为二元向量。
4、使用MinMaxScaler estimator对数值数据进行规范化。
5、使用MinMaxScaler estimator对数值数据进行标准化。
三、实验原理
一个Estimator代表了一种机器学习算法,用来在训练数据集上训练或拟合机器学习模型。它实现了一个名为fit的方法,该方法接受一个DataFrame作为参数并返回一个机器学习模型。
Estimator所代表的算法可分为两类,一类是用于机器学习的算法,另一类是用于进行数据转换的算法。
从技术的角度来看,一个estimator有一个名为fit的函数,它在输入列上应用一个算法,结果被封装在一个叫做Model的对象类型中,它是一个Transformer类型。输入列和输出列名称可以在estimator的构造过程中指定。
下图描述了一个estimator及其输入和输出。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
五、实验步骤
5.1 启动Spark集群和Zeppelin服务器。
在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:
1. $ cd /opt/spark
2. $ ./sbin/start-all.sh
3. $ zeppelin-daemon.sh start
然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。
2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:
5.2 使用IDF estimator,计算每个单词的重要性。
IDF estimator是用于处理文本的常用的estimators之一。它的名字是inverse document frequency(反转文档频率)的缩写。这个estimator经常在文本被分词和术语频率被计算之后立即使用。这个estimator背后的思想是通过计算它出现的文档数量来计算每个单词的重要性或权重。
在zeppelin中输入以下代码:
1. // 使用IDF estimator来计算每个单词的权重
2. import org.apache.spark.ml.feature.Tokenizer
3. import org.apache.spark.ml.feature.HashingTF
4. import org.apache.spark.ml.feature.IDF
5.
6. // 构造一个DataFrame,代表一个文档
7. val text_data = spark.createDataFrame(Seq(
8. (1, "Spark is a unified data analytics engine"),
9. (2, "Spark is cool and it is fun to work with Spark"),
10. (3, "There is a lot of exciting sessions at upcoming Spark summit"),
11. (4, "mllib transformer estimator evaluator and pipelines") )
12. ).toDF("id", "line")
13.
14. // 分析转换器
15. val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words")
16. val tkResult = tokenizer.transform(text_data)
17.
18. // HashingTF转换器
19. val tf = new HashingTF().setInputCol("words").setOutputCol("wordFreqVect").setNumFeatures(4096)
20. val tfResult = tf.transform(tkResult ) // Tokenizer transformer的输出列是HashingTF的输入
21.
22. // IDF estimator
23. // HashingTF转换器的输出是IDF estimator的输入
24. val idf = new IDF().setInputCol("wordFreqVect").setOutputCol("features")
25.
26. // 因为IDF是一个estimator,所以调用fit函数, 得到一个学习过的模型
27. val idfModel = idf.fit(tfResult)
28.
29. // 返回对象是一个模型(Model), 它是Transformer类型
30. val weightedWords = idfModel.transform(tfResult)
31. // weightedWords.select("label", "features").show(false)
32. weightedWords.select("features").show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.3 使用StringIndexer estimator来对电影类型进行编码。
StringIndexer estimator是一个知道什么时候处理包含分类值的文本数据的estimator。它将一个分类值编码成一个基于其频率的索引,这样最频繁的分类值就会得到0的索引值,以此类推。
在zeppelin中输入以下代码:
1. // 使用StringIndexer estimator来对电影类型进行编码
2. import org.apache.spark.ml.feature.StringIndexer
3.
4. // 构造一个DataFrame
5. val movie_data = spark.createDataFrame(
6. Seq((1, "Comedy"),
7. (2, "Action"),
8. (3, "Comedy"),
9. (4, "Horror"),
10. (5, "Action"),
11. (6, "Comedy"))
12. ).toDF("id", "genre")
13.
14. // StringIndexer estimator
15. val movieIndexer = new StringIndexer().setInputCol("genre").setOutputCol("genreIdx")
16.
17. // 首先拟合数据
18. val movieIndexModel = movieIndexer.fit(movie_data)
19.
20. // 使用返回的transformer来转换该数据
21. val indexedMovie = movieIndexModel.transform(movie_data)
22.
23. // 查看结果
24. indexedMovie.orderBy("genreIdx").show()
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.4 使用OneHotEncoderEstimator estimator将分类值的索引编码为二元向量。
OneHotEncoderEstimator estimator是另一种有用的分类值的estimator,它将分类值的索引编码为二元向量。这个estimator经常与StringIndexer estimator一起使用,其中StringIndexer的输出成为这个estimator的输入。
在zeppelin中输入以下代码:
1. // OneHotEncoderEstimator estimator消费StringIndexer estimator的输出
2. import org.apache.spark.ml.feature.OneHotEncoderEstimator
3.
4. // 输入列genreIdx是之前示例中StringIndex的输出列
5. val oneHotEncoderEst = new OneHotEncoderEstimator().setInputCols(Array("genreIdx"))
6. .setOutputCols(Array("genreIdxVector"))
7.
8. // 指使indexedMovie数据(在上一个示例中产生的)
9. val oneHotEncoderModel = oneHotEncoderEst.fit(indexedMovie)
10. val oneHotEncoderVect = oneHotEncoderModel.transform(indexedMovie)
11.
12. // 显示
13. oneHotEncoderVect .orderBy("genre").show()
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.5 使用MinMaxScaler estimator对数值数据进行规范化。
规范化数值数据是将其原始范围映射到从0到1的范围的过程。当observations有多个不同范围的属性时,这一点特别有用。
在zeppelin中输入以下代码:
1. // 使用MinMaxScaler estimator来重新调节特征
2. import org.apache.spark.ml.feature.MinMaxScaler
3. import org.apache.spark.ml.linalg.Vectors
4.
5. // 构造DataFrame
6. val employee_data = spark.createDataFrame(
7. Seq((1, Vectors.dense(125400, 5.3)),
8. (2, Vectors.dense(179100, 6.9)),
9. (3, Vectors.dense(154770, 5.2)),
10. (4, Vectors.dense(199650, 4.11)))
11. ).toDF("empId", "features")
12.
13. // MinMaxScaler estimator
14. val minMaxScaler = new MinMaxScaler().setMin(0.0)
15. .setMax(5.0)
16. .setInputCol("features")
17. .setOutputCol("scaledFeatures")
18.
19. // 拟合数据,建立模型
20. val scalerModel = minMaxScaler.fit(employee_data)
21.
22. // 使用学习到的模型对数据集进行转换
23. val scaledData = scalerModel.transform(employee_data)
24.
25. // 输出特征缩放到的范围
26. println(s"特征缩放到的范围: [${minMaxScaler.getMin},${minMaxScaler.getMax}]")
27.
28. // 显示结果
29. scaledData.select("features", "scaledFeatures").show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.6 使用MinMaxScaler estimator对数值数据进行标准化。
除了数值数据规范化之外,另一个经常用于处理数值数据的操作称为标准化(standardization.)。当数值数据有一个接近与钟形曲线关闭的分布,这个操作尤其适用。标准化操作可以帮助将数据转换为标准化形式,其中数据的范围为-1和1,平均值为0。
在zeppelin中输入以下代码:
1. // 使用StandardScaler estimator标准化围绕均值0的特征
2. import org.apache.spark.ml.feature.StandardScaler
3. import org.apache.spark.ml.linalg.Vectors
4.
5. // 构造DataFrame
6. val employee_data = spark.createDataFrame(Seq(
7. (1, Vectors.dense(125400, 5.3)),
8. (2, Vectors.dense(179100, 6.9)),
9. (3, Vectors.dense(154770, 5.2)),
10. (4, Vectors.dense(199650, 4.11)))
11. ).toDF("empId", "features")
12.
13. // 将单位标准偏差设置为true并围绕平均值
14. val standardScaler = new StandardScaler().setWithStd(true)
15. .setWithMean(true)
16. .setInputCol("features")
17. .setOutputCol("scaledFeatures")
18.
19. // 拟合数据,建立模型
20. val standardMode = standardScaler.fit(employee_data)
21.
22. // 使用学习到的模型对数据集进行转换
23. val standardData = standardMode.transform(employee_data)
24.
25. // 显示结果
26. standardData.show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
结束语
一个Estimator代表了一种机器学习算法,用来在训练数据集上训练或拟合机器学习模型。它实现了一个名为fit的方法,该方法接受一个DataFrame作为参数并返回一个机器学习模型。
Estimator所代表的算法可分为两类,一类是用于机器学习的算法,另一类是用于进行数据转换的算法。例如,称为LinearRegression的ML算法就属于第一种类型,它的fit方法返回一个LinearRegressionModel类的实例。它用于诸如预测房价等回归任务。而StringIndexer就属于第二种类型,它用来将一列的分类值编码成索引,这样每个分类值的索引值都是基于它出现在DataFrame的整个输入列中的频率。
从技术的角度来看,一个estimator有一个名为fit的函数,它在输入列上应用一个算法,结果被封装在一个叫做Model的对象类型中,它是一个Transformer类型。输入列和输出列名称可以在estimator的构造过程中指定。
Spark机器学习管道 - Estimator相关推荐
- Spark机器学习管道 - Pipeline
Spark机器学习管道 - Pipeline 一.实验目的 二.实验内容 三.实验原理 四.实验环境 五.实验步骤 5.1 启动Spark集群和Zeppelin服务器. 5.2 使用管道创建一个小型工 ...
- Spark机器学习管道--中文翻译
在这一节里,我们将介绍ML Pipelines的概念,ML Pipelines提供了一个构建于DataFrames之上的统一的 统一的高级API的集合,用来帮助用户创建与调优实际的机器学习管道. 内容 ...
- 手把手带你玩转Spark机器学习-使用Spark构建回归模型
系列文章目录 手把手带你玩转Spark机器学习-专栏介绍 手把手带你玩转Spark机器学习-问题汇总 手把手带你玩转Spark机器学习-Spark的安装及使用 手把手带你玩转Spark机器学习-使用S ...
- logistic回归预测_使用Apache Spark机器学习Logistic回归预测乳腺癌
logistic回归预测 在此博客文章中,我将帮助您开始使用Apache Spark的spark.ml Logistic回归来预测癌症恶性程度. Spark的spark.ml库目标是在DataFram ...
- Spark机器学习库(MLlib)指南
spark-1.6.1 机器学习库(MLlib)指南 MLlib是Spark的机器学习(ML)库.旨在简化机器学习的工程实践工作,并方便扩展到更大规模.MLlib由一些通用的学习算法和工具组成,包括分 ...
- python 机器学习管道_构建机器学习管道-第1部分
python 机器学习管道 Below are the usual steps involved in building the ML pipeline: 以下是构建ML管道所涉及的通常步骤: Imp ...
- 基于大数据的Uber数据实时监控(Part 1:Spark机器学习)
导言 据Gartner称:到2020年,25亿辆联网汽车将成为物联网的主要对象.联网车辆预计每小时可以生成25GB的数据,对这些数据进行分析实现实时监控.大数据目前是10个主要领域之一,利用它可以使城 ...
- 用Spark机器学习数据流水线进行广告检测
在这篇文章中,我们Spark的其它机器学习API,名为Spark ML,如果要用数据流水线来开发大数据应用程序的话,这个是推荐的解决方案.关键点: 了解机器学习数据流水线有关内容. 怎么用Apache ...
- Spark机器学习9· 实时机器学习(scala with sbt)
Spark机器学习 1 在线学习 模型随着接收的新消息,不断更新自己:而不是像离线训练一次次重新训练. 2 Spark Streaming 离散化流(DStream) 输入源:Akka actors. ...
最新文章
- Struts2中采用Json返回List对象数据为空解决方案
- Px4源码框架结构图
- 音视频技术开发周刊 76期
- jsoncpp去掉多余字符_如何处理JSON中的特殊字符
- C++和C#相互调用COM组件的方法简介
- 雷林鹏分享:XML 编码
- PTA-基础编程题目集-函数题 ……
- TFTP 服务器的配置
- 计算机综述(computer overview)
- CR渲染器全景图如何渲染颜色通道_【3D】日不落投影灯 VR/CR投影效果制作
- 高等数学笔记-苏德矿-第十章-曲线积分和曲面积分-第七节-高斯公式与斯托克斯公式
- Zebras CodeForces - 950C(思维)
- Groundhog Looking Dowdy
- ADS(Advanced Design system)良率分析(Yield)、良率优化(YieldOptim)
- 长度短点的uuid_如何压缩UUID长度?
- 应该如何进行程序化交易系统的检验?
- 左耳听风——笔记三:面试技巧
- ACM-ICPC 2018 南京赛区网络预赛 - AC Challenge(状压DP)
- 堆和栈的概念和区别 python_C++堆用法详解
- 黑马训练营-毕业设计项目(超全)