PyS1:RDD编程基础

  • 0. 环境设置
  • 1. RDD的创建方式
    • 1.1 RDD的数据结构
    • 1.2 从本地文件创建RDD
    • 1.3 从集群文件创建RDD
    • 1.4 在代码当中创建RDD
  • 2. RDD的执行函数
    • 2.1 collect函数
    • 2.2 take函数
    • 2.3 first函数
    • 2.4 takeSample函数
    • 2.5 count函数
    • 2.6 reduce函数
    • 2.7 foreach函数
    • 2.8 countByKey函数
    • 2.9 saveAsTextFile函数
  • 3. RDD的转换函数
    • 3.1 map函数
    • 3.2 flatMap函数
    • 3.3 filter函数
    • 3.4 sample函数
    • 3.5 distinct函数
    • 3.6 subtract函数
    • 3.7 union函数
    • 3.8 intersection函数
    • 3.9 cartesian函数
    • 3.10 sortBy函数
    • 3.11 zip函数
    • 3.12 zipWithIndex函数
  • 4. pairRDD的转换函数
    • 4.1 keys函数
    • 4.2 values函数
    • 4.3 mapValues函数
    • 4.4 sortByKey函数
    • 4.5 subtractByKey函数
    • 4.6 reduceByKey函数
    • 4.7 combineByKey函数
    • 4.8 foldByKey函数
    • 4.9 groupByKey函数
    • 4.10 cogroup函数
    • 4.11 join函数
    • 4.12 leftOuterJoin函数
    • 4.13 rightOuterJoin函数

0. 环境设置

(pyspark) ubuntu@VM-0-12-ubuntu:~$ python
Python 3.7.13 (default, Mar 29 2022, 02:18:16)
[GCC 7.5.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import warnings
>>> warnings.filterwarnings('ignore')
>>> # 过滤掉提示
>>> from prettyprinter import pprint
>>> # 美化打印输出
>>> from prettytable import PrettyTable
>>> # 美化打印输出
>>>
>>> from pyspark import SparkContext, SparkConf
>>> conf = SparkConf().setAppName("rdd_tutorial").setMaster("local")
>>> sc = SparkContext(conf=conf)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-04-22 21:20:49,436 WARN util.Utils: Your hostname, VM-0-12-ubuntu resolves to a loopback address: 127.0.1.1; using 172.16.0.12 instead (on interface eth0)
2022-04-22 21:20:49,437 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2022-04-22 21:20:50,103 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
>>> # 创建Spark的上下文环境sc
>>> sc.setLogLevel("Error")
>>> # 设置日志的输出级别
>>> 

哇,我是真的,看到SLF4J的警告就害怕,回忆起在配置HBase时候的痛苦,希望在这里不要出问题。

1. RDD的创建方式

1.1 RDD的数据结构

RDD是一种无schema的数据结构,其与结构化的DataFrame是不同的,所以其支持turpledictlist等数据结构的混用。

>>> rdd = sc.parallelize([
...     ('Mickey','Minnie'),
...     {'Pluto','dog'},
...     ['Goofy','Donald']
... ])
>>> # 使用默认分区数量
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
>>>
>>> # 查看当前分区数量
>>> rdd.getNumPartitions()
1
>>>
>>> rdd.collect()
[('Mickey', 'Minnie'), {'Pluto', 'dog'}, ['Goofy', 'Donald']]
>>> rdd.foreach(print)
('Mickey', 'Minnie')
{'Pluto', 'dog'}
['Goofy', 'Donald']
>>> 

1.2 从本地文件创建RDD

我按照之前在飞桨的操作,直接写入文件的路径,然后报错了

>>> rdd = sc.textFile('/home/ubuntu/foo.txt', 2)
>>> rdd.collect()
Traceback (most recent call last):
···
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.net.ConnectException: Call From VM-0-12-ubuntu/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
···

遇事不决先搜索,然后都说是HDFS没有开启,但是我明明加载的是本地文件啊

再看看报错,噢,说是9000端口没有返回数据,就是HDFS的问题,应该是把这个路径认作HDFS

但是明明在飞桨的环境当中是可以运行的啊,神奇;而且在教程1的第三节当中也是直接填入的本地路径

此外在书籍2的第二章,也就是13页,也是直接填入的本地路径

但是在书籍3的第四章,也就是54页,在本地路径之前加了file前缀,这个操作有效

估计可能是版本变化导致的问题吧,修正后如下:

>>> # 设置分区数量为2
>>> # 一般来说将分区设置为2-4个
>>> rdd = sc.textFile('file:/home/ubuntu/foo.txt', 2)
>>>
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
>>>
>>> # 查看分区数量
>>> rdd.getNumPartitions()
2
>>> pprint(rdd.collect())
[                                                                               'ckey is a mouse.','Minnie is also a mouse.',"Pluto is their's pet, a dog.","Goofy is also a dog, but he's their friend and stand as human.",'Donald is a duck, also their friend.'
]
>>> 

1.3 从集群文件创建RDD

# rdd = sc.textFile('hdfs://localhost:9000/user/hadoop/foo.txt')
# rdd = sc.textFile('/user/hadoop/foo.txt')
# rdd = sc.textFile('foo.txt')

这种途径的创建语法和上文基本一致,只需要将路径改为HDFS的目录即可。以上三种创建方式效果都一样,但是第三种创建方式需要先将文件放在HDFS中当前Linux登录用户的用户目录下。由于我这里没有配置集群环境,所以就不做代码的运行操作了。

1.4 在代码当中创建RDD

这里就是我们在1.1节当中使用的parallelize函数,一般使用两个参数,第一个是创建RDD所使用的数据集,第二个是指定RDD的分区数量,分区数量可以使用默认值。

>>> rdd = sc.parallelize(range(0,5))
>>> rdd.collect()
[0, 1, 2, 3, 4]
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'>
>>>

比较神奇,这里RDD的类型变为了PipelinedRDD,可能是因为range的原因吧

2. RDD的执行函数

我们知道RDD存在一种惰性机制,在转换函数的处理过程当中,Spark只是记录了转换的轨迹,直到遇到执行函数才真正开始计算。例如在上面的1.2节当中,我们将本地路径当作HDFS读入创建RDD时,并没有报错,直到遇到.collect()函数才报错,也就是这个时候才真正执行代码。

2.1 collect函数

>>> rdd = sc.parallelize([0, 1, 2, 3, 4, 5], 2)
>>> pprint(rdd.collect())
[0, 1, 2, 3, 4, 5]
>>>

我们知道一个RDD的数据是有可能被存在几个不同的分区之上的,例如上面的代码就将数据存入了两个分区。而当我们需要查看整个RDD数据集的时候,往往需要将其汇集到Driver,这就是collect函数。需要注意的是,如果某一个RDD太过于庞大的话,汇集到一起是可能爆掉内存的,所以要谨慎使用这个函数。

2.2 take函数

>>> rdd = sc.parallelize([0, 1, 2, 3, 4, 5], 2)
>>> pprint(rdd.take(3))
[0, 1, 2]
>>>

相比于上面的取整体,take函数就是从开头取定量,这样就好了很多。

2.3 first函数

>>> rdd = sc.parallelize([0, 1, 2, 3, 4, 5], 2)
>>> pprint(rdd.first())
0
>>>

更简洁的情况就是只取第一个元素,这个函数不需要我们喂入参数,那是会报错的。

2.4 takeSample函数

>>> rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6])
>>> rdd.takeSample(False,5,0)
[4, 2, 1, 0, 5]
>>> rdd.takeSample(False,5,0)
[4, 2, 1, 0, 5]
>>> rdd.takeSample(False,5)
[6, 5, 1, 2, 4]
>>> rdd.takeSample(False,5)
[4, 1, 5, 6, 2]
>>>

