4 Introducing MLlib
MLlib 即Machine Learning Library。
4.1 载入数据并转换数据
数据集下载:births_train.csv.gz.。
创建数据集的schema:

import pyspark.sql.types as typ

labels = [
(‘INFANT_ALIVE_AT_REPORT’, typ.StringType()),
(‘BIRTH_YEAR’, typ.IntegerType()),
(‘BIRTH_MONTH’, typ.IntegerType()),
(‘BIRTH_PLACE’, typ.StringType()),
(‘MOTHER_AGE_YEARS’, typ.IntegerType()),
(‘MOTHER_RACE_6CODE’, typ.StringType()),
(‘MOTHER_EDUCATION’, typ.StringType()),
(‘FATHER_COMBINED_AGE’, typ.IntegerType()),
(‘FATHER_EDUCATION’, typ.StringType()),
(‘MONTH_PRECARE_RECODE’, typ.StringType()),
(‘CIG_BEFORE’, typ.IntegerType()),
(‘CIG_1_TRI’, typ.IntegerType()),
(‘CIG_2_TRI’, typ.IntegerType()),
(‘CIG_3_TRI’, typ.IntegerType()),
(‘MOTHER_HEIGHT_IN’, typ.IntegerType()),
(‘MOTHER_BMI_RECODE’, typ.IntegerType()),
(‘MOTHER_PRE_WEIGHT’, typ.IntegerType()),
(‘MOTHER_DELIVERY_WEIGHT’, typ.IntegerType()),
(‘MOTHER_WEIGHT_GAIN’, typ.IntegerType()),
(‘DIABETES_PRE’, typ.StringType()),
(‘DIABETES_GEST’, typ.StringType()),
(‘HYP_TENS_PRE’, typ.StringType()),
(‘HYP_TENS_GEST’, typ.StringType()),
(‘PREV_BIRTH_PRETERM’, typ.StringType()),
(‘NO_RISK’, typ.StringType()),
(‘NO_INFECTIONS_REPORTED’, typ.StringType()),
(‘LABOR_IND’, typ.StringType()),
(‘LABOR_AUGM’, typ.StringType()),
(‘STEROIDS’, typ.StringType()),
(‘ANTIBIOTICS’, typ.StringType()),
(‘ANESTHESIA’, typ.StringType()),
(‘DELIV_METHOD_RECODE_COMB’, typ.StringType()),
(‘ATTENDANT_BIRTH’, typ.StringType()),
(‘APGAR_5’, typ.IntegerType()),
(‘APGAR_5_RECODE’, typ.StringType()),
(‘APGAR_10’, typ.IntegerType()),
(‘APGAR_10_RECODE’, typ.StringType()),
(‘INFANT_SEX’, typ.StringType()),
(‘OBSTETRIC_GESTATION_WEEKS’, typ.IntegerType()),
(‘INFANT_WEIGHT_GRAMS’, typ.IntegerType()),
(‘INFANT_ASSIST_VENTI’, typ.StringType()),
(‘INFANT_ASSIST_VENTI_6HRS’, typ.StringType()),
(‘INFANT_NICU_ADMISSION’, typ.StringType()),
(‘INFANT_SURFACANT’, typ.StringType()),
(‘INFANT_ANTIBIOTICS’, typ.StringType()),
(‘INFANT_SEIZURES’, typ.StringType()),
(‘INFANT_NO_ABNORMALITIES’, typ.StringType()),
(‘INFANT_ANCEPHALY’, typ.StringType()),
(‘INFANT_MENINGOMYELOCELE’, typ.StringType()),
(‘INFANT_LIMB_REDUCTION’, typ.StringType()),
(‘INFANT_DOWN_SYNDROME’, typ.StringType()),
(‘INFANT_SUSPECTED_CHROMOSOMAL_DISORDER’, typ.StringType()),
(‘INFANT_NO_CONGENITAL_ANOMALIES_CHECKED’, typ.StringType()),
(‘INFANT_BREASTFED’, typ.StringType())
]

schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])

.read.csv(…) 方法可以读取未压缩的或者压缩的逗号分隔值:

births = spark.read.csv(‘births_train.csv.gz’,
header=True,
schema=schema)

header 参数指定为True 表示第一行包含头,我们用schema 明确数据的正确类型。

我们的数据集中有很多特征是字符串。这些大多是分类变量,我们需要以某种方式转换为数值形式。

字典映射:

