Jbd7:Spark

  • 教程地址
  • 0. 引言
  • 1. Spark概述
    • 1.1 Spark的诞生
    • 1.2 Spark与Hadoop、MapReduce、HDFS的关系
    • 1.3 Spark生态体系
  • 2. Spark编程模型
    • 2.1 RDD概述
    • 2.2 RDD定义
    • 2.3 RDD五大特性
      • 2.3.1 分区
      • 2.3.2 并行计算
      • 2.3.3 依赖关系
      • 2.3.4 KV分区器
      • 2.3.5 优先位置列表
    • 2.4 RDD操作函数
  • 3 Spark架构原理
    • 3.1 Spark计算阶段
    • 3.2 如何划分计算阶段
    • 3.3 Spark 作业管理
    • 3.4 Spark 执行过程
  • 4. Spark 编程实战
    • 4.1 实验一:Spark Local模式的安装
      • 4.1.1 实验准备
      • 4.1.2 实验内容
      • 4.1.3 实验步骤
        • 4.1.3.1 解压安装包
        • 4.1.3.2 更改文件夹名和所属用户
        • 4.1.3.3 修改spark-env.sh
        • 4.1.3.4 设置Spark的环境变量
        • 4.1.3.5 检验Spark是否成功部署
    • 4.2 实验二:通过WordCount观察Spark RDD执行流程
      • 4.2.1 实验准备
      • 4.2.2 实验内容
      • 4.2.3 实验步骤
        • 4.2.3.1 文本数据准备
        • 4.2.3.2 本地模式启动spark-shell
        • 4.2.3.3 创建SparkContext对象
        • 4.2.3.4 创建RDD
        • 4.2.3.5 对数据进行转换处理
        • 4.2.3.6 打印数据
      • 4.2.4 WordCount在RDD的运行原理
        • 4.2.4.1 textFile操作
        • 4.2.4.2 flatMap操作
        • 4.2.4.3 map操作
        • 4.2.4.4 reduceByKey操作
        • 4.2.4.5 输出

教程地址

https://github.com/datawhalechina/juicy-bigdata/

0. 引言

内存储存的是我们正在使用的资源,磁盘储存的是我们暂时用不到的资源

可以把磁盘理解为一个仓库,而内存是进出这个仓库的通道

仓库(磁盘)很大,而通道(内存)很小,通道就很容易塞满

也可以这样理解:

MapReduce每一个步骤发生在内存中,其中间值(溢写文件)会写入在磁盘

下一步操作时又会将这个中间值merge到内存中,如此循环直到最终完成计算

而Spark的每个步骤也是发生在内存之中,但中间值会直接进入下一个步骤

直到所有的步骤完成之后,才会将最终结果保存进磁盘

所以Spark较少进行很多次相对没有意义的读写,节省大量的时间

1. Spark概述

1.1 Spark的诞生

直接看图吧:

1.2 Spark与Hadoop、MapReduce、HDFS的关系

Hadoop处理大数据的流程:首先从HDFS读取输入数据;

接着在 Map 阶段调用mapper function;然后把结果写入磁盘;

在Reduce阶段,从各个处于Map阶段的机器中读取Map计算的中间结果,

使用用户定义的reduce function,最后把结果写回HDFS。

以上过程至少有三次数据读写,即其处理流程高度依赖磁盘读写

如果任务的计算逻辑更复杂,Hadoop在数据处理上就出现了瓶颈

Spark在这样的背景下产生,它不像Hadoop一样采取磁盘读写

而是基于性能更高的内存存储来进行数据存储和读写

注意,这里说的是计算数据的存储,而非持久化的存储

但是Spark并非完美,缺乏对数据存储这一块的支持,即没有分布式文件系统,

其必须依赖外部的数据源,例如HDFS或者其他的文件系统,甚至是MySQL

总之,Hadoop和Spark两者都是大数据框架,但是各自存在的目的不同

Hadoop实质上是一个分布式数据基础设施,用于分布式存储并具有计算处理的功能

Spark则不会进行分布式数据的存储,只是一个计算分布式数据的工具

可以看做是MapReduce的竞品,准确的说是SparkCore作为MapReduce的竞品

综上所述,见下图:

1.3 Spark生态体系

以Spark为基础,其体系内有支持SQL语句的SparkSQL

有支持流计算的Spark Streaming,有支持机器学习的MLlib

还有支持图计算的GraphX,其快速且通用的表现如下:

  1. 速度方面:

    Spark的一个主要特点是能够在内存中进行计算

    其速度要比MapReduce计算模型更加高效

    可以面向海量数据的复杂任务进行分析处理

  2. 通用方面:

    Spark框架可以针对任何业务类型分析进行处理

    比如SparkCore离线批处理、SparkSQL交互式分析、

    SparkStreaming和StructuredStreamig流式处理

    以及机器学习和图计算,这些都可以完成

2. Spark编程模型

2.1 RDD概述

RDD是Spark的核心概念,是弹性数据集(Resilient Distributed Datasets)的缩写

RDD既是Spark面向开发者的编程模型,又是Spark自身架构的核心元素

MapReduce的设计类似面向过程进行代码编写工作的大数据计算方式

针对输入数据,根据计算过程分为两个阶段,Map阶段和Reduce阶段

而Spark则直接针对数据进行编程,将大规模数据集合抽象成一个RDD对象

然后对DD上进行处理,得到一个新的RDD,并继续计算直至得到最后结果

所以Spark的设计类似面向对象进行代码编写工作的大数据计算方式

2.2 RDD定义

RDD是分布式内存的一个抽象概念,是只读的记录分区集合

RDD能够横跨集群所有节点进行并行计算,符合分布式的特点

Spark建立在抽象的RDD上,可用统一的方式处理不同的大数据应用场景

其方式为将输入数据转化为RDD,然后对RDD进行一系列的算子运算

接着会通过丰富的API来操作数据,从而得到最终的运算结果

2.3 RDD五大特性

2.3.1 分区

分区的含义是允许Spark将计算以分区为单位,分配到多个机器上并行计算

在某些情况下,比如从HDFS读取数据时,Spark会使用位置信息就近分配

也就是将计算工作发给靠近数据的机器,减少跨网络传输的数据量

2.3.2 并行计算

RDD的每一个分区都会被一个计算任务(Task)处理

每个分区有计算函数(具体执行的计算算子)

计算函数以分片为基本单位进行并行计算

RDD的分区数决定着并行计算的数量

2.3.3 依赖关系

依赖关系列表会告诉Spark如何从必要的输入来构建RDD

当遇到错误需要重算时,Spark可以根据这些依赖关系重新执行操作

这样RDD就得到了重建,即容错机制,由依赖关系赋予

2.3.4 KV分区器

想要理解分区器的概念,我们需要先来比较一下MapReduce的任务机制。

Map阶段的Shuffle过程中会对中间结果进行分片,即根据key进行划分

分片的数量就是Reduce Task的数量,而具体分片的策略由分区器Partitioner决定

Spark目前支持Hash分区(默认分区)和Range分区,用户也可以自定义分区

总之,Partitioner决定了RDD如何分区和下一步的分片数

以及当前并行Shuffle输出的并行数据,使得Spark可以控制数据在不同节点上分区。

值得注意的是,其本身只针对于key-value的形式(key-value形式的RDD才有Partitioner属性)

Partitioner会从0到numPartitions-1区间内映射每一个key到partition ID上

2.3.5 优先位置列表

大数据计算的基本思想是:“移动计算而非移动数据”

Spark在进行调度时,会尽可能将任务分配到计算节点附近

因此在具体计算前,就需要知道它运算的数据在什么地方

所以,分区位置列表会存储每个Partition的优先位置

如果读取的是HDFS文件,这个列表保存的就是每个分区所在的block块的位置

2.4 RDD操作函数

RDD的操作函数包括两种:转换(transformation)函数和执行(action)函数

一种是转换(transformation)函数,这种函数的返回值还是RDD

另一种是执行(action)函数,这种函数不返回RDD

RDD中定义的转换操作函数有:

  • 用于计算的map(func)函数

  • 用于过滤的filter(func)函数

  • 用于合并数据集的union(otherDataset)函数

  • 用于根据key聚合的reduceByKey(func, [numPartitions])函数

  • 用于连接数据集的join(otherDataset, [numPartitions])函数

  • 用于分组的groupByKey([numPartitions])函数等