RDD当作抽取出一部分数据,第一个参数决定这个抽取是放回式的还是不放回;第二个参数是抽取出的数量;第三个参数是随机数种子,用于固定抽取出的结果。需要我们注意的是这是一个执行函数,也就是说会立即进行计算返回结果。所以说如果抽取的数据过多,内存同样是可能会爆掉的。

2.5 count函数

>>> rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6])
>>> rdd.count()
7
>>>
>>> rdd = sc.textFile('file:/home/ubuntu/foo.txt')
>>> pprint(rdd.collect())
['ckey is a mouse.','Minnie is also a mouse.',"Pluto is their's pet, a dog.","Goofy is also a dog, but he's their friend and stand as human.",'Donald is a duck, also their friend.'
]
>>> rdd.count()
5

计算RDD内元素的数量

2.6 reduce函数

>>> rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6])
>>> rdd.reduce(lambda x,y:x+y)
21
>>>

一个规约函数,目前我只知道用来累加

2.7 foreach函数

>>> rdd.foreach(print)
0
1
2
3
4
5
6
>>> rdd.foreach(lambda x:print(x+1))
1
2
3
4
5
6
7
>>>

RDD当中的每个元素执行操作后返回其结果,感觉这个挺酷的。与map相比,foreach是执行函数,是立即执行且无返回值的。而map是转换函数,生成新的rdd及依赖关系,但并不执行。

2.8 countByKey函数

>>> pairRdd = sc.parallelize({...     'Mickey':'mouse',
...     'Minnie':'mouse',
...     'Pluto':'dog',
...     'Goofy':'dog',
...     'Donald':'duck'
... })
>>> pairRdd.countByKey()
defaultdict(<class 'int'>, {'M': 2, 'P': 1, 'G': 1, 'D': 1})
>>>

按照KeyPairRDD的元素进行统计,神奇的是好像只根据首字母来辨识Key

2.9 saveAsTextFile函数

>>> pairRdd = sc.parallelize({...     'Mickey':'mouse',
...     'Minnie':'mouse',
...     'Pluto':'dog',
...     'Goofy':'dog',
...     'Donald':'duck'
... })
>>> pairRdd.saveAsTextFile('file:/home/ubuntu/pairRdd.txt')
>>>