recode_dictionary = {
‘YNU’: {
‘Y’: 1,
‘N’: 0,
‘U’: 0
}
}

我们的目标是预测 ‘INFANT_ALIVE_AT_REPORT’ 是 1 or 0.。因此,我们要去除其他与婴儿相关的特征:

selected_features = [
‘INFANT_ALIVE_AT_REPORT’,
‘BIRTH_PLACE’,
‘MOTHER_AGE_YEARS’,
‘FATHER_COMBINED_AGE’,
‘CIG_BEFORE’,
‘CIG_1_TRI’,
‘CIG_2_TRI’,
‘CIG_3_TRI’,
‘MOTHER_HEIGHT_IN’,
‘MOTHER_PRE_WEIGHT’,
‘MOTHER_DELIVERY_WEIGHT’,
‘MOTHER_WEIGHT_GAIN’,
‘DIABETES_PRE’,
‘DIABETES_GEST’,
‘HYP_TENS_PRE’,
‘HYP_TENS_GEST’,
‘PREV_BIRTH_PRETERM’
]

births_trimmed = births.select(selected_features)

特征字典映射:

# 0意味着母亲在怀孕前或怀孕期间不抽烟;1-97表示抽烟的实际人数,98表示98或更多;而99表示未知,我们将假设未知是0并相应地重新编码。
import pyspark.sql.functions as func

def recode(col, key):
return recode_dictionary[key][col]

def correct_cig(feat):
return func
.when(func.col(feat) != 99, func.col(feat))
.otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())

recode()方法从recode_dictionary中返回key对应的值,correct_cig() 方法
correct_cig方法检查特征feat的值何时不等于99,若不等于99,则返回特征的值;如果这个值等于99,则返回0。

我们不能直接在DataFrame上使用recode函数;它需要转换为Spark理解的UDF。User Define Function, 用户自定义函数,简称UDF,用户可以在Spark SQL 里自定义实际需要的UDF来处理数据。 rec_integer函数:通过传入我们指定的recode函数并指定返回值数据类型,我们可以使用rec_integer做字典映射。

首先纠正与吸烟数量有关的特征:

births_transformed = births_trimmed
.withColumn(‘CIG_BEFORE’, correct_cig(‘CIG_BEFORE’))
.withColumn(‘CIG_1_TRI’, correct_cig(‘CIG_1_TRI’))
.withColumn(‘CIG_2_TRI’, correct_cig(‘CIG_2_TRI’))
.withColumn(‘CIG_3_TRI’, correct_cig(‘CIG_3_TRI’))

.withColumn(…) 方法第一个参数是新列名,第二个参数是指定原数据的某列。

找出哪些特征是Yes/No/Unknown :

cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
if s[1] == typ.StringType():
dis = births.select(s[0])
.distinct()
.rdd
.map(lambda row: row[0])
.collect()

    if 'Y' in dis:YNU_cols.append(s[0])

DataFrames 可以在选择特征的同时批量转换特征:

births.select([
‘INFANT_NICU_ADMISSION’,
rec_integer(
‘INFANT_NICU_ADMISSION’, func.lit(‘YNU’)
)
.alias(‘INFANT_NICU_ADMISSION_RECODE’)]
).take(5)

用一个列表转换所有的 YNU_cols :

exprs_YNU = [
rec_integer(x, func.lit(‘YNU’)).alias(x)
if x in YNU_cols
else x
for x in births_transformed.columns
]
births_transformed = births_transformed.select(exprs_YNU)

4.2 熟悉数据
4.2.1 描述性统计
用.colStats(…)方法进行统计。需要注意的一点是,该方法是基于样本的描述性统计,如果你的数据集少于100个观测值,你可能会得到一些奇怪的结果。

import pyspark.mllib.stat as st
import numpy as np

numeric_cols = [‘MOTHER_AGE_YEARS’,‘FATHER_COMBINED_AGE’,
‘CIG_BEFORE’,‘CIG_1_TRI’,‘CIG_2_TRI’,‘CIG_3_TRI’,
‘MOTHER_HEIGHT_IN’,‘MOTHER_PRE_WEIGHT’,
‘MOTHER_DELIVERY_WEIGHT’,‘MOTHER_WEIGHT_GAIN’
]

numeric_rdd = births_transformed
.select(numeric_cols)
.rdd
.map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

