import pandas as pd
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as F, Window

初始化与配置环境

# 配置集群
config = SparkConf()
# config.set('spark.dynamicAllocation.maxExecutors', '8')
# config.set('spark.driver.memory', '4G')
# config.set('spark.executor.memory', '8G')
# config.set('spark.executor.cores', '8')
# config.set('spark.yarn.executora.memoryOverhead', '4G')
# config.set('spark.sql.shuffle.partitions', '500')
# config.set('spark.default.parallelism', '500')
# config.set('spark.port.maxRetries', '1000')
# config.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
config.set('spark.master','local[4]')spark = SparkSession.builder.config(conf=config).getOrCreate()

创建DataFrame

字典方式创建

df = spark.createDataFrame([{'user_id': 'A203', 'country': 'India', 'browser': 'Chrome', 'OS': 'WIN', 'age': 33},{'user_id': 'A201', 'country': 'China', 'browser': 'Safari', 'OS': 'MacOs', 'age': 35},{'user_id': 'A205', 'country': 'UK', 'browser': 'Mozilla', 'OS': 'Linux', 'age': 25}
])
/usr/lib/spark/python/pyspark/sql/session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row insteadwarnings.warn("inferring schema from dict is deprecated,"
df.show()
+-----+---+-------+-------+-------+
|   OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|  WIN| 33| Chrome|  India|   A203|
|MacOs| 35| Safari|  China|   A201|
|Linux| 25|Mozilla|     UK|   A205|
+-----+---+-------+-------+-------+
df.printSchema()
root|-- OS: string (nullable = true)|-- age: long (nullable = true)|-- browser: string (nullable = true)|-- country: string (nullable = true)|-- user_id: string (nullable = true)

申明列类型创建

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

schema = StructType().add('user_id', StringType(), True).add('country', StringType(), True).add('browser', StringType(), True).add('OS', StringType(), True).add('age', IntegerType(), True)df = spark.createDataFrame([('A203', 'India', 'Chrome', 'WIN', 33),('A201', 'China', 'Safari', 'MacOS', 35),    ('A205', 'UK', 'Mozilla', 'Linux', 25),
], schema=schema)
df.show()
+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|  India| Chrome|  WIN| 33|
|   A201|  China| Safari|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+
df.printSchema()
root|-- user_id: string (nullable = true)|-- country: string (nullable = true)|-- browser: string (nullable = true)|-- OS: string (nullable = true)|-- age: integer (nullable = true)

其他方式加载数据

网上数据集

  • employees_df: https://query.data.world/s/mbwaztyugiidkaw4z32ptikrwrjqyl
  • salaries_df: https://query.data.world/s/wraycqnu6fopv56tcr2lwlyaujlhtz
# 从pandas加载数据到spark里
salaries_df = spark.createDataFrame(pd.read_csv('salaries_emp_no_less_20000.csv')).cache()
employees_df = spark.createDataFrame(pd.read_csv('employees_emp_no_less_20000.csv')).cache()salaries_df.show(5)
employees_df.show(5)
+------+------+----------+----------+
|emp_no|salary| from_date|   to_date|
+------+------+----------+----------+
| 10001| 60117|1986-06-26|1987-06-26|
| 10001| 62102|1987-06-26|1988-06-25|
| 10001| 66074|1988-06-25|1989-06-25|
| 10001| 66596|1989-06-25|1990-06-25|
| 10001| 66961|1990-06-25|1991-06-25|
+------+------+----------+----------+
only showing top 5 rows+------+----------+----------+---------+------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|
+------+----------+----------+---------+------+----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12|
+------+----------+----------+---------+------+----------+
only showing top 5 rows

空值处理

df_na = spark.createDataFrame([{'user_id': 'A203', 'country': None, 'browser': 'Chrome', 'OS': 'WIN', 'age': 33},{'user_id': 'A201', 'country': 'China', 'browser': None, 'OS': 'MacOs', 'age': 35},{'user_id': 'A205', 'country': 'UK', 'browser': 'Mozilla', 'OS': 'Linux', 'age': 25}
])
df_na.show()
+-----+---+-------+-------+-------+
|   OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|  WIN| 33| Chrome|   null|   A203|
|MacOs| 35|   null|  China|   A201|
|Linux| 25|Mozilla|     UK|   A205|
+-----+---+-------+-------+-------+
# 数据分布
df_na.summary().show()
+-------+-----+-----------------+-------+-------+-------+
|summary|   OS|              age|browser|country|user_id|
+-------+-----+-----------------+-------+-------+-------+
|  count|    3|                3|      2|      2|      3|
|   mean| null|             31.0|   null|   null|   null|
| stddev| null|5.291502622129181|   null|   null|   null|
|    min|Linux|               25| Chrome|  China|   A201|
|    25%| null|               25|   null|   null|   null|
|    50%| null|               33|   null|   null|   null|
|    75%| null|               35|   null|   null|   null|
|    max|  WIN|               35|Mozilla|     UK|   A205|
+-------+-----+-----------------+-------+-------+-------+
# 空值填充为空字符串''
df_na.fillna('').show()
+-----+---+-------+-------+-------+
|   OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|  WIN| 33| Chrome|       |   A203|
|MacOs| 35|       |  China|   A201|
|Linux| 25|Mozilla|     UK|   A205|
+-----+---+-------+-------+-------+
# 按列填充空值
df_na.fillna({'browser': 'unknown', 'country': 'unknown'}).show()
+-----+---+-------+-------+-------+
|   OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|  WIN| 33| Chrome|unknown|   A203|
|MacOs| 35|unknown|  China|   A201|
|Linux| 25|Mozilla|     UK|   A205|
+-----+---+-------+-------+-------+
# 删除为空的数据
df_na.na.drop().show()
+-----+---+-------+-------+-------+
|   OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|Linux| 25|Mozilla|     UK|   A205|
+-----+---+-------+-------+-------+
# 删除某列为空的数据
df_na.na.drop(subset='browser').show()
+-----+---+-------+-------+-------+
|   OS|age|browser|country|user_id|
+-----+---+-------+-------+-------+
|  WIN| 33| Chrome|   null|   A203|
|Linux| 25|Mozilla|     UK|   A205|
+-----+---+-------+-------+-------+
# 删除某列
df_na.drop('os').show()
+---+-------+-------+-------+
|age|browser|country|user_id|
+---+-------+-------+-------+
| 33| Chrome|   null|   A203|
| 35|   null|  China|   A201|
| 25|Mozilla|     UK|   A205|
+---+-------+-------+-------+

类SQL操作

  • Select
  • Filter
  • Where
  • Aggregations

方案一: 求职工最大工资

# 使用group by的方式
employees_df.join(salaries_df,salaries_df['emp_no']==employees_df['emp_no']
).withColumn('gender_custom',F.when(employees_df['gender']=='M', 'male').otherwise('female')
).groupBy(employees_df['emp_no'],employees_df['birth_date']    ,employees_df['first_name']    ,employees_df['last_name'],'gender_custom'
).agg(F.max('salary').alias('max_salary'),F.count('salary').alias('salary_change_times')
).select(employees_df['emp_no'],employees_df['birth_date'],employees_df['first_name'],employees_df['last_name'],'gender_custom'   ,'max_salary','salary_change_times'
).where((employees_df['emp_no']>=10001) & (employees_df['emp_no']<=10005)
).orderBy('emp_no'
).show()
+------+----------+----------+---------+-------------+----------+-------------------+
|emp_no|birth_date|first_name|last_name|gender_custom|max_salary|salary_change_times|
+------+----------+----------+---------+-------------+----------+-------------------+
| 10001|1953-09-02|    Georgi|  Facello|         male|     88958|                 17|
| 10002|1964-06-02|   Bezalel|   Simmel|       female|     72527|                  6|
| 10003|1959-12-03|     Parto|  Bamford|         male|     43699|                  7|
| 10004|1954-05-01| Chirstian|  Koblick|         male|     74057|                 16|
| 10005|1955-01-21|   Kyoichi| Maliniak|         male|     94692|                 13|
+------+----------+----------+---------+-------------+----------+-------------------+

方案二: 求职工最大工资

# 使用窗口函数 TopN
employees_df.join(salaries_df,salaries_df['emp_no']==employees_df['emp_no']
).withColumn('gender_custom',F.when(employees_df['gender']=='M', 'male').otherwise('female')
).withColumn('index',F.row_number().over(Window.partitionBy(employees_df['emp_no']).orderBy(salaries_df['salary'].desc()))
).filter(F.col('index')==1
).select(employees_df['emp_no'],employees_df['birth_date'],employees_df['first_name'],employees_df['last_name'],F.col('gender_custom'),F.col('salary').alias('max_salary')
).where((employees_df['emp_no']>=10001) & (employees_df['emp_no']<=10005)
).orderBy('emp_no'
).show()
+------+----------+----------+---------+-------------+----------+
|emp_no|birth_date|first_name|last_name|gender_custom|max_salary|
+------+----------+----------+---------+-------------+----------+
| 10001|1953-09-02|    Georgi|  Facello|         male|     88958|
| 10002|1964-06-02|   Bezalel|   Simmel|       female|     72527|
| 10003|1959-12-03|     Parto|  Bamford|         male|     43699|
| 10004|1954-05-01| Chirstian|  Koblick|         male|     74057|
| 10005|1955-01-21|   Kyoichi| Maliniak|         male|     94692|
+------+----------+----------+---------+-------------+----------+

pyspark | 数据处理基本操作相关推荐

  1. pyspark rdd 基本操作

    pyspark rdd 基本操作 原文链接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...

  2. 数据结构(03)— 数据处理基本操作(数据的查找、新增、删除、修改)

    我们先来看一个关于查找的例子.查找,就是从复杂的数据结构中,找到满足某个条件的元素.通常可从以下两个方面来对数据进行查找操作:​ 根据元素的位置或索引来查找: 根据元素的数值特征来查找. 针对上述两种 ...

  3. SM74HC595D电路级联教程[转载]

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接: https://blog.csdn.net/zwj695535100/articl ...

  4. 【Spark】(task1)PySpark基础数据处理

    学习总结 文章目录 学习总结 一.Spark介绍 1.1 Scala和PySpark 1.2 Spark原理 1.3 一个具体栗子 二.安装方式 三.测试是否安装成功 四.Spark程序的模块分类 五 ...

  5. pyspark的聚合函数agg使用

    pyspark中聚合函数agg的使用   作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作: 如果没有分组函数,默认是对整个dataframe进行聚合操作. ...

  6. Coggle数据科学Spark基础

    任务1:PySpark数据处理 步骤1:使用Python链接Spark环境 步骤2:创建dateframe数据 步骤3:用spark执行以下逻辑:找到数据行数.列数 步骤4:用spark筛选class ...

  7. Python数据分析【第11天】| DataFrame转化格式并保存(to_excel(),to_json(),to_csv())

    系列文章目录 第1天:读入数据 第2天:read().readline()与readlines() 第3天:进度条(tqdm模块) 第4天:命令行传参(argparse模块) 第5天:读.写json文 ...

  8. 2019年Apache Spark技术交流社区原创文章回顾

    整理了这一年(本号开通半年)分享过的来自诸多专家的实践经验,希望2020年我们仍然能够互相支持,壮大Spark社区. 感谢持续分享输出优质内容的阿里云EMR团队的王道远,余根茂,彭搏,郑锴,夏立,林武 ...

  9. 这个公众号到底有没有好文章?我整理了300篇,觉得不好我跪榴莲!

    花了6个小时,总算整理好了,嗯,榴莲,真香! 大数据成神之路 Spark/Kafka/Flink/ElasticSearch/Hadoop/Hbase/Hive/Yarn/Kylin/Redis/.. ...

最新文章

  1. 构建消费者数据平台(CDP),实现全域消费者数字化运营闭环
  2. 原码、补码、反码详解
  3. osgi学习之---扩展点理解
  4. 约瑟夫问题的学习(基于循环链表)以及基于循环数组
  5. Linux 下 NFS服务的搭建
  6. 卡巴斯基授权许可文件_制片方未提供电视台授权证明,构成根本违约吗?
  7. 程序员因拒绝带电脑回家工作被开除!获赔19.4万元
  8. (转)MyBatis框架的学习(二)——MyBatis架构与入门
  9. java最基础的小总结
  10. [转帖]我们是OIer、
  11. 查看磁盘阵列 使用率(简单)
  12. 呼吸灯效果html,css之呼吸灯效果
  13. svn基础学习之常用知识
  14. webservice:com.sun.xml.internal.ws.server.ServerRtException: [failed to localize]
  15. jumpserver文件的上传和下载
  16. xfs和ext4文件系统大小调整
  17. 计算机应用财会,财会与计算机应用专业生的自我评价
  18. 算法:循环赛日程表_一般化(n可以为奇数,也可以为偶数)
  19. JAVA 特约商户进件对接
  20. 管理SQL Server AlwaysOn(1)——基础维护

热门文章

  1. 【肌肤老化的七大征兆】
  2. 多商户商城系统功能拆解26讲-平台端分销设置
  3. PS 选区抠图 02
  4. 寻仙手游维护公告服务器停服更新,寻仙手游12月21日停服更新公告 海量新内容...
  5. 苹果亮度自动调节怎么关闭_如果你对手机的自动亮度调节不满意,试试这个quot;velis自动亮度quot;...
  6. 如何从论文中实现算法复现(译)
  7. 自己配音很难听,声音不好听可以做博主吗
  8. 价值200元的小程序卡劵开发免费送
  9. bat 判断copy是否执行成功
  10. python排序算法代码