Spark版本:2.4.0
语言:Scala
任务:分类

这里对数据的处理步骤如下:

  1. 载入数据
  2. 归一化
  3. PCA降维
  4. 划分训练/测试集
  5. 线性SVM分类
  6. 验证精度
  7. 输出cvs格式的结果

前言

从Spark 2.0开始,Spark机器学习API是基于DataFrame的spark.ml。而之前的基于RDD的API spark.mllib已进入维护模式。
也就是说,Spark ML是Spark MLlib的一种新的API,它主要有以下几个优点:

  • 面向DataFrame,在RDD基础上进一步封装,提供更强大更方便的API
  • Pipeline功能,便于实现复杂的机器学习模型
  • 性能提升

基于Pipeline的Spark ML中的几个概念:

  • DataFrame:从Spark SQL 的引用的概念,表示一个数据集,它可以容纳多种数据类型。例如可以存储文本,特征向量,标签和预测值等
  • Transformer:是可以将一个DataFrame变换成另一个DataFrame的算法。例如,一个训练好的模型是一个Transformer,通过transform方法,将原始DataFrame转化为一个包含预测值的DataFrame
  • Estimator:是一个算法,接受一个DataFrame,产生一个Transformer。例如,一个学习算法(如PCA,SVM)是一个Estimator,通过fit方法,训练DataFrame并产生模型Transformer
  • Pipeline: Pipeline将多个Transformers和Estimators连接起来组合成一个机器学习工作流程
  • Parameter:用于对Transformers和Estimators指定参数的统一接口

本次实验使用的是Spark ML的API

首先要创建SparkSession

// 创建SparkSession
val spark = SparkSession.builder.appName("LinearSVCExample").master("local").getOrCreate()

数据处理步骤

1 载入数据

数据载入的方式有多种,这里使用libsvm格式的数据作为数据源,libsvm格式常被用来存储稀疏的矩阵数据,它每一行的格式如下:

label index1:value1 index2:value2 ...

第一个值是标签,后面是由“列号:值”组成键值对,只需要记录非0项即可。

数据加载使用load方法完成:

// 加载训练数据,生成DataFrame
val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

2 归一化

作为数据预处理的第一步,需要对原始数据做归一化处理,即把原始数据的每一维减去其平均值,再除以其标准差,使得数据总体分布为以0为中心,且标准差为1。

// 归一化
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithMean(true).setWithStd(true).fit(data)val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")

3 PCA降维

有时数据的维数可能很大,直接进行分类不仅计算量很大,而且对数据量的要求也很高,常常会出现过拟合。因此需要进行降维,常用的是主成分分析(PCA)算法。

// 创建PCA模型,生成Transformer
val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(5).fit(scaleddata)//  transform数据,生成主成分特征
val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")

4 划分训练/测试集

经过降维的数据就可以拿来训练分类器了,但是在此之前要将数据划分为训练集和测试集,分类器只能在训练集上进行训练,在测试集上验证其分类精度。Spark提供了很方便的接口,按给定的比例随机划分训练/测试集。

// 将经过主成分分析的数据,按比例划分为训练数据和测试数据
val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)

5 线性SVM分类

这一步构建线性SVM模型,设置最大迭代次数和正则化项的系数,使用训练集进行训练。

// 创建SVC分类器(Estimator)
val lsvc = new LinearSVC().setMaxIter(10).setRegParam(0.1)// 训练分类器,生成模型(Transformer)
val lsvcModel = lsvc.fit(trainingData)

6 验证精度

将训练好的分类器作用于测试集上,获得分类结果。

分类结果的好坏有很多种衡量的方法,如查准率、查全率等,这里我们使用最简单的一种衡量标准——精度,即正确分类的样本数占总样本数的比值。

// 用训练好的模型,验证测试数据
val res = lsvcModel.transform(testData).select("prediction","label")// 计算精度
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluator.evaluate(res)println(s"Accuracy = ${accuracy}")

7 输出cvs格式的结果

Spark的DataFrame类型支持导出多种格式,这里以常用的csv格式为例。

这里输出的目的是为了使用Python进行可视化,在降维后进行,可以直观的看出降维后的数据是否明显可分。

使用VectorAssembler,将标签与特征合并为一列,再进行输出。

(这里是将合并后的列转换为String再输出的,因此输出的csv文件是带有引号和括号的,至于为什么要这样输出,请看第二部分)

// 将标签与主成分合成为一列
val assembler = new VectorAssembler().setInputCols(Array("label","features")).setOutputCol("assemble")
val output = assembler.transform(pcaResult)// 输出csv格式的标签和主成分,便于可视化
val ass = output.select(output("assemble").cast("string"))
ass.write.mode("overwrite").csv("output.csv")

