PySpark简介、搭建以及使用
目录
- 一、PySpark简介
- 使用场景
- 结构体系
- 二、PySpark集成搭建
- 三、 PySpark的使用
- PySpark包介绍
- PySpark处理数据
- PySpark中使用匿名函数
- 加载本地文件
- PySpark中使用SparkSQL
- Spark与Python第三方库混用
- Pandas DF与Spark DF
- 使用PySpark通过图形进行数据探索
一、PySpark简介
使用场景
大数据处理或机器学习时的原型( prototype)开发
- 验证算法
- 执行效率可能不高
- 要求能够快速开发
结构体系
二、PySpark集成搭建
准备环境:JDK、Spark需要提前安装好
下载Anaconbda
- 地址:点击这里
- 选择:Anaconda3-5.1.0-Linux-x86_64.sh
至于版本最好不要使用过低版本,可能无法使用
安装bzip2
缺少 bzip2 安装 Anaconda 会失败
- 在Linux下安装bzip2:
yum install -y bzip2
上传/解压Anaconbda
将下载好的Anaconbda上传至Linux中
解压安装Anaconbda:
bash Anaconda3-5.1.0-Linux-x86_64.sh
回车,开始安装,然后提示接受协议(输入yes回车),然后指定安装到的位置,根路径必须已存在,(否则默认安装在/root/anaconbda3下面)
处理完上面的步骤后会提示是否自动添加环境变量,输入yes即可
然后还会提示是否安装VSCode,这里linux不需要安装,输入no即可
Linux默认自带python,安装Anacondd会覆盖原有的Python,可以通过修改.bashrc使两个版本pyrhon共存
设置两个版本的python共存
- 配置文件:
vim /root/.bashrc
#添加以下内容,自行修改自己安装的路径
export PATH="/opt/install/anaconda3/bin:$PATH"
alias pyana="/opt/install/anaconda3/bin/python"
alias python="/bin/python"
- 保存退出后生效配置文件:
source /root/.bashrc
生成 PySpark 配置文件
在当前用户文件夹下运行以下命令生成配置文件:
jupyter notebook --generate-config
查看生成的配置文件:
ll /root/.jupyter/
修改配置文件,但在这之前,需要先执行以下操作
使用 pyana,进入交互模式,运行以下代码
from notebook.auth import passwd
passwd()
#按照提示设置密码后会生成与之对应的加密密码,然后保存这个生成的字符串,后面会赋值给 c.NotebookApp.password 属性
- 修改配置文件,允许从外部访问 Jupyter:
vi ./.jupyter/jupyter_notebook_config.py
c.NotebookApp.allow_root = True
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.password = 'sha1:*****************'#将前面生成的值放到这里
c.NotebookApp.port = 7070 #指定外部访问的端口号
- 修改环境变量,将Jupyter作为PySpark的编辑运行工具:
vim /root/.bashrc
export PYSPARK_PYTHON=/opt/install/anaconda3/bin/python3 #指定/anaconda3/bin/python3
export PYSPARK_DRIVER_PYTHON=/opt/install/anaconda3/bin/jupyter #指定/anaconda3/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
ipython_opts="notebook -pylab inline"
生效环境变量:
source /root/.bashrc
注意关闭防火墙
启动pyspark:
pyspark
使用浏览器打开Jupyter:
192.168.**.**:7070
,并输入预先设置的密码
这里安装就算完成了
三、 PySpark的使用
- 初次使用建议创建一个文件夹,在这个文件夹保存操作过的代码
- 进入到新创建的文件夹下面,new->python3
- 然后就可以开始操作学习
- 执行命令
shift+回车:执行并开启新的一行
ctrl+回车:仅执行
PySpark包介绍
PySpark
Core Classes:
pyspark.SparkContext
pyspark.RDD
pyspark.sql.SQLContext
pyspark.sql.DataFrame
pyspark.streaming
pyspark.streaming.StreamingContext
pyspark.streaming.DStream
pyspark.ml
pyspark.mllib
PySpark处理数据
- 导包
from pyspark import SparkContext
- 获取SparkContext对象
sc=SparkContext.getOrCreate()
- 创建RDD
#不支持
makeRDD()
#支持
parallelize()
textFile()
wholeTextFiles()
- 演示
PySpark中使用匿名函数
- 使用Python的Lambda函数实现匿名函数
- scala与python对比
#scala
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x,1))
b.collect#python
a=sc.parallelize(("dog","tiger","lion","cat","panther","eagle"))
b=a.map(lambda x:(x,1))
b.collect()
- 演示
加载本地文件
addFile(path, recursive = False)
- 接收本地文件
- 通过SparkFiles.get()方法来获取文件的绝对路径
addPyFile( path )
- 加载已存在的文件并调用其中的方法
- 现在本地创建一个文件:
vi sci.py
写入下面两个方法人,然后保存退出
#sci.py
def sqrt(num):return num * numdef circle_area(r):return 3.14 * sqrt(r)
- 在pyspark中通过addPyFile加载该文件
#加载预写入方法的文件
sc.addPyFile("file:///root/sci.py")
#导入文件中的方法
from sci import circle_area
#创建rdd并使用文件中的方法
sc.parallelize([5, 9, 21]).map(lambda x : circle_area(x)).collect()
- 演示
PySpark中使用SparkSQL
- 导包
from pyspark.sql import SparkSession
- 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
- 加载csv文件
ss.read.format("csv").option("header", "true").load("file:///xxx.csv")
演示
- 测试数据
Afghanistan 48.673000 SAs
Albania 76.918000 EuCA
Algeria 73.131000 MENA
Angola 51.093000 SSA
Argentina 75.901000 Amer
Armenia 74.241000 EuCA
Aruba 75.246000 Amer
Australia 81.907000 EAP
Austria 80.854000 EuCA
Azerbaijan 70.739000 EuCA
Bahamas 75.620000 Amer
Bahrain 75.057000 MENA
Bangladesh 68.944000 SAs
Barbados 76.835000 Amer
Belarus 70.349000 EuCA
Belgium 80.009000 EuCA
Belize 76.072000 Amer
Benin 56.081000 SSA
Bhutan 67.185000 SAs
Bolivia 66.618000 Amer
Bosnia_and_Herzegovina 75.670000 EuCA
Botswana 53.183000 SSA
Brazil 73.488000 Amer
Brunei 78.005000 EAP
Bulgaria 73.371000 EuCA
Burkina_Faso 55.439000 SSA
Burundi 50.411000 SSA
Cambodia 63.125000 EAP
Cameroon 51.610000 SSA
Canada 81.012000 Amer
Cape_Verde 74.156000 SSA
Central_African_Rep. 48.398000 SSA
Chad 49.553000 SSA
Channel_Islands 80.055000 EuCA
Chile 79.120000 Amer
China 73.456000 EAP
Colombia 73.703000 Amer
Comoros 61.061000 SSA
Congo_Dem._Rep. 48.397000 SSA
Congo_Rep. 57.379000 SSA
Costa_Rica 79.311000 Amer
Cote_d'Ivoire 55.377000 SSA
Croatia 76.640000 EuCA
Cuba 79.143000 Amer
- 操作代码
#导包
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
#创建sparkSession对象
ss = SparkSession.builder.getOrCreate()#读取本地csv文件,并为每列设置名称
#pyspark中一条语句换行需要加斜杠
df = ss.read.format("csv").option("delimiter", " ").load("file:///root/example/LifeExpentancy.txt") \.withColumn("Country", col("_c0")) \.withColumn("LifeExp", col("_c2").cast(DoubleType())) \.withColumn("Region", col("_c4")) \.select(col("Country"), col("LifeExp"), col("Region"))
df.describe("LifeExp").show()
- 效果展示
Spark与Python第三方库混用
- 使用Spark做大数据ETL
- 处理后的数据使用Python第三方库分析或展示
1.Pandas做数据分析#Pandas DataFrame 转 Spark DataFrame
spark.createDataFrame(pandas_df)#Spark DataFrame转Pandas DataFrame
spark_df.toPandas() 2.Matplotlib实现数据可视化3.Scikit-learn完成机器学习
Pandas DF与Spark DF
- PandasDF与SparkDF间的转换方法
- 测试数据
- 操作代码
# Pandas DataFrame to Spark DataFrame
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pandas_df = pd.read_csv("./products.csv", header=None, usecols=[1, 3, 5])
print(pandas_df)# convert to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()
df2 = spark_df.withColumnRenamed("1", "id").withColumnRenamed("3", "name").withColumnRenamed("5", "remark")# convert back to Pandas DataFrame
df2.toPandas()
- 演示
使用PySpark通过图形进行数据探索
- 将数据划分为多个区间,并统计区间中的数据个数
# 获取上面演示示例中的第一个df对象
rdd = df.select("LifeExp").rdd.map(lambda x: x[0])
#把数据划为10个区间,并获得每个区间中的数据个数
(countries, bins) = rdd.histogram(10)
print(countries)
print(bins)#导入图形生成包
import matplotlib.pyplot as plt
import numpy as np plt.hist(rdd.collect(), 10) # by default the # of bins is 10
plt.title("Life Expectancy Histogram")
plt.xlabel("Life Expectancy")
plt.ylabel("# of Countries")
- 演示
PySpark简介、搭建以及使用相关推荐
- 【Youtobe trydjango】Django2.2教程和React实战系列一【项目简介 | 搭建 | 工具】
[Youtobe trydjango]Django2.2教程和React实战系列一[项目简介 | 搭建 | 工具] 1.环境与选型说明 2.技术栈选型说明 3.django搭建详解 3.1. 项目虚拟 ...
- hbase简介 搭建
1 概念和地位: 是Hadoop领域的数据库,(类比于 hive在Hadoop领域文件系统的更高层抽象使用和封装) 高可靠性.高性能.面向列.可伸缩的分布式存储系统,(可伸缩:非常容易增加节点让计算和 ...
- pySpark环境搭建
1.序 由于笔者目前用python比较多,所以想安装下pySpark,并且在Anaconda2中调用. (1)jdk-8u91-windows-x64.exe (2)spark-1.6.0-bin-h ...
- 使用PySpark搭建机器学习模型
使用PySpark搭建机器学习模型 文章目录 使用PySpark搭建机器学习模型 前言 搭建回归模型 1.加载数据集 2.拆分数据集 3.创建模型 4&5 模型训练与预测 6.模型评估 绘制折 ...
- Windows下搭建PySpark环境
Windows下搭建PySpark环境 文章目录 Windows下搭建PySpark环境 前言 方法一 安装单机版Hadoop 安装单机版Spark PySpark环境整合 方法二 测试PySpark ...
- PyCharm搭建Spark开发环境windows下安装pyspark
目录 windows下安装pyspark PyCharm搭建Spark开发环境 windows下安装pyspark spark和hadoop版本版本之间有对应关系 安装jdk 安装hadoop 下载 ...
- Python应用实战案例-pyspark库从安装到实战保姆级讲解
01 pyspark简介及环境搭建 pyspark是python中的一个第三方库,相当于Apache Spark组件的python化版本(Spark当前支持Java Scala Python和R 4种 ...
- [Spark]PySpark入门学习教程---介绍(1)
一 安装指引 (91条消息) [Hadoop] mac搭建hadoop3.X 伪分布模式_小墨鱼的专栏-CSDN博客https://zengwenqi.blog.csdn.net/article/de ...
- PySpark与GraphFrames的安装与使用
PySpark环境搭建 配置hadoop spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误.这是因为spark需要使用到Hadoop的winutils和hadoop.dll,首先我 ...
最新文章
- localparam和parameter的区别
- 解决Ubuntu无法进行SSH连接的问题(以及如何使用SSH)
- go mysql存储过程_Golang 调用MySQL存储过程
- Spark K-Means
- routing zuul_尚学堂0131之zuul的相关概念及如何性能调优
- sql 如何查询上次的记录_学会SQL并不难,小白学习记录之五(多表查询)
- python安装过程的一些问题解决方案
- python socket thread,python实现socket+threading处理多连接的方法
- spring cloud微服务分布式云架构 - Spring Cloud集成项目简介( java ssm spring boot b2b2c o2o 多租户电子...
- heidi修改mysql的账号密码_HeidiSQL使用教程
- kaldi运行yesno例程
- 扩展数组方法 给数组原形链追加方法(原型链中的this)
- 【宅男福利】百度云下载不限速软件,电脑和看视频无广告软件
- 思科路由器学习初步基础--- CCNA入门
- android打包工具多渠道批量打包,android 二次打包完成apk多渠道打包的方法
- SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified
- php 生成斜体字,JavaScript italics方法入门实例(把字符串显示为斜体)
- (转)IBM MQ 创建以及常见问题集锦
- 使用扩展卡尔曼滤波(EKF)估计电池SOC(附MATLAB程序及详解)part1主函数篇
- 对话Atlassian认证专家叶燕秀:Atlassian产品进入后Server时代,中国用户应当何去何从?
热门文章
- MySQL-第五章-SQL基础
- 测试人经验谈:需求不明确也能写出测试用例
- excel中多条件判断求和
- 超图发布SuperMap GIS 8C(2017) 进一步挖掘空间大数据价值
- CTF题库—实验吧(密码学)之困在栅栏里的凯撒
- 1、某班期末考试科目为数学(MT)、英语(EN)和物理(PH),有最多不超过30人参加考试。考试后要求:(1)计算每个学生的总分和平均分;(2)按总分成绩由高到低排出成绩的名次;(3)打印出名
- 网易开源游戏服务器框架-Pomelo实践(一)
- HDU-4489-The King’s Ups and Downs
- 正点原子LoRa模块的使用
- 是哪些操作让你的亚马逊账号被封?站斧浏览器给你答案