原本我真的以为是直接保存成为文本文件,但没想到的是:

ubuntu@VM-0-12-ubuntu:~$ tree
.
├── foo.txt
└── pairRdd.txt├── part-00000└── _SUCCESS1 directory, 3 files
ubuntu@VM-0-12-ubuntu:~$

好吧,分布式文件存储,再次读入试试:

>>> pairRdd = sc.textFile('file:/home/ubuntu/pairRdd.txt')
>>> pairRdd.collect()
['Mickey', 'Minnie', 'Pluto', 'Goofy', 'Donald']
>>>

好嘛,只保留了Key,挺神奇的;我直接读入字典格式的文件试试:

ubuntu@VM-0-12-ubuntu:~$ head bar.txt
{Mickey':'mouse','Minnie':'mouse','Pluto':'dog','Goofy':'dog','Donald':'duck'
}
ubuntu@VM-0-12-ubuntu:~$ 

结果如下:

>>> pairRdd = sc.textFile('file:/home/ubuntu/bar.txt')
>>> pairRdd.collect()
['{', "\tMickey':'mouse',", "\t'Minnie':'mouse',", "\t'Pluto':'dog',", "\t'Goofy':'dog',", "\t'Donald':'duck'", '}']
>>>

3. RDD的转换函数

3.1 map函数

>>> rdd = sc.parallelize(range(10))
>>> rdd.map(lambda x:x*x).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

一个映射转换的操作,如图4

map相比,foreach是执行函数,是立即执行且无返回值的。而map是转换函数,生成新的rdd及依赖关系,但并不执行。

3.2 flatMap函数

>>> rdd = sc.textFile('file:/home/ubuntu/foo.txt')
>>> pprint(rdd.map(lambda x:x.split(' ')).collect())
[['Mickey', 'is', 'a', 'mouse.'],['Minnie', 'is', 'also', 'a', 'mouse.'],['Pluto', 'is', "their's", 'pet,', 'a', 'dog.'],['Goofy','is','also','a','dog,','but',"he's",'their','friend','and','stand','as','human.'],['Donald', 'is', 'a', 'duck,', 'also', 'their', 'friend.']
]
>>> pprint(rdd.flatMap(lambda x:x.split(' ')).collect())
['Mickey','is','a','mouse.','Minnie','is','also','a','mouse.','Pluto','is',"their's",'pet,','a','dog.','Goofy','is','also','a','dog,','but',"he's",'their','friend','and','stand','as','human.','Donald','is','a','duck,','also','their','friend.'
]
>>>

相比于上面的函数多加了一个拍平操作,将结果中的各个列表打散,如图4

3.3 filter函数

>>> rdd = sc.parallelize(range(10))
>>> rdd.map(lambda x:x>5).collect()
[False, False, False, False, False, False, True, True, True, True]
>>> rdd.map(lambda x:x if x>5 else 0).collect()
[0, 0, 0, 0, 0, 0, 6, 7, 8, 9]
>>> rdd.filter(lambda x:x>5).collect()
[6, 7, 8, 9]

看起来像是一个过滤的感觉,只留取判断为真的数据。如果是map的话,就是对整体的操作,没办法对元素个数做出改变。

3.4 sample函数

>>> rdd = sc.parallelize(range(10))
>>> rdd.sample(False,0.5).collect()
[0, 2, 3, 5, 6, 7, 8]
>>> rdd.sample(False,0.5).collect()
[0, 1, 2, 9]
>>> rdd.sample(False,0.5).collect()
[0, 2, 3, 5, 6, 8]
>>> rdd.sample(False,0.5,0).collect()
[1, 4, 9]
>>> rdd.sample(False, 0.5, 0).collect()
[1, 4, 9]
>>>

该函数也接收三个参数,第一个决定抽样是否放回,第三个决定随机数种子,第二个参数需要依赖第一个参数。如果是不放回抽样,那么第二个参数就代表RDD当中每条数据被抽取的概率;如果是放回抽样,第二个参乎上就意味着每条数据被抽取的可能的次数。挺奇怪的,效果好随机的样子,不太理解这个函数有啥用。

3.5 distinct函数

>>> rdd = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 4, 4, 4])
>>> rdd.distinct().collect()
[1, 2, 3, 4]
>>> rdd = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 4, 4, 4], 2)
>>> rdd.distinct().collect()
[2, 4, 1, 3]
>>>

用于对数据进行去除,如果是只有本地的分区,那么数据保持原顺序,多个分区的情况下就不能够保证了。

3.6 subtract函数

>>> rddA = sc.parallelize([0, 1, 2, 3, 4, 5])
>>> rddB = sc.parallelize([3, 4, 5, 6, 7, 8])
>>> rddA.subtract(rddB).collect()
[0, 2, 1]
>>> rddA.getNumPartitions()
1
>>> rddB.getNumPartitions()
1
>>> rddC = rddA.subtract(rddB)
>>> rddC.getNumPartitions()
2
>>> rddC.collect()
[0, 2, 1]
>>>