跟MapReduce一样,Spark也是对大数据进行分片计算

Spark分布式计算的数据分片、任务调度都是以RDD为单位展开的

每个RDD分片都会分配到一个执行进程中进行处理

RDD上的转换操作分成两种:

  1. 不产生新的分片

    比如map、filter等操作产生的RDD不会出现新的分片

    一个RDD数据分片,经过map或者filter转换操作后,其结果还在当前的分片中

    就像用map函数对每个数据加1,得到的还是这样一组数据,只是值不同

    实际上,Spark并不是按照代码写的操作顺序生成RDD

    比如rdd2 = rdd1.map(func)这样的代码并不会在物理上生成一个新的RDD

    物理上,Spark只有在产生新的RDD分片时候,才会真的生成一个RDD

    Spark的这种特性也被称作惰性计算

  2. 会产生新的分片

    转换操作产生的RDD会产生新的分片,比如reduceByKey

    来自不同分片的相同key 必须聚合在一起进行操作,这样就会产生新的RDD分片

    实际执行过程中,是否会产生新的RDD分片,并不是根据转换函数名就能判断出来的

3 Spark架构原理

3.1 Spark计算阶段

MapReduce中,一个应用一次只运行一个map和一个reduce

而Spark可以根据应用的复杂程度,将过程分割成更多的计算阶段(stage)

这些计算阶段组成一个有向无环图(DAG)

Spark任务调度器根据DAG的依赖关系执行计算阶段(stage)

因为某些机器学习算法可能需要进行大量的迭代计算,产生数万个计算阶段

这些计算阶段在一个应用中处理完成,而不是像MapReduce那样需要启动数万个应用

Spark比MapReduce快100多倍,正是因为这样的设计极大地提高了运行效率。

DAG是有向无环图,即是说不同阶段的依赖关系是有向的,计算过程只能沿着依赖关系方向执行

被依赖的阶段执行完成之前,依赖的阶段不能开始执行

同时,这个依赖关系不能是环形依赖,否则就造成死循环

下面这张图描述了一个典型的Spark运行DAG的不同阶段:


从图上看,整个应用被切分成3个阶段

阶段3需要依赖阶段1和阶段2,阶段1和阶段2互不依赖

Spark在执行调度时,先执行阶段1和阶段2,完成以后再执行阶段3

如果有更多的阶段,Spark的策略是一样的

Spark大数据应用的计算过程为:

Spark会根据程序初始化DAG,由DAG再建立依赖关系

根据依赖关系顺序执行各个计算阶段

Spark作业调度执行核心是DAG

由DAG可以得出整个应用就被切分成哪些阶段及其依赖关系

再根据每个阶段要处理的数据量生成相应的任务集合(TaskSet)

为每个任务都分配一个相对应的任务进程去处理该任务的计算工作

而DAG则由对应的DAGScheduler组件负责应用DAG的生成和管理

DAGScheduler会根据程序代码生成DAG

然后将程序分发到分布式计算集群

按计算阶段的先后关系调度执行

3.2 如何划分计算阶段

从上面来看,只有这一个地方发生了阶段转换

也就是,RDD之间的转换连接线呈现多对多交叉连接的时候,就会产生新的阶段

图中每个RDD里面都包含多个小块,每个小块都表示RDD的一个分片

我们知道,一个RDD表示一个数据集

该数据集中的多个数据分片需要进行分区传输

传输到另一个数据集并写入不同分片中

这种涉及到数据分区交叉传输的操作,在MapReduce的过程也存在:


MapReduce把这种从数据集跨越,由多个分区传输的过程,叫做Shuffle

同样,Spark也需要通过Shuffle将数据进行重新组合,把相同key的数据放一起

由于会进行新的聚合、关联等操作,所以Spark每次Shuffle都会产生新的计算阶段

而每次计算时,需要的数据都是由前面一个或多个计算阶段产生的

所以计算阶段需要依赖关系,必须等待前面的阶段执行完毕后,才能进行Shuffle

计算阶段划分的依据是Shuffle而不是操作函数的类型,并不是所有的函数都有Shuffle

