pyspark | 数据处理基本操作
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as F, Window
初始化与配置环境
# 配置集群
config = SparkConf()
# config.set('spark.dynamicAllocation.maxExecutors', '8')
# config.set('spark.driver.memory', '4G')
# config.set('spark.executor.memory', '8G')
# config.set('spark.executor.cores', '8')
# config.set('spark.yarn.executora.memoryOverhead', '4G')
# config.set('spark.sql.shuffle.partitions', '500')
# config.set('spark.default.parallelism', '500')
# config.set('spark.port.maxRetries', '1000')
# config.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
config.set('spark.master','local[4]')spark = SparkSession.builder.config(conf=config).getOrCreate()
创建DataFrame
字典方式创建
df = spark.createDataFrame([{'user_id': 'A203', 'country': 'India', 'browser': 'Chrome', 'OS': 'WIN', 'age': 33},{'user_id': 'A201', 'country': 'China', 'browser': 'Safari', 'OS': 'MacOs', 'age': 35},{'user_id': 'A205', 'country': 'UK', 'browser': 'Mozilla', 'OS': 'Linux', 'age': 25}
])
/usr/lib/spark/python/pyspark/sql/session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row insteadwarnings.warn("inferring schema from dict is deprecated,"
df.show()
+-----+---+-------+-------+-------+
| OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
| WIN| 33| Chrome| India| A203|
|MacOs| 35| Safari| China| A201|
|Linux| 25|Mozilla| UK| A205|
+-----+---+-------+-------+-------+
df.printSchema()
root|-- OS: string (nullable = true)|-- age: long (nullable = true)|-- browser: string (nullable = true)|-- country: string (nullable = true)|-- user_id: string (nullable = true)
申明列类型创建
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types
schema = StructType().add('user_id', StringType(), True).add('country', StringType(), True).add('browser', StringType(), True).add('OS', StringType(), True).add('age', IntegerType(), True)df = spark.createDataFrame([('A203', 'India', 'Chrome', 'WIN', 33),('A201', 'China', 'Safari', 'MacOS', 35), ('A205', 'UK', 'Mozilla', 'Linux', 25),
], schema=schema)
df.show()
+-------+-------+-------+-----+---+
|user_id|country|browser| OS|age|
+-------+-------+-------+-----+---+
| A203| India| Chrome| WIN| 33|
| A201| China| Safari|MacOS| 35|
| A205| UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+
df.printSchema()
root|-- user_id: string (nullable = true)|-- country: string (nullable = true)|-- browser: string (nullable = true)|-- OS: string (nullable = true)|-- age: integer (nullable = true)
其他方式加载数据
网上数据集
- employees_df: https://query.data.world/s/mbwaztyugiidkaw4z32ptikrwrjqyl
- salaries_df: https://query.data.world/s/wraycqnu6fopv56tcr2lwlyaujlhtz
# 从pandas加载数据到spark里
salaries_df = spark.createDataFrame(pd.read_csv('salaries_emp_no_less_20000.csv')).cache()
employees_df = spark.createDataFrame(pd.read_csv('employees_emp_no_less_20000.csv')).cache()salaries_df.show(5)
employees_df.show(5)
+------+------+----------+----------+
|emp_no|salary| from_date| to_date|
+------+------+----------+----------+
| 10001| 60117|1986-06-26|1987-06-26|
| 10001| 62102|1987-06-26|1988-06-25|
| 10001| 66074|1988-06-25|1989-06-25|
| 10001| 66596|1989-06-25|1990-06-25|
| 10001| 66961|1990-06-25|1991-06-25|
+------+------+----------+----------+
only showing top 5 rows+------+----------+----------+---------+------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|
+------+----------+----------+---------+------+----------+
| 10001|1953-09-02| Georgi| Facello| M|1986-06-26|
| 10002|1964-06-02| Bezalel| Simmel| F|1985-11-21|
| 10003|1959-12-03| Parto| Bamford| M|1986-08-28|
| 10004|1954-05-01| Chirstian| Koblick| M|1986-12-01|
| 10005|1955-01-21| Kyoichi| Maliniak| M|1989-09-12|
+------+----------+----------+---------+------+----------+
only showing top 5 rows
空值处理
df_na = spark.createDataFrame([{'user_id': 'A203', 'country': None, 'browser': 'Chrome', 'OS': 'WIN', 'age': 33},{'user_id': 'A201', 'country': 'China', 'browser': None, 'OS': 'MacOs', 'age': 35},{'user_id': 'A205', 'country': 'UK', 'browser': 'Mozilla', 'OS': 'Linux', 'age': 25}
])
df_na.show()
+-----+---+-------+-------+-------+
| OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
| WIN| 33| Chrome| null| A203|
|MacOs| 35| null| China| A201|
|Linux| 25|Mozilla| UK| A205|
+-----+---+-------+-------+-------+
# 数据分布
df_na.summary().show()
+-------+-----+-----------------+-------+-------+-------+
|summary| OS| age|browser|country|user_id|
+-------+-----+-----------------+-------+-------+-------+
| count| 3| 3| 2| 2| 3|
| mean| null| 31.0| null| null| null|
| stddev| null|5.291502622129181| null| null| null|
| min|Linux| 25| Chrome| China| A201|
| 25%| null| 25| null| null| null|
| 50%| null| 33| null| null| null|
| 75%| null| 35| null| null| null|
| max| WIN| 35|Mozilla| UK| A205|
+-------+-----+-----------------+-------+-------+-------+
# 空值填充为空字符串''
df_na.fillna('').show()
+-----+---+-------+-------+-------+
| OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
| WIN| 33| Chrome| | A203|
|MacOs| 35| | China| A201|
|Linux| 25|Mozilla| UK| A205|
+-----+---+-------+-------+-------+
# 按列填充空值
df_na.fillna({'browser': 'unknown', 'country': 'unknown'}).show()
+-----+---+-------+-------+-------+
| OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
| WIN| 33| Chrome|unknown| A203|
|MacOs| 35|unknown| China| A201|
|Linux| 25|Mozilla| UK| A205|
+-----+---+-------+-------+-------+
# 删除为空的数据
df_na.na.drop().show()
+-----+---+-------+-------+-------+
| OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|Linux| 25|Mozilla| UK| A205|
+-----+---+-------+-------+-------+
# 删除某列为空的数据
df_na.na.drop(subset='browser').show()
+-----+---+-------+-------+-------+
| OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
| WIN| 33| Chrome| null| A203|
|Linux| 25|Mozilla| UK| A205|
+-----+---+-------+-------+-------+
# 删除某列
df_na.drop('os').show()
+---+-------+-------+-------+
|age|browser|country|user_id|
+---+-------+-------+-------+
| 33| Chrome| null| A203|
| 35| null| China| A201|
| 25|Mozilla| UK| A205|
+---+-------+-------+-------+
类SQL操作
- Select
- Filter
- Where
- Aggregations
方案一: 求职工最大工资
# 使用group by的方式
employees_df.join(salaries_df,salaries_df['emp_no']==employees_df['emp_no']
).withColumn('gender_custom',F.when(employees_df['gender']=='M', 'male').otherwise('female')
).groupBy(employees_df['emp_no'],employees_df['birth_date'] ,employees_df['first_name'] ,employees_df['last_name'],'gender_custom'
).agg(F.max('salary').alias('max_salary'),F.count('salary').alias('salary_change_times')
).select(employees_df['emp_no'],employees_df['birth_date'],employees_df['first_name'],employees_df['last_name'],'gender_custom' ,'max_salary','salary_change_times'
).where((employees_df['emp_no']>=10001) & (employees_df['emp_no']<=10005)
).orderBy('emp_no'
).show()
+------+----------+----------+---------+-------------+----------+-------------------+
|emp_no|birth_date|first_name|last_name|gender_custom|max_salary|salary_change_times|
+------+----------+----------+---------+-------------+----------+-------------------+
| 10001|1953-09-02| Georgi| Facello| male| 88958| 17|
| 10002|1964-06-02| Bezalel| Simmel| female| 72527| 6|
| 10003|1959-12-03| Parto| Bamford| male| 43699| 7|
| 10004|1954-05-01| Chirstian| Koblick| male| 74057| 16|
| 10005|1955-01-21| Kyoichi| Maliniak| male| 94692| 13|
+------+----------+----------+---------+-------------+----------+-------------------+
方案二: 求职工最大工资
# 使用窗口函数 TopN
employees_df.join(salaries_df,salaries_df['emp_no']==employees_df['emp_no']
).withColumn('gender_custom',F.when(employees_df['gender']=='M', 'male').otherwise('female')
).withColumn('index',F.row_number().over(Window.partitionBy(employees_df['emp_no']).orderBy(salaries_df['salary'].desc()))
).filter(F.col('index')==1
).select(employees_df['emp_no'],employees_df['birth_date'],employees_df['first_name'],employees_df['last_name'],F.col('gender_custom'),F.col('salary').alias('max_salary')
).where((employees_df['emp_no']>=10001) & (employees_df['emp_no']<=10005)
).orderBy('emp_no'
).show()
+------+----------+----------+---------+-------------+----------+
|emp_no|birth_date|first_name|last_name|gender_custom|max_salary|
+------+----------+----------+---------+-------------+----------+
| 10001|1953-09-02| Georgi| Facello| male| 88958|
| 10002|1964-06-02| Bezalel| Simmel| female| 72527|
| 10003|1959-12-03| Parto| Bamford| male| 43699|
| 10004|1954-05-01| Chirstian| Koblick| male| 74057|
| 10005|1955-01-21| Kyoichi| Maliniak| male| 94692|
+------+----------+----------+---------+-------------+----------+
pyspark | 数据处理基本操作相关推荐
- pyspark rdd 基本操作
pyspark rdd 基本操作 原文链接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...
- 数据结构(03)— 数据处理基本操作(数据的查找、新增、删除、修改)
我们先来看一个关于查找的例子.查找,就是从复杂的数据结构中,找到满足某个条件的元素.通常可从以下两个方面来对数据进行查找操作: 根据元素的位置或索引来查找: 根据元素的数值特征来查找. 针对上述两种 ...
- SM74HC595D电路级联教程[转载]
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接: https://blog.csdn.net/zwj695535100/articl ...
- 【Spark】(task1)PySpark基础数据处理
学习总结 文章目录 学习总结 一.Spark介绍 1.1 Scala和PySpark 1.2 Spark原理 1.3 一个具体栗子 二.安装方式 三.测试是否安装成功 四.Spark程序的模块分类 五 ...
- pyspark的聚合函数agg使用
pyspark中聚合函数agg的使用 作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作: 如果没有分组函数,默认是对整个dataframe进行聚合操作. ...
- Coggle数据科学Spark基础
任务1:PySpark数据处理 步骤1:使用Python链接Spark环境 步骤2:创建dateframe数据 步骤3:用spark执行以下逻辑:找到数据行数.列数 步骤4:用spark筛选class ...
- Python数据分析【第11天】| DataFrame转化格式并保存(to_excel(),to_json(),to_csv())
系列文章目录 第1天:读入数据 第2天:read().readline()与readlines() 第3天:进度条(tqdm模块) 第4天:命令行传参(argparse模块) 第5天:读.写json文 ...
- 2019年Apache Spark技术交流社区原创文章回顾
整理了这一年(本号开通半年)分享过的来自诸多专家的实践经验,希望2020年我们仍然能够互相支持,壮大Spark社区. 感谢持续分享输出优质内容的阿里云EMR团队的王道远,余根茂,彭搏,郑锴,夏立,林武 ...
- 这个公众号到底有没有好文章?我整理了300篇,觉得不好我跪榴莲!
花了6个小时,总算整理好了,嗯,榴莲,真香! 大数据成神之路 Spark/Kafka/Flink/ElasticSearch/Hadoop/Hbase/Hive/Yarn/Kylin/Redis/.. ...
最新文章
- 构建消费者数据平台(CDP),实现全域消费者数字化运营闭环
- 原码、补码、反码详解
- osgi学习之---扩展点理解
- 约瑟夫问题的学习(基于循环链表)以及基于循环数组
- Linux 下 NFS服务的搭建
- 卡巴斯基授权许可文件_制片方未提供电视台授权证明,构成根本违约吗?
- 程序员因拒绝带电脑回家工作被开除!获赔19.4万元
- (转)MyBatis框架的学习(二)——MyBatis架构与入门
- java最基础的小总结
- [转帖]我们是OIer、
- 查看磁盘阵列 使用率(简单)
- 呼吸灯效果html,css之呼吸灯效果
- svn基础学习之常用知识
- webservice:com.sun.xml.internal.ws.server.ServerRtException: [failed to localize]
- jumpserver文件的上传和下载
- xfs和ext4文件系统大小调整
- 计算机应用财会,财会与计算机应用专业生的自我评价
- 算法:循环赛日程表_一般化(n可以为奇数,也可以为偶数)
- JAVA 特约商户进件对接
- 管理SQL Server AlwaysOn(1)——基础维护