这个函数也挺神奇的,反正大概意思就是rddA减去其与rddB的交集,有趣的是这个新生成的rddC的默认分区是2,所以说得到的结果是乱序的。

3.7 union函数

>>> rddA = sc.parallelize([0, 1, 2, 3, 4, 5])
>>> rddB = sc.parallelize([3, 4, 5, 6, 7, 8])
>>> rddA.union(rddB).collect()
[0, 1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8]
>>> rddA.union(rddB).getNumPartitions()
2
>>> rddA.union(rddB).collect()
[0, 1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8]
>>>

这个就是直接把两个RDD合并了,而且还是没有去重的那种。神奇的是新生成的RDD明明是2个分区,但是结果的顺序却一点也没有乱。

3.8 intersection函数

>>> rddA = sc.parallelize([0, 1, 2, 3, 4, 5])
>>> rddB = sc.parallelize([3, 4, 5, 6, 7, 8])
>>> rddA.intersection(rddB).collect()
[4, 3, 5]
>>> rddA.intersection(rddB).getNumPartitions()
2
>>>

你看这个求交集的操作就正常多了,两个分区的结果就是乱序。

3.9 cartesian函数

>>> rddA = sc.parallelize(['Mickey', 'Minnie'])
>>> rddB = sc.parallelize(['run', 'walk', 'sit'])
>>> rddA.cartesian(rddB).getNumPartitions()
1
>>> pprint(rddA.cartesian(rddB).collect())
[('Mickey', 'run'),('Mickey', 'walk'),('Mickey', 'sit'),('Minnie', 'run'),('Minnie', 'walk'),('Minnie', 'sit')
]
>>>

该函数用来生成两个数据集的笛卡尔积,比较神奇的是返回结果只有一个分区

3.10 sortBy函数

>>> rdd = sc.parallelize([2, 4, 6, 1, 3, 7, 0, 9, 5, 6, 8])
>>> rdd.sortBy(lambda x: x).collect()
[0, 1, 2, 3, 4, 5, 6, 6, 7, 8, 9]
>>> rdd = sc.parallelize([2, 4, 6, 1, 3, -7, 0, 9, -5, 6, 8])
>>> rdd.sortBy(lambda x: x).collect()
[-7, -5, 0, 1, 2, 3, 4, 6, 6, 8, 9]
>>> rdd.sortBy(lambda x: abs(x)).collect()
[0, 1, 2, 3, 4, -5, 6, 6, -7, 8, 9]
>>> rdd.sortBy(lambda x: abs(x)).getNumPartitions()
1
>>> rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
>>> rdd.sortBy(lambda x:x[2]).foreach(print)
(4, 1, 1)
(3, 2, 2)
(1, 2, 3)
>>>

该函数的主要作用是排序,可以通过传入的函数去指定排序的方式,例如我们可以指定元组当中的第三个元素作为关键字去排序,也可以先对数据取绝对值后再排序。

3.11 zip函数

>>> rddA = sc.parallelize(['Mickey', 'Minnie', 'Pluto', 'Goofy', 'Donald'])
>>> rddB = sc.parallelize(['mouse', 'mouse', 'dog', 'dog', 'duck'])
>>> rddA.zip(rddB).getNumPartitions()
1
>>> pprint(rddA.zip(rddB).collect())
[('Mickey', 'mouse'),('Minnie', 'mouse'),('Pluto', 'dog'),('Goofy', 'dog'),('Donald', 'duck')
]
>>>

该函数的作用是打包配对,返回的结果数据是元组形式。

3.12 zipWithIndex函数

>>> rddA = sc.parallelize(['Mickey', 'Minnie', 'Pluto', 'Goofy', 'Donald'])
>>> pprint(rddA.zipWithIndex().collect())
[('Mickey', 0),('Minnie', 1),('Pluto', 2),('Goofy', 3),('Donald', 4)
]
>>>

该函数的作用也是打包,只是默认将数据与序号进行打包,为每一条数据分配对应的序号。

4. pairRDD的转换函数

pairRDD是指二元结构元组的数据,默认将第一个元素当作关键字,将第二个元素当作值。

4.1 keys函数

>>> rdd = sc.parallelize([
...     ('Mickey', 0),
...     ('Mickey', 1),
...     ('Mickey', 2),
...     ('Minnie', 3),
...     ('Minnie', 4),
...     ('Pluto', 5),
...     ('Goofy', 6),
...     ('Goofy', 7),
...     ('Donald', 8)
... ])
>>> pprint(rdd.keys().collect())
['Mickey','Mickey','Mickey','Minnie','Minnie','Pluto','Goofy','Goofy','Donald'
]

该函数的作用显而易见,取Key

4.2 values函数