for col, m, v in zip(numeric_cols,
mllib_stats.mean(),
mllib_stats.variance()):
print(’{0}: \t{1:.2f} \t {2:.2f}’.format(col, m, np.sqrt(v)))

.colStats(…)的输入:RDD数据
该方法的函数:count()、max()、min()、mean()、 normL1()、normL2() 、numNonzeros() 、 variance()等。

计算分类变量的频率:

categorical_cols = [e for e in births_transformed.columns
if e not in numeric_cols]

categorical_rdd = births_transformed
.select(categorical_cols)
.rdd
.map(lambda row: [e for e in row])

for i, col in enumerate(categorical_cols):
agg = categorical_rdd
.groupBy(lambda row: row[i])
.map(lambda row: (row[0], len(row[1])))

print(col, sorted(agg.collect(), key=lambda el: el[1], reverse=True))

4.2.2 相关性
计算特征间的相关性:

corrs = st.Statistics.corr(numeric_rdd)

for i, el in enumerate(corrs > 0.5):
correlated = [
(numeric_cols[j], corrs[i][j])
for j, e in enumerate(el)
if e == 1.0 and j != i]

if len(correlated) > 0:for e in correlated:print('{0}-to-{1}: {2:.2f}' \.format(numeric_cols[i], e[0], e[1]))

丢弃相关性很高的特征,保留部分特征:

features_to_keep = [
‘INFANT_ALIVE_AT_REPORT’,
‘BIRTH_PLACE’,
‘MOTHER_AGE_YEARS’,
‘FATHER_COMBINED_AGE’,
‘CIG_1_TRI’,
‘MOTHER_HEIGHT_IN’,
‘MOTHER_PRE_WEIGHT’,
‘DIABETES_PRE’,
‘DIABETES_GEST’,
‘HYP_TENS_PRE’,
‘HYP_TENS_GEST’,
‘PREV_BIRTH_PRETERM’
]
births_transformed = births_transformed.select([e for e in features_to_keep])

4.2.3 统计检验
我们不能计算分类特征的相关性。不过,我们可以进行卡方检验,以确定是否存在显著差异。

import pyspark.mllib.linalg as ln

for cat in categorical_cols[1:]:
agg = births_transformed
.groupby(‘INFANT_ALIVE_AT_REPORT’)
.pivot(cat)
.count()

agg_rdd = agg \.rdd\.map(lambda row: (row[1:])) \.flatMap(lambda row: [0 if e == None else e for e in row]) \.collect()row_length = len(agg.collect()[0]) - 1
agg = ln.Matrices.dense(row_length, 2, agg_rdd)test = st.Statistics.chiSqTest(agg)
print(cat, round(test.pValue, 4))

首先遍历所有的分类变量,并通过’INFANT_ALIVE_AT_REPORT“特征获取计数。接下来,将它们转换成一个RDD,然后再使用pyspark.mllib.linalg模块将它们转换成矩阵。
.Matrices.dense(…)方法的第一个参数指定矩阵中的行数,在我们的例子中,它是分类特征的不同值的长度。第二个参数指定列数:“INFANT_ALIVE_AT_REPORT”目标变量只有两个值。最后一个参数是要转换成矩阵的值列表。
一个小例子:

print(ln.Matrices.dense(3,2, [1,2,3,4,5,6]))
DenseMatrix([[ 1., 4.],
[ 2., 5.],
[ 3., 6.]])

测试表明,所有的特征应该是显著不同的,应该可以帮助我们预测婴儿的生存机会。

4.3 创建最终的数据集
现在创建我们将用来构建模型的最终数据集,将DataFrame转换成LabeledPoints的RDD。LabeledPoint是用来训练机器学习的一个MLlib结构。它由两个属性组成:标签和特征。标签是我们的目标变量,特征可以是NumPy数组,列表,pyspark.mllib.linalg.SparseVector,pyspark.mllib.linalg.DenseVector或scipy.sparse列矩阵。
4.3.1 创建一个LabeledPoints的RDD
在我们建立最终的数据集之前,我们首先需要处理一个最后的问题:我们的“BIRTH_PLACE”特征仍然是一个字符串。而任何其他的分类变量可以按原样使用(因为它们现在是虚拟变量),我们将使用哈希编码“BIRTH_PLACE”特征:

import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg

hashing = ft.HashingTF(7)