比如Spark计算阶段示例图中,RDD B和RDD F进行join后,得到RDD G

RDD B不需要Shuffle,因为RDD B在上一个阶段中,已经进行了数据分区

分区数和分区key不变,就不需要进行Shuffle

而RDD F的分区数不同,就需要进行Shuffle

Spark把不需要Shuffle的依赖,称为窄依赖

需要Shuffle的依赖,称为宽依赖

虽然都有Shuffle,但是Spark会比MapReduce更高效:

  1. 从本质上:

    Spark可以算是一种MapReduce计算模型的不同实现

    Hadoop MapReduce根据Shuffle将大数据计算分为Map和Reduce两个阶段

    而Spark更流畅,将前一个的Reduce和后一个的Map进行连接,当作一个阶段进行计算

    从而形成了一个更高效流畅的计算模型,但其本质仍然是Map和Reduce

    但是这种多个计算阶段依赖执行的方案可以有效减少对HDFS的访问(落盘)

    也就是减少作业的调度执行次数,因此执行速度也更快

  2. 从存储方式上:

    MapReduce主要使用磁盘存储Shuffle过程的数据

    而Spark优先使用内存进行数据存储(RDD也优先存于内存)

    这也是Spark比Hadoop性能高的另一个原因

3.3 Spark 作业管理

Spark的DAGScheduler遇到Shuffle时,会生成一个计算阶段

在遇到action函数时,会生成一个作业(Job)

RDD里的每个数据分片,Spark都会创建一个计算任务进行处理

一个计算阶段会包含多个计算任务(Task),而一个作业至少包含一个计算阶段

每个计算阶段由多个任务组成,这些任务(Task)组成一个任务集合。

DAGScheduler根据代码生成DAG图,Spark的任务调度以任务为单位进行分配

将任务分配到分布式集群的不同机器上进行执行。

3.4 Spark 执行过程

Spark支持多种部署方案(Standalone、Yarn、Mesos等),但大体上相似

  1. 构造DAG

    首先,Spark在自己的JVM进程里启动应用程序,即Driver进程

    启动后,Driver调用SparkContext初始化执行配置和输入数据

    再由SparkContext启动DAGScheduler构造执行的DAG图

    切分成计算任务这样的最小执行单位

  2. 请求资源

    接着,Driver向Cluster Manager请求计算资源,用于DAG的分布式计算

    ClusterManager收到请求以后

    将Driver的主机地址等信息通知给集群的所有计算节点Worker

  3. 发送任务

    最后,Worker收到信息后,根据Driver的主机地址,向Driver通信并注册

    然后根据自己的空闲资源向Driver通报可以领用的任务数

    Driver根据DAG图向注册的Worker分配任务

4. Spark 编程实战

4.1 实验一:Spark Local模式的安装

4.1.1 实验准备

Ubuntu 20.04JavaHadoop

4.1.2 实验内容

基于上述实验环境,完成Spark Local模式的安装。

4.1.3 实验步骤

4.1.3.1 解压安装包

master@VM-0-12-ubuntu:/opt/JuciyBigData$ ls
apache-hive-2.3.9-bin.tar.gz  hbase-2.4.8-bin.tar.gz      mysql-connector-java_8.0.27-1ubuntu20.04_all.deb
hadoop-3.3.1.tar.gz           jdk-8u311-linux-x64.tar.gz  spark-3.2.0-bin-without-hadoop.tgz
master@VM-0-12-ubuntu:/opt/JuciyBigData$ sudo tar -zxvf spark-3.2.0-bin-without-hadoop.tgz -C /opt/
···
spark-3.2.0-bin-without-hadoop/licenses/LICENSE-re2j.txt
spark-3.2.0-bin-without-hadoop/licenses/LICENSE-kryo.txt
spark-3.2.0-bin-without-hadoop/licenses/LICENSE-cloudpickle.txt
master@VM-0-12-ubuntu:/opt/JuciyBigData$ 

4.1.3.2 更改文件夹名和所属用户