>>> rdd = sc.parallelize([
...     ('Mickey', 0),
...     ('Mickey', 1),
...     ('Mickey', 2),
...     ('Minnie', 3),
...     ('Minnie', 4),
...     ('Pluto', 5),
...     ('Goofy', 6),
...     ('Goofy', 7),
...     ('Donald', 8)
... ])
>>> rdd.values().collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>>

该函数的作用显而易见,取Value

4.3 mapValues函数

>>> rdd = sc.parallelize([
...     ('Mickey', 0),
...     ('Mickey', 1),
...     ('Mickey', 2),
...     ('Minnie', 3),
...     ('Minnie', 4),
...     ('Pluto', 5),
...     ('Goofy', 6),
...     ('Goofy', 7),
...     ('Donald', 8)
... ])
>>> pprint(rdd.mapValues(lambda x: x+1).collect())
[('Mickey', 1),('Mickey', 2),('Mickey', 3),('Minnie', 4),('Minnie', 5),('Pluto', 6),('Goofy', 7),('Goofy', 8),('Donald', 9)
]
>>>

仅对Value做出改变,不影响Key

4.4 sortByKey函数

>>> rdd = sc.parallelize([
...     'Mickey', 'Mickey', 'Mickey',
...     'Minnie', 'Minnie',
...     'Pluto',
...     'Goofy', 'Goofy',
...     'Donald'
... ])
>>> rdd = rdd.zipWithIndex()
>>> pprint(rdd.sortByKey(False).collect())
[('Pluto', 5),('Minnie', 3),('Minnie', 4),('Mickey', 0),('Mickey', 1),('Mickey', 2),('Goofy', 6),('Goofy', 7),('Donald', 8)
]
>>> pprint(rdd.sortByKey().collect())
[('Donald', 8),('Goofy', 6),('Goofy', 7),('Mickey', 0),('Mickey', 1),('Mickey', 2),('Minnie', 3),('Minnie', 4),('Pluto', 5)
]
>>>

该函数的作用是根据Key值进行排序,默认是升序,可以将参数改为False进行降序排序

4.5 subtractByKey函数

>>> rddA = sc.parallelize([
...     ('Mickey', 'mouse'),
...     ('Pluto', 'dog'),
...     ('Goofy', 'dog'),
...     ('Donald', 'duck')
... ])
>>> rddB = sc.parallelize([
...     ('Mickey', 'sit'),
...     ('Minnie', 'run'),
...     ('Goofy', 'stand'),
...     ('Donald', 'walk')
... ])
>>> rddC = rddA.subtractByKey(rddB)
>>> pprint(rddC.collect())
[('Pluto', 'dog')]
>>>

该函数与上面的subtract函数效果类似,都是用当前的数据集减去交集的部分来得到最终的结果,只不过该函数是以Key为关键字。

4.6 reduceByKey函数

>>> rdd = sc.parallelize([
...     'Mickey', 'Mickey', 'Mickey',
...     'Minnie', 'Minnie',
...     'Pluto',
...     'Goofy', 'Goofy',
...     'Donald'
... ])
>>> rdd = rdd.zipWithIndex()
>>> pprint(rdd.collect())
[('Mickey', 0),('Mickey', 1),('Mickey', 2),('Minnie', 3),('Minnie', 4),('Pluto', 5),('Goofy', 6),('Goofy', 7),('Donald', 8)
]
>>> pprint(rdd.countByKey())
collections.defaultdict(int,{'Mickey': 3,'Minnie': 2,'Pluto': 1,'Goofy': 2,'Donald': 1}
)
>>> pprint(rdd.reduceByKey(lambda x,y:x+y).collect())
[('Mickey', 3),('Minnie', 7),('Pluto', 5),('Goofy', 13),('Donald', 8)
]
>>>

该函数的作用是根据二元关系当中的Key去对数据进行累计,神奇的是countByKey()在对元组数据的处理上表现正常,前面表现不正常的是字典数据。

4.7 combineByKey函数

>>> rdd = sc.parallelize([
...     ('Mickey', 0),
...     ('Mickey', 1),
...     ('Mickey', 2),
...     ('Minnie', 3),
...     ('Minnie', 4),
...     ('Pluto', 5),
...     ('Goofy', 6),
...     ('Goofy', 7),
...     ('Donald', 8)
... ])
>>> def initCombiner(firstValue):
...     return (firstValue, 1)
...
>>> def mergeValues(combiner, value):
...     return (combiner[0]+value, combiner[1]+1)
...
>>> def mergeCombiner(combinerA, combinerB):
...     return (combinerA[0]+combinerB[0], combinerA[1]+combinerB[1])
...
>>> res = rdd.combineByKey(initCombiner, mergeValues, mergeCombiner).collect()
>>> pprint(res)
[('Mickey', (3, 3)),('Minnie', (7, 2)),('Pluto', (5, 1)),('Goofy', (13, 2)),('Donald', (8, 1))
]
>>> 

这个函数的作用有点像聚合的意思,传入的参数是三个函数。