当然也可以用同样的方法输出训练/预测的结果,这里就不再详细介绍。

遇到的问题

完成这个简单的分类实验,花了我两天多的时间,从配置环境到熟悉API,再到遇见各种奇怪的问题……这里我都把他们记录下来,供以后参考。

1 配置环境

起初,我想通过在本机编写代码,然后访问安装在虚拟机中的Spark节点(单节点)这种方式进行实验的(不是提交jar包然后执行spark-submit),也就在是创建SparkSession时,指定虚拟机中的Spark:

val spark = SparkSession.builder.appName("LinearSVCExample").master("spark://192.168.1.128:7077") // 虚拟机IP.getOrCreate()

然而,这样并没有成功。遇到的问题有:

  • 拒绝连接
  • Spark的worker里可以查看到提交的任务,但是一直处于等待状态,没有响应。并且提示:
    WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources(实际上,内存和CPU是够的)
  • 报错RuntimeException: java.io.EOFException......

在尝试过各种方案都没有解决问题之后,我放弃了,最后还是在本机中安装Spark,在local模式下运行。(如果有同学成功实现上面的访问方法,欢迎留言告诉我~

至于如何在本机(Windows)安装Spark,百度搜索即可

2 导出CSV格式的数据

将DataFrame导出为cvs格式的时候,遇到了这个问题:
java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

而我要导出的DataFrame只是一个多行数组而已啊:

根据StackOverflow上面的提问,Spark的csv导出不支持复杂结构,array都不行。

然后有人给了一种办法,把数组转化为String,就可以导出了。

但是导出的结果是这样的:

需要进一步处理。

所以还不如手动实现导出csv文件,或者你有更好的办法,欢迎留言告诉我,非常感谢~

3 PCA维数限制

当我想跑一个10万维度的数据时,程序运行到PCA报错:
java.lang.IllegalArgumentException: Argument with more than 65535 cols: 109600

原来,Spark ML的PCA不支持超过65535维的数据。参见源码

4 SVM核

翻阅了Spark ML文档,只找到Linear Support Vector Machine,即线性核的支持向量机。对于高斯核和其他非线性的核,Spark ML貌似还没有实现。

5 withColumn操作

起初我认为对数据进行降维前,需要把DataFrame中的标签label与特征feature分开,然后对feature进行降维,再使用withColumn方法,把label与降维后的feature组合成新的DataFrame。

发现这样既不可行也没有必要。

首先,withColumn只能添加当前DataFrame的数据(对DataFrame某一列进行一些操作,再添加到这个DataFrame本身),不能把来自于不同DataFrame的Column添加到当前DataFrame中。

其次,PCA降维时,只需指定InputCoulum作为特征列,指定OutputColumn作为输出列,其他列的存在并不影响PCA的执行,PCA也不会改变它们,在新生成的DataFrame中依然会保留原来所有Column,并且添加上降维后的数据Column,后面再使用select方法选择出所需的Column即可。

完整代码(Pipeline版)

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.sql.SparkSessionobject Hello {def main(args: Array[String]) {System.setProperty("hadoop.home.dir", "D:\\hadoop-2.8.3")//  屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)// 创建sparkSessionval spark = SparkSession.builder.appName("LinearSVCExample").master("local").getOrCreate()// 加载训练数据,生成DataFrameval data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")println(data.count())// 归一化val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithMean(true).setWithStd(true).fit(data)val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")// 创建PCA模型,生成Transformerval pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(5).fit(scaleddata)//  transform 数据,生成主成分特征val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")//  pcaResult.show(truncate=false)// 将标签与主成分合成为一列val assembler = new VectorAssembler().setInputCols(Array("label","features")).setOutputCol("assemble")val output = assembler.transform(pcaResult)// 输出csv格式的标签和主成分,便于可视化val ass = output.select(output("assemble").cast("string"))ass.write.mode("overwrite").csv("output.csv")// 将经过主成分分析的数据,按比例划分为训练数据和测试数据val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)// 创建SVC分类器(Estimator)val lsvc = new LinearSVC().setMaxIter(10).setRegParam(0.1)// 创建pipeline, 将上述步骤连接起来val pipeline = new Pipeline().setStages(Array(scaler, pca, lsvc))// 使用串联好的模型在训练集上训练val model = pipeline.fit(trainingData)// 在测试集上测试val predictions = model.transform(testData).select("prediction","label")// 计算精度val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")val accuracy = evaluator.evaluate(predictions)println(s"Accuracy = ${accuracy}")spark.stop()}
}

最后的精度为1.0,这里使用的测试数据比较好分,从PCA后对前两维的可视化结果可以看出:

参考资料

Spark ML文档
DataFrame API
PCA列数限制-源码
导出cvs文件方法-stackoverflow
无法导出csv文件-stackoverflow
示例数据

使用Spark ML进行数据分析相关推荐

  1. dataframe 筛选_Spark.DataFrame与Spark.ML简介

    本文是PySpark销量预测系列第一篇,后面会陆续通过实战案例详细介绍PySpark销量预测流程,包含特征工程.特征筛选.超参搜索.预测算法. 在零售销量预测领域,销售小票数据动辄上千万条,这个量级在 ...

  2. mongodb数据导入hbase,spark读取hbase数据分析

    为什么80%的码农都做不了架构师?>>>    使用mavn管理相关依赖包pom.xml <project xmlns="http://maven.apache.or ...

  3. Spark ML的特征处理实战

    一 .特征处理的意义 通常情况下,我们得到的数据中包含脏数据或者噪声.在模型训练前,需要对这些数据进行预处理,否则再好的模型也只能"garbage in,garbage out". ...

  4. spark 逻辑回归算法案例_黄美灵的Spark ML机器学习实战

    原标题:黄美灵的Spark ML机器学习实战 本课程主要讲解基于Spark 2.x的ML,ML是相比MLlib更高级的机器学习库,相比MLlib更加高效.快捷:ML实现了常用的机器学习,如:聚类.分类 ...

  5. spark ml中一个比较通用的transformer

    spark ml中有许多好用的transformer,很方便用来做特征的处理,比如Tokenizer, StopWordsRemover等,具体可参看文档:http://spark.apache.or ...

  6. 基于Spark ML 聚类分析实战的KMeans

    2019独角兽企业重金招聘Python工程师标准>>> 聚类分析是一个无监督学习 (Unsupervised Learning) 过程, 一般是用来对数据对象按照其特征属性进行分组, ...

  7. 《Spark快速大数据分析》—— 第三章 RDD编程

    本文转自博客园xingoo的博客,原文链接:<Spark快速大数据分析>-- 第三章 RDD编程,如需转载请自行联系原博主.

  8. 使用spark ml pipeline进行机器学习

    一.关于spark ml pipeline与机器学习 一个典型的机器学习构建包含若干个过程 1.源数据ETL 2.数据预处理 3.特征选取 4.模型训练与验证 以上四个步骤可以抽象为一个包括多个步骤的 ...

  9. spark任务shell运行_《Spark快速大数据分析》- 根据简单例子理解RDD

    1. RDD简介 RDD,弹性分布式数据集(Resiliennt Distributed Datasets),是Spark中最重要的核心概念,是Spark应用中存储数据的数据结构. RDD 其实就是分 ...

  10. Spark ML - 聚类算法

    http://ihoge.cn/2018/ML2.html Spark ML - 聚类算法 1.KMeans快速聚类 首先到UR需要的包: import org.apache.spark.ml.clu ...

最新文章

  1. R语言构建ElasticNet回归模型实战:基于mtcars数据集
  2. LVS负载均衡群集的了解与基本配置(一)
  3. Redis 为什么是单线程的?
  4. 与WebXR共同创建者Diego Marcos一起探讨沉浸式Web的未来(上)
  5. mybatis 动态传入表名 注解_Mybatis动态sql的动态表名问题
  6. Spring中Bean的作用域都有哪些?
  7. 译文 | 与TensorFlow的第一次接触 第六章:并发
  8. Python精通-Python字典操作
  9. python字符串驼峰转换_驼峰风格字符串转换为下滑线风格字符串
  10. Java开发 - 异常 - 使用throws
  11. antd table动态表头_antd table动态控制指定列的显隐
  12. Web前端性能优化的9大问题
  13. QFIL的烧录、读写
  14. 【新手教程】51Sim-One Cloud 2.0如何接入被测算法
  15. 网页前端 网页换肤(js)
  16. 信号的扩展是因果_信号与系统 怎么判断e(1-t)的时不变和因果性?
  17. 判断当前是在ie还是谷歌
  18. 如何查看raid控制器的信息HP DELL
  19. HTML和CSS内容总结
  20. MFC中添加GIF图片

热门文章

  1. 计算机网络中ipv6什么意思,路由器ipv6是什么意思(图文)
  2. itest考试切屏能检测出来吗_itest考试作弊怎么检测
  3. itest听力答案2020_大学英语itest2018答案
  4. 教您用公式编辑器打恒不等于符号
  5. 【金融系列】使用Python分析债券,画零息利率曲线,对债券进行精确定价,计算债券的麦考利久期、修正久期和凸度,并进行价格敏感性分析
  6. 如何优雅的给你的APK文件打上签名
  7. 【JavaWeb】消息摘要、数字签名与数字证书的区别
  8. 微信小程序 免密代扣
  9. 磁珠 符号_超实用理解磁珠
  10. 洛谷 P1878 舞蹈课(优先队列 + 双链表)