master@VM-0-12-ubuntu:/opt$ ll
total 1496476
drwxr-xr-x  9 root   root         4096 Mar 23 13:13 ./
drwxr-xr-x 20 root   root         4096 Mar 23 13:13 ../
drwxr-xr-x 14 master master       4096 Mar 18 23:14 hadoop/
drwxr-xr-x  8 master master       4096 Mar 19 20:19 hbase/
drwxr-xr-x 10 master master       4096 Mar 21 19:51 hive/
drwxr-xr-x  8 master master       4096 Sep 27 20:29 java/
drwxr-xr-x  2 root   root         4096 Feb 12 17:51 JuciyBigData/
-rw-r--r--  1 root   root   1532346446 Mar 15 18:28 JuciyBigData.zip
drwxr-xr-x  2 master master       4096 Mar 21 21:10 master/
drwxr-xr-x 13 ubuntu ubuntu       4096 Oct  6 20:45 spark-3.2.0-bin-without-hadoop/
master@VM-0-12-ubuntu:/opt$ sudo mv /opt/spark-3.2.0-bin-without-hadoop/ /opt/spark
master@VM-0-12-ubuntu:/opt$ sudo chown -R master:master /opt/spark/
master@VM-0-12-ubuntu:/opt$ ll
total 1496476
drwxr-xr-x  9 root   root         4096 Mar 23 13:13 ./
drwxr-xr-x 20 root   root         4096 Mar 23 13:14 ../
drwxr-xr-x 14 master master       4096 Mar 18 23:14 hadoop/
drwxr-xr-x  8 master master       4096 Mar 19 20:19 hbase/
drwxr-xr-x 10 master master       4096 Mar 21 19:51 hive/
drwxr-xr-x  8 master master       4096 Sep 27 20:29 java/
drwxr-xr-x  2 root   root         4096 Feb 12 17:51 JuciyBigData/
-rw-r--r--  1 root   root   1532346446 Mar 15 18:28 JuciyBigData.zip
drwxr-xr-x  2 master master       4096 Mar 21 21:10 master/
drwxr-xr-x 13 master master       4096 Oct  6 20:45 spark/
master@VM-0-12-ubuntu:/opt$ 

4.1.3.3 修改spark-env.sh

master@VM-0-12-ubuntu:/opt$ cd /opt/spark/conf
master@VM-0-12-ubuntu:/opt/spark/conf$ # 切换进入配置目录
master@VM-0-12-ubuntu:/opt/spark/conf$ cp ./spark-env.sh.template ./spark-env.sh
master@VM-0-12-ubuntu:/opt/spark/conf$ # 从模板拷贝一份文件
master@VM-0-12-ubuntu:/opt/spark/conf$ vim spark-env.sh
master@VM-0-12-ubuntu:/opt/spark/conf$ # 编辑文件,加入内容
master@VM-0-12-ubuntu:/opt/spark/conf$ head spark-env.sh
#!/usr/bin/env bash
export SPARK_DIST_CLASSPATH=$(/opt/hadoop/bin/hadoop classpath)
master@VM-0-12-ubuntu:/opt/spark/conf$ 

4.1.3.4 设置Spark的环境变量

master@VM-0-12-ubuntu:/opt/spark/conf$ sudo vim /etc/profile
master@VM-0-12-ubuntu:/opt/spark/conf$ # 编辑环境变量
master@VM-0-12-ubuntu:/opt/spark/conf$ source /etc/profile
master@VM-0-12-ubuntu:/opt/spark/conf$ # 刷新文件
master@VM-0-12-ubuntu:/opt/spark/conf$ tail /etc/profile# spark
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/binmaster@VM-0-12-ubuntu:/opt/spark/conf$ # 在文件结尾加上上述内容
master@VM-0-12-ubuntu:/opt/spark/conf$ 

4.1.3.5 检验Spark是否成功部署

master@VM-0-12-ubuntu:/opt/spark/bin$ run-example SparkPi 2>&1 | grep "Pi is"
Pi is roughly 3.137795688978445
master@VM-0-12-ubuntu:/opt/spark/bin$

4.2 实验二:通过WordCount观察Spark RDD执行流程

4.2.1 实验准备

Ubuntu 20.04JavaHadoopSpark Local

4.2.2 实验内容

基于上述实验环境,通过WordCount观察Spark RDD执行,进一步理解Spark RDD的执行逻辑。

4.2.3 实验步骤