第一个函数是决定数据是初始化方式,也就是每当我们在某个分区当中读取到一个新的、在该分区内不存在CombinerKey的时候,我们要对它的Value做出怎样的处理操作,来为这个Key生成对应的Combiner

第二个函数决定的是数据的合并方式,也就是当我们遇到Key值在当前分区内存在对应Combiner的时候,我们要怎样把这个Key对应的Value合并到Combiner里面去。

第三个函数决定Combiner的合并方式,也就是如何将某个Key在不同分区的Combiner合并起来。

我们在上文当中采取独立定义函数的方式来使得展示的效果更加清晰

第一个函数传入的参数是一个在其所属分区内不存在CombinerKey所对应的Value,其初始化结果是生成一个元组也就是Combiner。该元组的第一个元素是正在处理的Key所对应的Value,第二个元素则是1。我们在这里给第二个元素的定义的该Key出现的次数,初始化该KeyCombiner的时候捕捉到该Key的一次出现,所以出现次数记为1

第二个函数传入的参数依次是该Key在当前分区的Combiner以及现在读取到的Value值,其合并结果也是一个元组。该元组第一个元素是当前所读取到的Value与该KeyCombiner当中累计的Value值的和,第二个元素则是该Key记录在Combiner当中的出现次数+1,因为本次又捕捉到了该Key的一次出现。

第三个函数传入的参数是某个Key在两个不同分区的Combiner,其合并结果同样还是一个元组。第一个元素是两个分区的Combiner当中记录的Value值之和,第二个元素是两个分区的Combiner当中记录的当前Key的出现次数之和。

4.8 foldByKey函数

>>> rdd = sc.parallelize([
...     'Mickey', 'Mickey', 'Mickey',
...     'Minnie', 'Minnie',
...     'Pluto',
...     'Goofy', 'Goofy',
...     'Donald'
... ])
>>> rdd = rdd.zipWithIndex()
>>> pprint(rdd.collect())
[('Mickey', 0),('Mickey', 1),('Mickey', 2),('Minnie', 3),('Minnie', 4),('Pluto', 5),('Goofy', 6),('Goofy', 7),('Donald', 8)
]
>>> pprint(rdd.reduceByKey(lambda x,y:x+y).collect())
[('Mickey', 3),('Minnie', 7),('Pluto', 5),('Goofy', 13),('Donald', 8)
]
>>> pprint(rdd.foldByKey(0, lambda x,y:x+y).collect())
[('Mickey', 3),('Minnie', 7),('Pluto', 5),('Goofy', 13),('Donald', 8)
]
>>> pprint(rdd.foldByKey(1, lambda x,y:x+y).collect())
[('Mickey', 4),('Minnie', 8),('Pluto', 6),('Goofy', 14),('Donald', 9)
]
>>>

该函数的效果与reduceByKey十分相似,只不过该函数需要加一个参数作为累计的初始值,在Python当中reduce函数自带初始值功能。

4.9 groupByKey函数

>>> rdd = sc.parallelize([
...     'Mickey', 'Mickey', 'Mickey',
...     'Minnie', 'Minnie',
...     'Pluto',
...     'Goofy', 'Goofy',
...     'Donald'
... ])
>>> rdd = rdd.zipWithIndex()
>>> pprint(rdd.collect())
[('Mickey', 0),('Mickey', 1),('Mickey', 2),('Minnie', 3),('Minnie', 4),('Pluto', 5),('Goofy', 6),('Goofy', 7),('Donald', 8)
]
>>> rdd.groupByKey().foreach(print)
('Mickey', <pyspark.resultiterable.ResultIterable object at 0x7fe783452650>)
('Minnie', <pyspark.resultiterable.ResultIterable object at 0x7fe783452550>)
('Pluto', <pyspark.resultiterable.ResultIterable object at 0x7fe783452650>)
('Goofy', <pyspark.resultiterable.ResultIterable object at 0x7fe783452550>)
('Donald', <pyspark.resultiterable.ResultIterable object at 0x7fe783452650>)
>>> rdd.groupByKey().foreach(lambda x:print(x[0], [t for t in x[1]]))
Mickey [0, 1, 2]
Minnie [3, 4]
Pluto [5]
Goofy [6, 7]
Donald [8]
>>> rdd.groupByKey().foreach(lambda x:print(x[0], list(x[1])))
Mickey [0, 1, 2]
Minnie [3, 4]
Pluto [5]
Goofy [6, 7]
Donald [8]
>>> rdd.groupByKey().foreach(lambda x:print(x[0], tuple(x[1])))
Mickey (0, 1, 2)
Minnie (3, 4)
Pluto (5,)
Goofy (6, 7)
Donald (8,)
>>> rdd.groupByKey().foreach(lambda x:print(x[0], sum(x[1])))
Mickey 3
Minnie 7
Pluto 5
Goofy 13
Donald 8
>>>

该函数的作用是将具有相同Key值的Value收集起来作为一个可迭代对象,我们可以直接对这个可迭代对象应用求和函数,与上面的reduceByKey函数十分相似,也可以应用其他的函数。

