pyspark基础学习——数据处理
目录
- 前言
- 一、准备工作和数据的导入选择
- 1.1 导入数据
- 1.2 选择数据子集:
- 1.3 列名重命名
- 二、数据清洗
- 2.1 检测空值数量
- 2.2 删除存在空值的行
- 2.3 forward,backward填充
- 三、 数据处理
- 3.1 数据筛选
- 3.2 数据统计
- 3.3 数据类型转换
- 3.4 采用SQL语法进行处理
- 四、数据导出
- 总结
前言
上一篇文章中讲了如何在windows下安装和检测: pyspark,同时简单介绍了运行的环境。本文想就我的一些学习经验,分享一下使用pyspark来处理csv文件上的一些常用的pyspark语法。
一、准备工作和数据的导入选择
运行python代码,第一件事当然是导入对应的包,同时我们要为spark先创建好相应的环境,并且,spark中支持SQL,而且在SQL中有众多的函数,因此我们可以创建SparkSession对象,为了后续SQL函数的调用,我们要导入functions包,以及数据类型转换的时候,我们要导入types的包。
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
1.1 导入数据
将csv文件导入为Dataframe样式:
header表示是否需要导入表头;inferSchema表示是否需要推导出数据的类型(false默认为string);delimiter表示指定分隔符进行读取。file对应文件的位置。
df1 = spark.read.options(header='True', inferSchema='True', delimiter=',').csv(file)
1.2 选择数据子集:
drop中填入不需要的列的列名。
df2 = df1.drop('列名')
1.3 列名重命名
df3=df2.withColumnRenamed("original name", "modified name")
如果有多个列的列名要进行修改,可以直接在后面再加上withColumnRenamed()进行修改
二、数据清洗
因为数据本身的问题,在处理的过程中需要我们对一些空值、异常值等进行处理。但是此次作业获取到的数据中主要是对空值的处理,因此对于异常值的处理不进行讨论
2.1 检测空值数量
df3.toPandas().isnull().sum()
2.2 删除存在空值的行
对于一些关键列的数据丢失、或是该行的缺失值占比较高的情况下,我们很难将人工将其弥补,因此直接对该行进行删除。
df_clear=df3.dropna(subset='列名')
2.3 forward,backward填充
forward: 前面一个值填充后面
backward:后面一个值填充前面
代码示例:
df = spark.createDataFrame([(1, 'd1',None),(1, 'd2',10),(1, 'd3',None),(1, 'd4',30),(1, 'd5',None),(1, 'd6',None),
],('id', 'day','temperature'))
df.show()
运行结果如下:
id | day | temperature |
---|---|---|
1 | d1 | null |
1 | d2 | 10 |
1 | d3 | null |
1 | d4 | 30 |
1 | d5 | null |
1 | d6 | null |
from pyspark.sql.window import Windowforward = Window.partitionBy('id').orderBy('day').rowsBetween(Window.unboundedPreceding, Window.currentRow)
backward = Window.partitionBy('id').orderBy('day').rowsBetween(Window.currentRow, Window.unboundedFollowing)df.withColumn('forward_fill', last('temperature', ignorenulls=True).over(forward))\.withColumn('backward_fill', first('temperature', ignorenulls=True).over(backward))\
.show()
填充后的结果如下表所示:
id | day | temperature | forward_fill | backward_fill |
---|---|---|---|---|
1 | d1 | null | null | 10 |
1 | d2 | 10 | 10 | 10 |
1 | d3 | null | 10 | 30 |
1 | d4 | 30 | 30 | 30 |
1 | d5 | null | 30 | null |
1 | d6 | null | 30 | null |
Window.unboundedPreceding:分区的开始位置
Window.currentRow:分区计算到现在的位置
Window.unboundedFollowing:分区的最后位置。
负数:表示若前面有元素,范围向前延申几个元素
0:表示当前位置,等价于Window.currentRow
正数:表示若后面有元素,范围向后延申几个元素
三、 数据处理
3.1 数据筛选
data1= df_clear.filter(df_clear['column'] == 'attribute') # 条件过滤
data2 = df_clear.select('column') # 选择某一列的数据
3.2 数据统计
# 输出树状结构(输出列名、数据类型和是否能为空值)
df_clear.printSchema()
# 将该列数据进行汇总统计
df_clear.select('column').describe().show()
# 求平均,按照id的方式进行统计
ave_column = df_clear.groupBy('id').agg({'column': 'mean'})
agg({“列名”,“函数名”})为聚合函数,其中有:
函数名 | 作用 |
---|---|
avg | 求均值 |
count | 计数 |
max | 求最大值 |
mean | 求均值 |
min | 求最小值 |
sum | 求和 |
3.3 数据类型转换
from pyspark.sql.functions import *
# 转换为Int类型
df_clear.withColumn("column",df.age.cast('int'))# 转换为String类型
df_clear.withColumn("column",df.age.cast('string'))# 转换为Data类型
df_clear= df_clear.withColumn('column', to_date(df_clear['column']))# 转换为TimestampType类型
dfTime=df_clear.withColumn('column',F.col('column').cast(TimestampType()))
3.4 采用SQL语法进行处理
df_sql_cf=df_clear.createOrReplaceTempView("carflow")
spark.sql("select * from carflow\where sum_Total_CF=\(select max(sum_Total_CF) from carflow)").show()
四、数据导出
# ascending表示是否为升序,默认为True
df_clear_asc= df_clear.orderBy("column",ascending=False)
# 将对应的数据类型转化为list,再导出为csv文件
df_asc= df_clear_asc.select(F.collect_list('column')).first()[0]
df_asc.select("col1","col2","col3").toPandas().to_csv("total.csv")
总结
由于此次学习仅用于完成课堂大作业,因此有不足之处还望各位大佬在评论区制指正,若是能够为你们提供一点小小的帮助,希望各位大佬们能动动手指,给小弟一个赞!感谢各位大佬们!
该作业的处理的源代码和相关数据已经传至github
pyspark基础学习——数据处理相关推荐
- 第12期:Spark零基础学习路线
大家好,我是你们的老朋友老王随聊,今天和大家讨论的话题--Spark零基础应该怎么学? 通过这段时间和群里同学们交流,发现很多大学生甚至职场小白对Spark学习路线不是很清晰,所以我花了一些时间给大家 ...
- 0基础学习数据分析必须掌握的技能有哪些?
现如今,我们处于一个互联网发展的时代,大大小小的企业对于数据分析相关岗位的需求逐渐增加,因为所有的企业都有数据,企业需要让数据分析师通过整理.分析企业数据总结出企业目前的发展现状,并且为企业做出下一步 ...
- python语言的单行注释以井号开头_推荐|零基础学习Python基础知识
原标题:推荐|零基础学习Python基础知识 Python是一种面向对象.解释型计算机程序设计语言.语法简洁清晰,强制用空白符作为语句缩进. Python具有丰富和强大的库,又被称为胶水语言.能把其他 ...
- 明日科技的python书籍怎么样_零基础学习Python不可错过的5本书籍
3.Python基础教程(第3版) 作者:[挪]芒努斯·利·海特兰德(Magnus Lie Hetland) 出版社:人民邮电出版社 Python3.5编程从入门到实践,Python入门佳作,机器学习 ...
- java学习_Python基础学习教程:从0学爬虫?让爬虫满足你的好奇心
Python基础学习教程:从0学爬虫?让爬虫满足你的好奇心 有必要学爬虫吗? 我想,这已经是一个不需要讨论的问题了. 爬虫,"有用"也"有趣"! 这个数据为王的 ...
- 键盘可以实现向计算机输入数据判断,计算机应用基础—学习指南.docx
计算机应用基础-学习指南 一.填空题1.世界上第一台电子计算机是在()年诞生的.A.1927 B.1936 C.1946 D.19522.世界上第一台计算机是().A.EDSAC B.ENIAC C. ...
- 8. SpringBoot基础学习笔记
SpringBoot基础学习笔记 课程前置知识说明 1 SpringBoot基础篇 1.1 快速上手SpringBoot SpringBoot入门程序制作 1.2 SpringBoot简介 1.2.1 ...
- 尚学堂JAVA基础学习笔记_2/2
尚学堂JAVA基础学习笔记_2/2 文章目录 尚学堂JAVA基础学习笔记_2/2 写在前面 第10章 IO技术 1. IO入门 2. IO的API 3. 装饰流 4. IO实战 5. CommonsI ...
- WiFi基础学习到实战(三:WiFi网络“物理层”)
欢迎大家一起学习探讨通信之WLAN.上节我们对802.11标准基于OSI模型,在"数据链路层"的定义进行了解,数据传送经过LLC层被封装为一个MSDU,通过MAC层封装为一个MPD ...
最新文章
- Java 蜡烛图_ta-lib 里的蜡烛图形态函数源码
- 批量删除推文_如何搜索(和删除)您的旧推文
- 笔记本电脑摄像头不能用_电脑没有摄像头怎么办
- Java集合系列:Vector解析
- 使用yum命令安装服务时,一直卡在Loaded plugins: fastestmirror Determining fastest mirrors
- 打破AI算力瓶颈,华为升级“智能计算”重塑IT行业
- linux检查python安装情况,使用Python检测Linux服务器连接状态
- Python深度学习入门学习路线(简单速成不掉头发)
- PADS9.5 导入立创元器件库卡顿
- 如何用html实现图片轮播,怎么单纯的用html+css实现图片轮播?
- ES3,ES5,ES6的区别
- java 生成二维码名片
- 2021-11-26学习总结
- 怎样快速查询单号物流信息,筛选出未签收的单号
- List集合关于Stream的操作
- 计算机弹音乐薛之谦的歌曲,薛之谦 万能音符(The Key) 薛之谦歌曲,薛之谦mp3在线试听 - 5nd音乐网...
- 王者荣耀无法提取模型
- wget 下载失败,使用“--no-check-certificate”,/C=US/O=Let‘s Encrypt/CN=R3” 颁发的证书
- 优锘科技:数字孪生为何大爆发?
- java 将ftl文件作为模板导出word文档
热门文章
- Python生成后缀表达式及计算
- 交流微电网仿真模型(包含PCS)【含个人笔记+建模过程】
- EasyRecovery帮助找回回收站清空的文件
- QSettings 操作注册表 与 ini 文件的优劣
- 酷派大神f1,酷派大神f2 无法打log的解决方法。
- 2021年世界互联网领先科技成果提名项目之类脑芯片 KA200
- 有关微型计算机系统总线描述正确的是(),【大学信息技术考试试题(附答案)】第一至三章练习...
- 区块链产教融合刻不容缓,高校该如何培养行业人才
- Git的理解以及在IDEA中的使用
- 同轴电缆75欧什么意思?这是高频电磁传播的概念是特性阻抗,不同于直流电路的电阻阻值。下文指出:同轴电缆的特征阻抗只与外导体的内径和内导体的外径有关,和电缆长度无关。测试原理TDR,史密斯,谐振法