4.2.3.1 文本数据准备

master@VM-0-12-ubuntu:/opt/spark/data$ mkdir wordcount
master@VM-0-12-ubuntu:/opt/spark/data$ cd wordcount/
master@VM-0-12-ubuntu:/opt/spark/data/wordcount$ vim helloSpark.txt
master@VM-0-12-ubuntu:/opt/spark/data/wordcount$ cat helloSpark.txt
Hello Spark Hello Scala
Hello Hadoop
Hello Flink
Spark is amazingmaster@VM-0-12-ubuntu:/opt/spark/data/wordcount$ 

4.2.3.2 本地模式启动spark-shell

master@VM-0-12-ubuntu:/opt/spark/bin$ # 通过进入bin目录,启动spark-shell的本地环境,指定核数为2个
master@VM-0-12-ubuntu:/opt/spark/bin$ spark-shell --master local[2]
2022-03-23 13:28:21,991 WARN util.Utils: Your hostname, VM-0-12-ubuntu resolves to a loopback address: 127.0.1.1; using 172.16.0.12 instead (on interface eth0)
2022-03-23 13:28:21,992 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-03-23 13:28:35,169 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.16.0.12:4040
Spark context available as 'sc' (master = local[2], app id = local-1648013317063).
Spark session available as 'spark'.
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 3.2.0/_/Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_311)
Type in expressions to have them evaluated.
Type :help for more information.scala> 

4.2.3.3 创建SparkContext对象

SparkContext是Spark程序所有功能的唯一入口

不管是使用scala,还是python语言编程,都必须有一个SparkContext。

Spark-shell中会默认为我们创建了SparkContext入口

后续无需再进行创建,可以直接用sc来进行编码。

SparkContext的核心作用:

  • 初始化Spark应用程序,运行所需要的核心组件

  • 包括DAGScheduler、TaskScheduler、SchedulerBackend

  • 同时还会负责Spark向Master注册等

如果不是spark-shell,可以通过如下方法新建:

val conf = new SparkConf()
// 创建SparkConf对象
conf.setAppName("First Spark App")
//设置app应用名称,在程序运行的监控解面可以看到名称
conf.setMaster("local")
//本地模式运行
val sc = new SparkContext(conf)
// 创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息

4.2.3.4 创建RDD

根据具体的数据来源,如HDFS,通过SparkContext来创建RDD

创建的方式有三种:根据外部数据源、根据Scala集合、由其他的RDD操作转换

数据会被RDD划分为一系列的Partitions

分配到每个Partition的数据属于一个Task的处理范畴

具体代码如下:

scala> val lines = sc.textFile("file:///opt/spark/data/wordcount/helloSpark.txt", 1)
lines: org.apache.spark.rdd.RDD[String] = file:///opt/spark/data/wordcount/helloSpark.txt MapPartitionsRDD[1] at textFile at <console>:23scala> // 读取本地文件并设置为一个Partitionscala> // 也可以将helloSpark.txt上传到hdfs中,直接读取hdfs中的文件scala> // 此时path路径不需要加"file://"前缀scala> 

4.2.3.5 对数据进行转换处理

对初始读入数据的RDD进行transformation级别的处理