4.10 cogroup函数

>>> rddA = sc.parallelize([
...     ('Mickey', 'mouse'),
...     ('Pluto', 'dog'),
...     ('Goofy', 'dog'),
...     ('Goofy', 'human'),
...     ('Donald', 'duck')
... ])
>>> rddB = sc.parallelize([
...     ('Mickey', 'sit'),
...     ('Mickey', 'eat'),
...     ('Mickey', 'walk'),
...     ('Minnie', 'run'),
...     ('Goofy', 'stand'),
...     ('Goofy', 'jump'),
...     ('Donald', 'walk')
... ])
>>> rddC = rddA.cogroup(rddB)
>>> pprint(rddC.collect())
[('Mickey',(<pyspark.resultiterable.ResultIterable object at 0x7f10c192bad0>,<pyspark.resultiterable.ResultIterable object at 0x7f10c17fe410>)),('Goofy',(<pyspark.resultiterable.ResultIterable object at 0x7f10c192ba90>,<pyspark.resultiterable.ResultIterable object at 0x7f10c29eddd0>)),('Minnie',(<pyspark.resultiterable.ResultIterable object at 0x7f10c1817cd0>,<pyspark.resultiterable.ResultIterable object at 0x7f10c1817d90>)),('Pluto',(<pyspark.resultiterable.ResultIterable object at 0x7f10c1806ad0>,<pyspark.resultiterable.ResultIterable object at 0x7f10c1817350>)),('Donald',(<pyspark.resultiterable.ResultIterable object at 0x7f10c1817890>,<pyspark.resultiterable.ResultIterable object at 0x7f10c1817390>))
]
>>> rddC.foreach(lambda x:print(x[0], (list(x[1][0]), list(x[1][1]))))
Mickey (['mouse'], ['sit', 'eat', 'walk'])
Goofy (['dog', 'human'], ['stand', 'jump'])
Minnie ([], ['run'])
Pluto (['dog'], [])
Donald (['duck'], ['walk'])
>>>

该函数的作用是先把每个数据集进行goupByKey,然后再对两个结果进行goupByKey。在例子当中我们可以看到,高飞的狗属性和人类属性同时在第一个列表,它们是第一个数据集的数据,即第一个数据集goupByKey的结果;而站立行为和跳跃行为同时在第二个列表,这是第二个数据集的结果;这两个列表在同一个元组内,是这个函数的最终结果,即对上面的两个结果进行goupByKey

4.11 join函数

>>> rddA = sc.parallelize([
...     ('Mickey', 'mouse'),
...     ('Pluto', 'dog'),
...     ('Goofy', 'dog'),
...     ('Donald', 'duck')
... ])
>>> rddB = sc.parallelize([
...     ('Mickey', 'sit'),
...     ('Minnie', 'run'),
...     ('Goofy', 'stand'),
...     ('Donald', 'walk')
... ])
>>> rddC = rddA.join(rddB)
>>> rddC.getNumPartitions()
2
>>> pprint(rddC.collect())
[('Mickey', ('mouse', 'sit')),('Goofy', ('dog', 'stand')),('Donald', ('duck', 'walk'))
]
>>>

该函数的作用是对两个数据集进行内连接,也就是取Key的交集,然后拼接这些Key对应的Value生成新的数据集

4.12 leftOuterJoin函数

>>> rddA = sc.parallelize([
...     ('Mickey', 'mouse'),
...     ('Pluto', 'dog'),
...     ('Goofy', 'dog'),
...     ('Donald', 'duck')
... ])
>>> rddB = sc.parallelize([
...     ('Mickey', 'sit'),
...     ('Minnie', 'run'),
...     ('Goofy', 'stand'),
...     ('Donald', 'walk')
... ])
>>> rddC = rddA.leftOuterJoin(rddB)
>>> pprint(rddC.collect())
[('Mickey', ('mouse', 'sit')),('Goofy', ('dog', 'stand')),('Pluto', ('dog', None)),('Donald', ('duck', 'walk'))
]
>>>

该函数的作用是左连接,也就是将右边数据集补充到左边的数据集上,给对应的Key添加对应的Value,没有的话就是None

4.13 rightOuterJoin函数

>>> rddA = sc.parallelize([
...     ('Mickey', 'mouse'),
...     ('Pluto', 'dog'),
...     ('Goofy', 'dog'),
...     ('Donald', 'duck')
... ])
>>> rddB = sc.parallelize([
...     ('Mickey', 'sit'),
...     ('Minnie', 'run'),
...     ('Goofy', 'stand'),
...     ('Donald', 'walk')
... ])
>>> rddC = rddA.rightOuterJoin(rddB)
>>> pprint(rddC.collect())
[('Mickey', ('mouse', 'sit')),('Goofy', ('dog', 'stand')),('Minnie', (None, 'run')),('Donald', ('duck', 'walk'))
]
>>>