births_hashed = births_transformed
.rdd
.map(lambda row: [
list(hashing.transform(row[1]).toArray())
if col == ‘BIRTH_PLACE’
else row[i]
for i, col
in enumerate(features_to_keep)])
.map(lambda row: [[e] if type(e) == int else e
for e in row])
.map(lambda row: [item for sublist in row
for item in sublist])
.map(lambda row: reg.LabeledPoint(
row[0],
ln.Vectors.dense(row[1:]))
)

首先,我们创建哈希模型。我们的特征有7个级别,所以散列值选择7。接下来,如果你的数据集有很多列,但在一行中只有少部分非零值,SparseVector数据结构是首选的。因此使用模型将我们的“BIRTH_PLACE”特征转换为SparseVector,然后,将所有特征组合在一起创建一个LabeledPoint。
4.3.2 切割训练集与测试集

RDD有一个简单的方法,.randomSplit(…)。该方法采用一个比例列表随机分割数据集:

births_train, births_test = births_hashed.randomSplit([0.6, 0.4])

4.4 预测
在这里,我们将构造两种模型:线性分类器 - 逻辑回归和非线性分类器 - 随机森林。对于前者,我们将使用所有的特征,而对于后者,我们将使用ChiSqSelector(…)方法选择前四个特征。
4.4.1 MLlib之逻辑回归
Spark 2.0中使用LogisticRegressionWithLBFGS模型,该模型使用拟牛顿(BFGS) 优化算法。
训练模型:

from pyspark.mllib.classification
import LogisticRegressionWithLBFGS

LR_Model = LogisticRegressionWithLBFGS
.train(births_train, iterations=10)

要求的参数 是LabeledPoints的RDD。
预测分类:

LR_results = (
births_test.map(lambda row: row.label)
.zip(LR_Model
.predict(births_test
.map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))

评估模型:

import pyspark.mllib.evaluation as ev
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print(‘Area under PR: {0:.2f}’
.format(LR_evaluation.areaUnderPR))
print(‘Area under ROC: {0:.2f}’
.format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()

模型表现相当好,Precision-Recall曲线下的85%面积表示很适合。(ROC)下的区域可以被理解为与随机选择的负实例相比,模型等级的概率高于随机选择的正实例。 63%的值可以被认为是可以接受的。

MLlib允许我们用卡方选择器(Chi-Square)选择最好的特征,ChiSqSelector(…)方法只能用于数值特征;分类变量需要在选择之前进行散列或虚拟编码才可以使用。

selector = ft.ChiSqSelector(4).fit(births_train)

topFeatures_train = (
births_train.map(lambda row: row.label)
.zip(selector
.transform(births_train
.map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))

topFeatures_test = (
births_test.map(lambda row: row.label)
.zip(selector
.transform(births_test
.map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))

4.4.2 MLlib之随机森林
训练模型:

from pyspark.mllib.tree import RandomForest

RF_model = RandomForest
.trainClassifier(data=topFeatures_train,
numClasses=2,
categoricalFeaturesInfo={},
numTrees=6,
featureSubsetStrategy=‘all’,
seed=666)

.trainClassifier(…)方法的第一个参数指定了训练数据集。numClasses表示我们的目标变量有多少类。至于第三个参数,你可以传递一个字典,其中的key是RDD中分类特征的索引,key的值表示分类特征具有的级别数。 numTrees指定森林中树的数量。下一个参数告诉模型使用我们数据集中的所有特征,而不是只保留最具描述性的特征,而最后一个参数指定模型随机部分的种子。
评估模型:

RF_results = (
topFeatures_test.map(lambda row: row.label)
.zip(RF_model
.predict(topFeatures_test
.map(lambda row: row.features)))
)

RF_evaluation = ev.BinaryClassificationMetrics(RF_results)

print(‘Area under PR: {0:.2f}’
.format(RF_evaluation.areaUnderPR))
print(‘Area under ROC: {0:.2f}’
.format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()

可以看见,随机森林用更少的特征要优于逻辑回归模型。让我们看看逻辑回归减少特征后性能如何:

LR_Model_2 = LogisticRegressionWithLBFGS
.train(topFeatures_train, iterations=10)

LR_results_2 = (
topFeatures_test.map(lambda row: row.label)
.zip(LR_Model_2
.predict(topFeatures_test
.map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)

print(‘Area under PR: {0:.2f}’
.format(LR_evaluation_2.areaUnderPR))
print(‘Area under ROC: {0:.2f}’
.format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()

PySpark机器学习 MLlib相关推荐

  1. Spark机器学习MLlib系列1(for python)--数据类型,向量,分布式矩阵,API

    Spark机器学习MLlib系列1(for python)--数据类型,向量,分布式矩阵,API 关键词:Local vector,Labeled point,Local matrix,Distrib ...

  2. 手把手教你实现PySpark机器学习项目——回归算法

    作者 | hecongqing 来源 | AI算法之心(ID:AIHeartForYou) [导读]PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用. ...

  3. 【PySpark入门】手把手实现PySpark机器学习项目-回归算法

    摘要   PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用.PySpark如何建模呢?这篇文章手把手带你入门PySpark,提前感受工业界的建模过程! ...

  4. pyspark读取csv_手把手实现 PySpark 机器学习项目回归算法

    点击上方"AI有道",选择"星标"公众号 重磅干货,第一时间送达 摘要   PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到 ...

  5. pyspark读取csv_手把手教你实现PySpark机器学习项目——回归算法

    作者 | hecongqing 来源 | AI算法之心(ID:AIHeartForYou) [导读]PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用. ...

  6. Pyspark机器学习:向量及其常用操作

    Spark版本:V3.2.1   本篇主要介绍pyspark.ml.linalg中的向量操作. 1. DenseVector(稠密向量) 1.1 创建 稠密向量和一般的数组差不多,其创建方法如下: f ...

  7. 【Pyspark教程】SQL、MLlib、Core等模块基础使用

    文章目录 零.Spark基本原理 0.1 pyspark.sql 核心类 0.2 spark的基本概念 0.3 spark部署方式 0.4 RDD数据结构 (1)创建RDD的2种方式 (2)RDD操作 ...

  8. 使用PySpark搭建机器学习模型

    使用PySpark搭建机器学习模型 文章目录 使用PySpark搭建机器学习模型 前言 搭建回归模型 1.加载数据集 2.拆分数据集 3.创建模型 4&5 模型训练与预测 6.模型评估 绘制折 ...

  9. MLlib 二分类问题

    MLlib 二分类问题 [课程性质:PySpark机器学习] 文章目录 1. 实验目标 2. 本次实验主要使用的 PythonPythonPython 库 3. 适用的对象 4. 实验步骤 步骤1 安 ...

最新文章

  1. show在php,show.php
  2. 被同事嘲笑说技术方案没深度?
  3. 算法--Hash算法及其应用场所
  4. mysql内测试连通性命令_怎么使用ping命令进行连通性测试
  5. java 多线程——一个定时调度的例子
  6. 基于Chrome插件的微博超话自动签到
  7. MYSQL授权root远程访问
  8. 【web前端特效源码】使用HTML5+CSS3+JavaScript制作一个进度条动画效果~适合初学者~超简单~ |前端开发|IT软件
  9. 洛谷P3709 大爷的字符串题 莫队
  10. 文献阅读(40)ICLR2021-Combining Label Propagation and Simple Models Out-performs Graph Neural Networks
  11. 今天要学习的技术点,Python 筛选数字,模块导入,特殊变量__all__ 实战博客
  12. 用Android studio设计贺卡,功能强大的贺卡设计制作软件推荐:Hallmark Card Studio
  13. thymeleaf中三元运算符嵌套写法
  14. 千淘万漉虽辛苦,吹尽狂沙始见金
  15. 1月6日科技资讯|小辣椒手机创始人王晓雁加入小米;手机 QQ 可显示对方实时电量
  16. The AudioContext was not allowed to start. It must be resumed (or created) after a user gesture on .
  17. java 代码加壳,关于java加壳和代码混淆
  18. 有关esp8266OTA升级的过程
  19. 基于java+ssm+mysql的高校后勤管理系统
  20. java插件已崩溃怎么处理_为什么使用插件就崩溃?

热门文章

  1. 一元钱兑换成1分、2分和5分的兑换方法种类
  2. android系统怎么清除数据,安卓手机怎么清理卸载后的残留数据
  3. 【Xcode】使用教程
  4. 物联网ESP8266 WIFI SoftAP模式一键配网,开源源码,测试OK可以直接使用
  5. The Art of Public Speaking——读后感
  6. 554 DT:SPM 163 smtp3解决方案
  7. 新加坡行前——上海一日游 13/09/2012
  8. Fortify SCA
  9. Anaconda3安装教程【详细图文教程】
  10. web安全之弱口令漏洞学习总结