如通过map、filter等高阶函数编程,进行具体的数据计算。

  1. 将每一行的字符串拆分为单个单词

    scala> val words = lines.flatMap{line => line.split(" ")}
    words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23scala> // 把每行字符串进行单词拆分,把拆分结果通过flat合并为一个大的单词集合scala> 
  2. 在单词拆分的基础上对每个单词实例计数为1,也就是word ->(word, 1)

    scala> val pairs = words.map{word => (word, 1)}
    pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:23scala>
    
  3. 在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数

    scala> val wordCountOdered = pairs.reduceByKey(_+_).map(pair=>(pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
    wordCountOdered: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:23scala>
    

4.2.3.6 打印数据

scala> wordCountOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
Hello:4
Spark:2
Flink:1
is:1
:1
Scala:1
amazing:1
Hadoop:1scala> :quit
master@VM-0-12-ubuntu:/opt/spark/bin$ jps
1034400 Jps
master@VM-0-12-ubuntu:/opt/spark/bin$

4.2.4 WordCount在RDD的运行原理

4.2.4.1 textFile操作


textFile之后,产生了两个RDD:HadoopRDD 和 MapPartitionRDD

  1. HadoopRDD

    先产生HadoopRDD的原因是先从HDFS中抓取数据,导致先产生HadoopRDD

    HadoopRDD会从HDFS上读取分布式文件

    并将输入文件以数据分片的方式存在于集群中

    数据分片就是把要处理的数据分成不同的部分。

    例如,集群现在有4个节点,将数据分成4个数据分片(当然,这是一种粗略的划分)

    “Hello Spark"在第1台机器,"Hello Hadoop"在第2台机器

    "Hello Flink“在第3台机器,”Spark is amazing“在第4台机器

    HadoopRDD会从磁盘上读取数据,在计算的时候将数据以分布式的方式存储在内存中

    在默认情况下,Spark分片的策略是分片的大小与存储数据的Block块的大小是相同的

    假设现在有4个数据分片(partition),每个数据分片有128M左右

    这里描述为"左右"的原因是分片记录可能会跨越两个Block来存储

    如果最后一条数据跨了两个Block块,那它会被放在前面的一个分片中

    此时分片大小会大于128M(Block块大小)

  2. MapPartitionsRDD

    MapPartitionsRDD是基于HadoopRDD产生的RDD

    MapPartitionsRDD将HadoopRDD产生的数据分片((partition) 去掉相应行的key,只留value

    产生RDD的个数与操作并不一一对应

    在textFile操作产生了2个RDD,Spark中一个操作可以产生一个或多个RDD

4.2.4.2 flatMap操作

哇,教程这里的那个图好像还是个错的,下面是我改的

flatMap操作产生了一个MapPartitionsRDD

其作用是对每个Partition中的每一行内容进行单词切分

并合并成一个大的单词实例的集合

4.2.4.3 map操作


map操作产生了一个MapPartitionsRDD

其作用是在单词拆分的基础上,对单词计数为1

例如将“Hello”和“Spark“变为(Hello, 1),(Spark, 1)

4.2.4.4 reduceByKey操作


reduceByKey操作是对相同key进行value的统计

其包括了本地级别和全局级别的统计

该操作实际上产生了两个 RDD:MapPartitionsRDD与ShuffledRDD。

  1. MapPartitionsRDD

    reduceByKey在MapPartitionRDD之后,首先,进行本地级别(local)的归并操作

    把统计后的结果按照分区策略放到不同的分布式文件中

    例如将(Hello, 1),(Spark, 1),(Hello, 1)汇聚为(Hello, 2), (Spark, 1)

    以此进行局部统计然后将统计的结果传给下一个阶段

    如果下一个阶段是3个并行度,每个Partition进行local reduce后

    将自己的数据分成了3种类型传给下一个阶段

    分成3种类型最简单的方式是通过HashCode按3进行取模

    这个步骤发生在Stage1的末尾端,能够基于内存进行计算

    减少网络的传输,并加快计算速度

  2. ShuffledRDD

    reduceByKey进行Shuffle操作会产生ShuffleRDD

    因为在全局进行聚合的操作时,网络传输不能在内存中进行迭代

    因此需要一个新的Stage来重新分类

    把结果收集后,会进行全局reduce级别的归并操作

    对照上述流程图,4个机器对4行数据进行并行计算

    并在各自内存中进行了局部聚集,将数据进行分类

    图中,第1台机器获取数据为(Hello, 2),第2台机器获取数据为(Hello, 1)

    第3台机器获取数据为(Hello, 1),全局reduce在内部变成(Hello, 4)

    产生reduceByKey的最后结果,其他数据也类似操作

所以,reduceByKey包含两个阶段:

第一个是本地级别的reduce,另一个是全局级别的reduce

其中第一个本地级别是我们容易忽视的

4.2.4.5 输出

reduceByKey操作之后,我们得到了数据的最后结果,需要对结果进行输出

在该阶段会产生MapPartitionsRDD,这里的输出有两种情况:Collect或saveAsTextFile。

  1. 对于Collect

    MapPartitionsRDD的作用是把结果收集起来发送给Driver

  2. 对于saveAsTextFile

    将Stage2产生的结果输出到HDFS中时

    数据的输出要符合一定的格式,而现在的结果只有value,没有key

    所以MapPartitionsRDD会生成相应的key

    例如输出(Hello, 4),这里(Hello, 4)是value

    而不是"Hello"是key,4是value的形式

由于最初在textFile读入数据时,split分片操作将key去掉了,只对value计算

所以,最后需要将去掉的key恢复。这里的key只对Spark框架有意义(满足格式)

在向HDFS写入结果时,生成的key为null即可。

Jbd7:Spark相关推荐

  1. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  2. 基于大数据的Uber数据实时监控(Part 1:Spark机器学习)

    导言 据Gartner称:到2020年,25亿辆联网汽车将成为物联网的主要对象.联网车辆预计每小时可以生成25GB的数据,对这些数据进行分析实现实时监控.大数据目前是10个主要领域之一,利用它可以使城 ...

  3. Spark详解(四):Spark组件以及消息通信原理

    1. Spark核心基本概念 Application(应用程序):指用户编写的Spark应用程序,包含驱动程序(Driver)和分布在集群中多个节点之上的Executor代码,在执行过程中由一个或多个 ...

  4. Spark详解(三):Spark编程模型(RDD概述)

    1. RDD概述 RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行 ...

  5. Spark详解(一):Spark及其生态圈概述

    1. Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架. Spark 运行速度快 易用性好 ...

  6. rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)

    spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...

  7. BigData之Spark:Spark计算引擎的简介、下载、经典案例之详细攻略

    BigData之Spark:Spark计算引擎的简介.下载.经典案例之详细攻略 目录 Spark的简介 1.Spark三大特点 Spark的下载 Spark的经典案例 1.Word Count 2.P ...

  8. spark重要参数调优建议:spark.default.parallelism设置每个stage默认的task数量

    spark.default.parallelism 参数说明:该参数用于设置每个stage的默认task数量.这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能. 参数调优建议:Spar ...

  9. spark指定python版本_1.Spark学习(Python版本):Spark安装

    Step1: 更新apt sudo apt-get update Step2: 安装SSH.配置SSH无密码登陆 sudo apt-get install openssh-server ssh loc ...

最新文章

  1. 【性格心理学】为什么我在关键时刻总是紧张?
  2. python引用numpy出错_使用numpy时出错
  3. 位运算使奇数+1 偶数-1_C ++程序打印从1到N的所有偶数和奇数
  4. win10缺少 `VCRUNTIME140.dll` 文件(解决篇)
  5. access汇总_区块链或密码学相关论文汇总,持续更新中
  6. html输入地址提示错误,高德地图开发之输入框内伴随地址的输入,动态给出地址选择提示...
  7. JavaScript 和 Java 有关系吗?
  8. 阶段面试题_关于文案、交易系统、付费渗透率、阶段性目标的面试题解答
  9. 归一化灰度直方图 Matlab
  10. 如何对大数据进行数据分析
  11. Win10自定义开始菜单磁贴背景颜色及图标
  12. 用“企业架构”方法指导信息化规划
  13. stm32驱动TFTLCD液晶屏显示图片+汉字(快速上手,只教怎么用,不讲原理!)
  14. 04-HTML标签(链接标签)
  15. 简述扁平式管理的技术手段
  16. SAP会计借贷(转载)
  17. 2022考研资料每日更新(2021.05.09)
  18. 串行传输,并行传输,异步同步传输
  19. 《Linux那些事儿之我是USB》我是U盘(7)狂欢是一群人的孤单--总结
  20. java实现简单区块链毕业设计

热门文章

  1. 宝塔Linux面板介绍安装命令!
  2. poi下载,富文本html转word,及微软office打不开问题
  3. 微信批量删除朋友圈脚本,使用auto.js编写的免root运行
  4. 文章阅读:3D U2-Net
  5. python excel转csv_python将excel转换为csv的代码方法总结
  6. python数据可视化分析-matplotlib
  7. Linux系列---NTP配置总结
  8. java调用远程 接口_java通过url调用远程接口返回json数据
  9. so easy!网页骨架屏自动生成方案(dps)
  10. js逆向案例-rus5逻辑学习