该函数的作用是右连接,效果类似上面的函数


  1. 10天吃掉那只pyspark ↩︎

  2. PySpark实战指南:利用Python和Spark构建数据密集型应用并规模化部署 ↩︎

  3. Spark编程基础(Python版) ↩︎

  4. 第4章-RDD编程 ↩︎ ↩︎

PyS2:RDD编程基础(一)相关推荐

  1. spark编程基础python版实验报告_Spark编程基础(Python版)

    章 大数据技术概述 1.1 大数据概念与关键技术 1.1.1 大数据的概念 1.1.2 大数据关键技术 1.2 代表性大数据技术 1.2.1 Hadoop 1.2.2 Spark 1.2.3 Flin ...

  2. 【大数据开发】SparkCore——Spark作业执行流程、RDD编程的两种方式、简单算子

    文章目录 一.Spark作业执行流程(重点) 二.RDD编程 2.1创建RDD的⼆种⽅式: 2.2Transformation算⼦ 2.3Action算子 三.简单算子(必须掌握) 3.1 map.m ...

  3. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  4. spark编程基础python版 pdf_Spark编程基础Python版-第5章-Spark-SQL.pdf

    <Spark编程基础(Python版)> 教材官网:/post/spark-python/ 温馨提示:编辑幻灯片母版,可以修改每页PPT的厦大校徽和底部文字 第5章Spark SQL (P ...

  5. 关于介绍编程前景的html文档,HTML编程基础稿件(32页)-原创力文档

    * * 第2章 HTML编程基础 优选文档 * 内容提要 本章首先介绍HTML的发展历史,然后介绍HTML的基本框架 详细介绍了HTML的各种常用标记:文字标记.图片标记和超级链接标记,等等. 介绍C ...

  6. QT开发(五十)——QT串口编程基础

    QT开发(五十)--QT串口编程基础 一.QtSerialPort简介 1.串口通信基础 目前使用最广泛的串口为DB9接口,适用于较近距离的通信.一般小于10米.DB9接口有9个针脚. 串口通信的主要 ...

  7. 编程基础 垃圾回收_为什么我回收编程问题

    编程基础 垃圾回收 by Amy M Haddad 通过艾米·M·哈达德(Amy M Haddad) 为什么我回收编程问题 (Why I Recycle Programming Problems) M ...

  8. Spark入门系列(二)| 1小时学会RDD编程

    作者 | 梁云1991 转载自Python与算法之美(ID:Python_Ai_Road) 导读:本文为 Spark入门系列的第二篇文章,主要介绍 RDD 编程,实操性较强,感兴趣的同学可以动手实现一 ...

  9. Python要了解哪些编程基础 如何学Python比较好

    Python要了解哪些编程基础?如何学Python比较好?不管怎么说,Python都是大家进军IT行业值得选择的语言.毕竟它不但使用,而且还好用.更值得一提的是,它非常容易入门.而且在人工智能.传统编 ...

  10. (2)神经网络编程基础

    文章目录 神经网络编程基础 2.1 二分分类 2.2 logistic 回归(logistic Regression) 2.3 logistic 回归损失函数 损失函数 (Loss (Error)Fu ...

最新文章

  1. php 5/0,PHP 5.5.0 released.该怎么解决
  2. c语言 爬虫 socket,爬虫遇到 Socket,莫慌,肝就完了!
  3. 启动php服务命令,启动|停止服务
  4. python企业微信回调_python 微信企业号-回调模式接收微信端客户端发送消息并被动返回消息...
  5. 210108阶段三进程管理,多进程编程
  6. python本地编译器_Python学习札记(0)——Python开发环境搭载及推荐几款Python编译器...
  7. CodeForces - 1017D The Wu
  8. c语言解数独算法递归,How to think:递归和数独试探算法
  9. MongoDB数据库基础教程
  10. PyCharm创造起名自动生成起名·创造者·日期·时间模板
  11. 介绍一个很好的英语学习软件——单词风暴
  12. 服务器系统试用,“雪豹”安装篇(3)
  13. 源自神话的写作要义之英雄之旅
  14. 时间同步服务器,保障您的计算机系统时钟精准
  15. java使用免费日期API获取当年节假日
  16. 自动化测试面试题及答案,看完后吊打面试官!
  17. 【高德地图进阶】--- 带图片的点(1)
  18. OpenAI ChatGPT,爆火的OpenAi的ChatGPT聊天机器人注册和使用攻略,满满诚意哦
  19. include,include_once,require,require_once的区别
  20. php引用公有类方法_php利用ReflectionClass反射机制获取类public公有方法

热门文章

  1. 批量重命名文件、图片、去除括号
  2. 通俗易懂专利分类、专利申请流程
  3. 专利写作技巧以及流程
  4. 如何删除PPT中自带的切换动画?
  5. 第三次作业(尤心心)
  6. 电子式电能表试行检定规程
  7. App 快捷方式——创建快捷方式
  8. 向量场_方向向量和梯度
  9. imp遇到重复数据_oracle的imp导入时覆盖目标数据库
  10. 互联网金融系统技术沙龙:小米风控实践