Spark 入门基础知识

  • Spark 的特点
    • 速度快
    • 使用方便
    • 通用
    • 兼容
  • Spark 基础
    • 下载
    • 独立部署模式(Standalone)
    • 弹性分布式数据集
    • Scala shell
      • 1. 数组中的最值:
      • 2. RDD的创建
      • 3. 过滤RDD中的数据
      • 4. 读取文件中的数据到RDD对象
      • 5. 统计 RDD对象中的单词的数量
    • RDD 编程(Java)
      • 00. sc.parallelize(list, numSlices)
      • 01. rdd.collect
      • 02. rdd.getNumPartitions()
      • 03. rdd.glom.collect
      • 04. rdd.count()
      • 05. rdd.take(n)
      • 06. rdd.takeOrdered(n, [ordering])
      • 07. rdd.first()
      • 08. r01.top(n, [comparator])
      • 09. rdd.max(comparator)
      • 10. rdd.min(comparator)
      • 11. rdd.foreach(func)
      • 12. rdd.coalesce(numPartitions, shuffle)
      • 13. rdd.union(rdd)
      • 14. rdd.intersection(func)
      • 15. rdd.distinct([numPartitions])
      • 16. rdd.filter(func)
      • 17. rdd.reduce(func)
      • 18. rdd.map(func)
      • 19. rdd.flatMap(func)
      • 20. rdd.sortBy(f, ascending, numPartitions)
      • 21. rdd.mapToPair(func)
      • 22. rdd.groupByKey
      • 23. rdd.mapValues
      • 24. rdd.groupBy
      • 25. sc.textFile
      • 26. rdd.saveAsTextFile
    • Spark 入门级程序 “Word Count”
    • 有向无环图
    • RDD间的依赖
      • 为什么会有宽窄依赖?
        • 窄依赖
        • 宽依赖
    • Shuffle
      • 性能影响
    • Stage
    • 小结

Apache Spark™ 是专为大规模数据处理(离线计算、实时计算、快速查询)而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

Spark 的特点

速度快

运行工作负载的速度提高了100倍。Apache Spark使用最新的DAG(Directed Acyclic Graph)调度程序,查询优化器和物理执行引擎,可实现批处理和流数据的高性能。

为什么 Spark 比Hadoop的MapReduce 要快?

Hadoop的MR是一种高度依赖磁盘IO的框架,在某些业务场景下,需要使用MR 的一系列Job才能完成,但用到某些算法(如:梯度下降,逻辑回归)时,会多次用到之前的Job的结果,就会导致多次Shuffle,进而会导致加大了磁盘IO的开销。

而Spark是高度依赖于内存的计算框架,而且也支持中间结果缓存,所以,Spark也被称之为内存计算框架。

什么是DAG?

DAG是Directed Acyclic Graph的简称,中文名称是有向无环图。在数学中,特别是图论和计算机科学中,指的是一个无回路的有向图。

使用方便

可以使用Java,Scala,Python,R和SQL快速编写应用程序。相比较 MapReduce而言,Spark还提供了80多种高级计算模型,可轻松构建并行应用程序。您可以 从Scala,Python,R和SQL Shell交互地使用它。例如:

df = spark.read.json("logs.json")
df.where("age > 21").select("name.first").show()

通用

Spark支持很多的模块,包括 SQL和DataFrames,MLlib(机器学习), GraphX和Spark Streaming.。您可以在同一应用程序中无缝组合这些库。


Spark框架设计的目的是:可以一站式处理大数据大多数应用场景,包括:

  1. 离线批处理
  2. 实时处理
  3. 交互式查询
  4. 算法建模

兼容

Spark 的部署方式(也可以称之为运行模式)有很多种,例如:

  1. local:本地模式(单机)–开发测试使用
  2. Standalone:无需第三方集群管理器即可快速启动独立集群
  3. On YARN:在Hadoop NextGen(YARN)之上部署Spark
  4. On Cloud:
  • Kubernetes:在 Kubernetes 之上部署Spark 2)
  • Mesos:使用 Apache Mesos 部署私有集群
  • Amazon EC2:可使您在大约5分钟内在 EC2 上启动集群的脚本

故,Spark支持大数据中的Yarn调度,支持 Mesos。也可以处理 Hadoop 计算的数据。跟很多大数据平台兼容。

Spark 基础

下载

Spark的官方网站是:http://spark.apache.org/,我们在下载Spark的时候最好是选择和当前环境中Hadoop版本相对应的版本,例如我们已经安装了Hadoop 2.7 版本,我们可以下载 spark-3.0.0-bin-hadoop2.7.tgz。

独立部署模式(Standalone)

在配置文件夹中没有spark-env.sh文件,但是有spark-env.sh.template 文件,我们需要使用cp 命令将其拷贝为spark-env.sh,再进行修改:

cp spark-env.sh.template spark-env.sh

将spark-env.sh 文件中的SPARK_LOCAL_IP 修改为当前主机的IP地址或者主机名。

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
SPARK_LOCAL_IP=centos
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

弹性分布式数据集

在较高级别上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main函数并在集群上执行各种并行操作。Spark提供的主要抽象是弹性分布式数据集(Resilient Distributed Dataset),即RDD,它是跨集群节点划分的元素的集合,可以并行操作。

通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求Spark将RDD保留在内存中,从而使其能够在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。
初学时,可以把RDD看作是一种分布式的,弹性的,可容错的集合类型(类比List),有如下特点:

  1. 分布式的:有分区机制,可以并行(分布式)处理RDD数据集
  2. 弹性的:可以在作业期间动态增加或者减少分区数量
  3. 可容错的:数据丢失是可以借助DAG恢复

创建RDD的两种方式:

  1. 将普通集合,Array或者List转换为RDD
  2. 通过Spark读取外部文件,从任何支持的文件系统中的数据转变为RDD

Scala shell

通过Scala shell以交互方式运行Spark,能帮助我们更好地了解 Spark。切换目录到spark的bin目录下,输入如下指令即可进入到scala shell:

spark-shell --master=local

成功进入到 shell后的界面如下:

1. 数组中的最值:


scala> val al = Array(1,2,3,66)
al: Array[Int] = Array(1, 2, 3, 66)scala> al.max
res0: Int = 66scala> al.min
res1: Int = 1scala> al
res2: Array[Int] = Array(1, 2, 3, 66)

2. RDD的创建

使用Spark上下文(Spark Context,也就是sc对象)的parallelize函数将普通集合转换为了RDD。


scala> val r1 = sc.parallelize(al)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

因为RDD有分区的特性,默认情况下,一个RDD只有一个分区,如果需要有多个分区,则通过函数的第二个参数来指定分区的数量,如下所示:


scala> val r2 = sc.parallelize(al, 3)
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

查看RDD对象的分区数量:


scala> r1.partitions.size
res3: Int = 1scala> r2.partitions.size
res4: Int = 3

查看RDD对象每个分区中的数据:


scala> r2.glom.collect
[Stage  0:=======================================>       (2 + 1) / 3
res5: Array[Array[Int]] = Array(Array(1), Array(2), Array(3, 66))

从结果5中的可以看到,r2对象总共3个分区,第一个分区中的数据是Array(1),第二个分区中的数据是Array(2),第三个分区中的苏剧是Array(3, 66)。

另外,还可以通过makeRDD来创建RDD对象,它和parallelize函数一样,第一个参数室一个普通的集合,Array或者List都行,第二个参数用来指定分区的数量,例如:


scala> val r3 = sc.makeRDD(List(3, 6, 8, 11, 10), 3)
r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24

3. 过滤RDD中的数据

上例中,我们得到了r2 是一个拥有三个分区的RDD对象,现在我们需要从这个对象中过滤所有的偶数:

scala> r2.filter{x => x % 2 == 0}
res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at <console>:26scala> res6.glom.collect
res7: Array[Array[Int]] = Array(Array(), Array(2), Array(66))

通过 filter 函数,传递的匿名函数,可以过滤所有的偶数。如果没有声明变量来接收过滤之后的数据,shell脚本自动生成变量来接收结果,例如本例中的 res6,如果不想看分区的情况,直接显示RDD对象中所有的数据,可以去掉 glom,就像这样:

scala> res6.collect
res8: Array[Int] = Array(2, 66)

4. 读取文件中的数据到RDD对象

我们先在 /root 下建立一个文件 words,内容如下:

tom 19
jack 20
mary 18

尝试使用sc的textFile函数读取words 文件中的内容并创建一个RDD对象:

scala> val r4 = sc.textFile("file:///root/words", 4)
r4: org.apache.spark.rdd.RDD[String] = file:///root/words MapPartitionsRDD[1] at textFile at <console>:24scala> r4.collect
res9: Array[String] = Array(tom 19, jack 20, mary 18)scala> r4.collect.size
res10: Int = 3scala> r4.glom.collect
res11: Array[Array[String]] = Array(Array(tom 19), Array(jack 20), Array(mary 18), Array(), Array())

textFile函数的第一个参数是指定文件的地址,采用的是 file协议,如果文件在 hdfs 上,就得采用hdfs 协议,第二个参数就是被创建的RDD对象的分区数。从本例中可知:

  1. 得到的RDD集合大小和words文件的总行数相同
  2. 不是每一个分区数中一定有数据

同样的,我们将 words 文件上传到 hdfs 上,然后通过 textFile 来读取,结果如下:


scala> val r5 = sc.textFile("hdfs://centos:9000/words", 2)
r5: org.apache.spark.rdd.RDD[String] = hdfs://centos:9000/words MapPartitionsRDD[8] at textFile at <console>:24scala> r5.collect
res12: Array[String] = Array(tom 19, jack 20, mary 18)scala> r5.collect.size
res13: Int = 3scala> r5.glom.collect
res14: Array[Array[String]] = Array(Array(tom 19, jack 20), Array(mary 18))

5. 统计 RDD对象中的单词的数量

在上例中,我们得到了 r5对象,r5中每个元素都是一行,我们来统计下每个单词的出现的次数,操作如下:

scala> r5.flatMap{line => line.split(" ")}.groupBy{word => word}.mapValues{list=>list.size}
res15: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at mapValues at <console>:26scala> res15.collect
res16: Array[(String, Int)] = Array((20,1), (19,1), (tom,1), (18,1), (jack,1), (mary,1))

RDD 编程(Java)

在较高的层次上,每个Spark应用程序都由一个驱动程序组成,该驱动程序运行用户的主功能并在集群上执行各种并行操作。Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是一个跨集群节点划分的元素集合,可以并行操作。RDD是从Hadoop文件系统(或任何其他支持Hadoop的文件系统)中的一个文件或驱动程序中现有的Scala集合开始创建的,并对其进行转换。用户还可以要求Spark将RDD持久化在内存中,这样就可以跨并行操作高效地重用RDD。最后,RDD会自动从节点故障中恢复。

—— Spark 官方开发文档

Spark对RDD进行运算的过程中主要依赖Transformation函数和Action函数。

Transformation函数主要是对 RDD 进行变换操作,都是懒方法,即调用后,并没有立即执行。

Action 函数是具体实施运算的函数,并得到运算结果,在执行 Action 函数的时候,会根据 DAG 找到需要的数据变换方式,进而调用Transformation函数实现运算。

00. sc.parallelize(list, numSlices)

使用Spark上下文(Spark Context,也就是sc对象)的parallelize函数将普通集合转换为了RDD对象。

JavaRDD<Integer> r01 = sc.parallelize(Arrays.asList(20, 20, 12, 7, 3));

因为RDD有分区的特性,在默认情况下,一个RDD只有一个分区,如果需要有多个分区,则通过函数的第二个参数来指定分区的数量。

JavaRDD<Integer> r02 = sc.parallelize(Arrays.asList(20, 21, 1, 10, 14), 3);

01. rdd.collect

【Action函数】,将数据集的所有元素作为数组返回。这通常在返回足够小的数据子集的过滤器或其他操作之后很有用。

System.out.println("r01.collect = " + r01.collect());
System.out.println("r02.collect = " + r02.collect());

02. rdd.getNumPartitions()

查看RDD对象的分区数量

System.out.println("r01.partitions.size = " + r01.getNumPartitions());
System.out.println("r02.partitions.size = " + r02.getNumPartitions());

设置默认的分区数量的两种方法:

  1. 修改 spark-defaults.conf 文件,增加 spark.default.parallelism 1
  2. 通过 SparkConf 的实例修改,示例: conf.set(“spark.default.parallelism”, “3”)

03. rdd.glom.collect

【Action函数】,查看RDD对象每个分区中的数据

System.out.println("r01.glom.collect = " + r01.glom().collect());
System.out.println("r02.glom.collect = " + r02.glom().collect());

从结果中可以看出:

  • r1 总共一个分区,其分区的数据是:[20, 20, 12, 7, 3];
  • r2 总共三个分区,第一个分区的数据是:[20],第二个分区的数据是:[21, 1],第三个分区的数据是:[10, 14]

04. rdd.count()

【Action函数】,查看RDD对象中的元素总数

System.out.println("r01.count = " + r01.count());
System.out.println("r02.count = " + r02.count());

05. rdd.take(n)

【Action函数】,返回具有数据集的前n个元素的数组。

System.out.println("r01.take(1) = " + r01.take(1));
System.out.println("r02.take(3) = " + r02.take(3));

06. rdd.takeOrdered(n, [ordering])

【Action函数】,使用自然顺序或自定义比较器返回RDD的前n个元素。

System.out.println("r01.takeOrdered(1) = " + r01.takeOrdered(1));
System.out.println("r02.takeOrdered(3, desc) = "
+ r02.takeOrdered(3, (Comparator<Integer> & Serializable)(a, b) -> (b - a)));

07. rdd.first()

【Action函数】,返回数据集的第一个元素(类似于take(1))。

System.out.println("r01.first = " + r01.first());
System.out.println("r02.first = " + r02.first());

08. r01.top(n, [comparator])

【Action函数】,返回数据集中最大的几个

System.out.println("r01.top(1) = " + r01.top(1));
System.out.println("r02.top(2) = " + r02.top(2));

09. rdd.max(comparator)

【Action函数】,通过 Comparator 的实例得到最大值

System.out.println("r01.max = "
+ r01.max((Comparator<Integer> & Serializable)(a, b) -> (a - b)));
System.out.println("r02.max = "
+ r02.max((Comparator<Integer> & Serializable)(a, b) -> (a - b)));

10. rdd.min(comparator)

【Action函数】,通过 Comparator 的实例得到最小值

System.out.println("r01.min = "
+ r01.min((Comparator<Integer> & Serializable)(a, b) -> (a - b)));
System.out.println("r02.min = "
+ r02.min((Comparator<Integer> & Serializable)(a, b) -> (a - b)));

11. rdd.foreach(func)

【Action函数】,在数据集的每个元素上运行函数func。通常这样做是出于副作用,例如更新累加器或与外部存储系统交互。
注意:在之外修改除累加器以外的变量foreach()可能会导致未定义的行为。

r01.foreach(x -> System.out.print("\t" + x));

12. rdd.coalesce(numPartitions, shuffle)

【Transformation函数】,将RDD中的分区数减少到numPartitions。筛选大型数据集后,对于更有效地运行操作很有用。如果需要扩大分区数量,需要让RDD重新执行shuffle,只需要为第二个参数指定为 true 即可。

JavaRDD<Integer> r1201 = r02.coalesce(2);
System.out.println("s> ");
System.out.println("r1201.collect = " + r1201.collect());
System.out.println("r1201.partitions.size = " + r1201.getNumPartitions());
System.out.println("r1201.glom.collect = " + r1201.glom().collect());
JavaRDD<Integer> r1202 = r02.coalesce(5, true);
System.out.println("r1202.collect = " + r1202.collect());
System.out.println("r1202.partitions.size = " + r1202.getNumPartitions());
System.out.println("r1202.glom.collect = " + r1202.glom().collect());

13. rdd.union(rdd)

【Transformation函数】,返回一个新的数据集,其中包含源数据集中的元素和参数的并集。例如,我们需要得到 r01 和 r02 中的数据并集元素

JavaRDD<Integer> r13 = r01.union(r02);
System.out.println("s> ");
System.out.println("r13.collect = " + r13.collect());
System.out.println("r13.partitions.size = " + r13.getNumPartitions());
System.out.println("r13.glom.collect = " + r13.glom().collect());

union 的结果:

  • 分区数是各个 rdd 的分区数量之和
  • 没有操作任何一个集合中的分区的元素,而是用一个更大的容器来容纳两个集合的数据的副本,示例:
    [[1]] ∪ [[2], [3], [4]] = [[1], [2], [3], [4]]

14. rdd.intersection(func)

【Transformation函数】,返回一个新的RDD,其中包含源数据集中的元素与参数的交集。例如,我们需要得到 r01 和 r02 中的数据交集元素

JavaRDD<Integer> r14 = r01.intersection(r02);
System.out.println("s> ");
System.out.println("r14.collect = " + r14.collect());
System.out.println("r14.partitions.size = " + r14.getNumPartitions());
System.out.println("r14.glom.collect = " + r14.glom().collect());

intersection 的结果:

  • 从该案例种看分区只有一个,但不足以说明只要是交集就一定得到一个分区的数据集
  • 数据是每个集合的所有分区的交集的结果,示例:
    [[1], [2]] ∩ [[2], [3]] = [[2]]

15. rdd.distinct([numPartitions])

【Transformation函数】,返回一个新的数据集,其中包含源数据集的不同元素。例如,我们需要将 r01 中的数据进行去重

JavaRDD<Integer> r15 = r01.distinct();
System.out.println("s> ");
System.out.println("r15.collect = " + r15.collect());

16. rdd.filter(func)

【Transformation函数】,返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的。例如,我们需要得到上文中 r2 中的所有的偶数

JavaRDD<Integer> r16 = r02.filter(x -> x % 2 == 0);
System.out.println("s> ");
System.out.println("r16.collect = " + r16.collect());

17. rdd.reduce(func)

Integer reduce = r01.reduce(Integer::sum);
System.out.println("s> ");
System.out.println("r01.reduce((v1, v2) => v1 + v2) = " + reduce);
reduce = r01.reduce((v1, v2) -> v1 - v2);
System.out.println("r01.reduce((v1, v2) => v1 - v2) = " + reduce);

18. rdd.map(func)

【Transformation函数】,返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的。例如我们需要将 r01 中的所有数据都是做是C类IP地址,试将 r01 中的所有数据转换成 192.168.10.? 的格式
A类 10.0.0.0–10.255.255.255
B类 172.16.0.0–172.31.255.255
C类 192.168.0.0–192.168.255.255

JavaRDD<String> r18 = r01.map(x -> "192.168.10." + x);
System.out.println("s> ");
System.out.println("r18.collect = " + r18.collect());

19. rdd.flatMap(func)

【Transformation函数】,与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项,也就是需要返回一个迭代器)。
例如我们需要将 r18 中的所有数据都根据“.”来切分,得到数字格式的集合。

JavaRDD<String[]> r1901 = r18.map(x -> x.split("\\."));
System.out.println("s> ");
System.out.println("s> val r1901 = r18.map(x => x.split('.'))");
System.out.println("r1901.collect = " + r1901.collect());
JavaRDD<Integer> r1902 = r1901.flatMap(vs -> {List<Integer> nums = new ArrayList<Integer>();for (String v : vs) {nums.add(Integer.valueOf(v));}return nums.iterator();
});
System.out.println("s> val r1902 = r1901.flatMap(vs => vs)");
System.out.println("r1902.collect = " + r1902.collect());

RDD 对象的链式写法

JavaRDD<Integer> r19 = r18.map(x -> x.split("\\.")).flatMap(vs -> {List<Integer> nums = new ArrayList<Integer>();for (String v : vs) {nums.add(Integer.valueOf(v));}return nums.iterator();
});

20. rdd.sortBy(f, ascending, numPartitions)

【Transformation函数】,为 rdd 排序

JavaRDD<Integer> r20 = r01.sortBy(v -> v, true, 1);
System.out.println("s> ");
System.out.println("r20.collect = " + r20.collect());

21. rdd.mapToPair(func)

【Transformation函数】,将集合中的元素转换为 K/V 格式的元组,例如将 r01 中的元素转换为元组,元组的键根据值的奇偶性来确定 even 或者 odd。

JavaPairRDD<String, Integer> r21 = r01.mapToPair(v -> new Tuple2<>(v % 2 == 0 ? "even" : "odd", v));System.out.println("s> ");
System.out.println("r21.collect = " + r21.collect());

22. rdd.groupByKey

【Transformation函数】,将集合中的元组根据键进行分组,并且将值合并到一个元组中。例如将 r21 的数据按照元组的键进行转换,效果如下:
[(even,20), (even,20), (even,12), (odd,7), (odd,3)]
↓↓↓↓↓
[(even,[20, 20, 12]), (odd,[7, 3])]

JavaPairRDD<String, Iterable<Integer>> r22 = r21.groupByKey();
System.out.println("s> ");
System.out.println("s> val r22 = r21.groupByKey()");
System.out.println("r22.collect = " + r22.collect());

23. rdd.mapValues

【Transformation函数】,遍历每个元组的值,并将其转换为一个值作为该元组的新值,例如:
[(even,[20, 20, 12]), (odd,[7, 3])]
↓↓↓↓↓
[(even,52), (odd,10)]

JavaPairRDD<String, Integer> r23 = r22.mapValues(new Function<Iterable<Integer>, Integer>() {@Overridepublic Integer call(Iterable<Integer> it) throws Exception {Iterator<Integer> iterator = it.iterator();int total = 0;while (iterator.hasNext()) {iterator.next();total++;}return total;}
});
System.out.println("s> ");
System.out.println("r23.collect = " + r23.collect());

结合 mapToPair, groupByKey 的链式写法

JavaPairRDD<String, Integer> rdd23 = r01.mapToPair(v
-> new Tuple2<>(v % 2 == 0 ? "even" : "odd", v))// 按键分组.groupByKey()// 合并元组的值.mapValues(it -> {Iterator<Integer> iterator = it.iterator();int total = 0;while (iterator.hasNext()) {iterator.next();total++;}return total;});
System.out.println("rdd23.collect = " + r23.collect());

24. rdd.groupBy

【Transformation函数】,通过自定义元组的键实现分组,可以简单的理解为是 mapToPair 和 groupByKey 的结合得到

JavaPairRDD<String, Iterable<Integer>> r24 = r01.groupBy(v -> v % 2 == 0 ? "even" : "odd");
System.out.println("s> ");System.out.println("r24.collect = " + r24.collect());
// 结合 mapValues 的链式写法
JavaPairRDD<String, Integer> rdd24 = r01.groupBy(v -> v % 2 == 0 ? "even" : "odd").mapValues(it -> {Iterator<Integer> iterator = it.iterator();int total = 0;while (iterator.hasNext()) {iterator.next();total++;}return total;
});
System.out.println("r24.collect = " + r24.collect());

25. sc.textFile

使用sc 的 textFile 的可以从 HDFS 或通过转换其他支持的数据集来创建RDD。
读取本地文件

JavaRDD<String> r25 = sc.textFile("file:///mr/word.txt");
System.out.println("r25 is read from local file.");
System.out.println("r25.collect = " + r25.collect());
System.out.println("r25.partitions.size = " + r25.getNumPartitions());
System.out.println("r25.glom.collect = " + r25.glom().collect());

读取HDFS文件

JavaRDD<String> rfh = sc.textFile("hdfs://hadoop01:9000/mr/word.txt");System.out.println("rfh is read from hdfs file.");
System.out.println("rfh.collect = " + rfh.collect());
System.out.println("rfh.partitions.size = " + rfh.getNumPartitions());
System.out.println("rfh.glom.collect = " + rfh.glom().collect());

26. rdd.saveAsTextFile

将 rdd 的数据存储到磁盘或者其他可以支持的存储系统中

写入本地文件

r24.saveAsTextFile("file:///mr/r24.out");

写入HDFS文件

r24.saveAsTextFile("hdfs://hadoop01:9000/r24.out");

注意:修改 Hadoop 的 hdfs-site.xml 配置中的 dfs.permissions.enabled 的值为 false,以避免出现权限被拒绝的异常

Spark 入门级程序 “Word Count”

统计单词出现的次数,Java 代码示例:

JavaPairRDD<String, Integer> rwc = sc.textFile("file:///root/word.txt").flatMap((line) -> Arrays.asList(line.split(" ")).iterator())// 分组.groupBy(w -> w)// 统计.mapValues((it) -> {int total = 0;Iterator<String> iterator = it.iterator();while (iterator.hasNext()) {iterator.next(); total++;}return total;}).saveAsTextFile("file:///mr/word.out");

统计单词出现的次数,Spark Scala Shell 代码示例:

sc.textFile("/root/word.txt").flatMap{l => l.split(' ')}.groupBy{w => w}.mapValues{vs => vs.size}

有向无环图

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图,简称DAG(Directed Acyclic Graph)。例如:


在 Java API的章节中的实例 “Word Count” 中,每个变换操作都会产生一个结果,也就是一个RDD对象,该对象将被下游的变换操作来使用。

通过DAG,会记录了每个RDD之间的依赖关系,也就是记录下每个RDD是通过何种变换操作生成的。记录这种依赖关系有助于当下游对象的数据丢失后,可以通过DAG找到上游对象重新恢复数据。这也是RDD容错机制的重要体现。

它的每一个操作都是DAG的一个节点,如下图所示:

在此鱼骨图的六个节点中,节点 ① – ⑤ 均是 Transformation 函数,在实际运行过程中,没有被立即执行。而节点⑥是 Action函数,它会立即执行,且会先执行上游操作并得到其结果。

原始的RDD通过一系列转换就形成了DAG,DAG会记录RDD之间的依赖关系。包含了RDD有哪些父RDD转换而来,以及它依赖的父RDD的哪些分区,这是DAG的重要属性。

借助这些依赖关系,DAG可以认为这些RDD之间形成了Lineage(血统,血缘关系),借助Lineage,能保证一个RDD被计算前,它所依赖的父RDD都已经计算完成,同时也体现了RDD的容错性。即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。

RDD间的依赖

处于DAG链的上游操作被称为父RDD。例如 “Word Count” 实例中的 ① 就是 ② 的父RDD,② 就是 ① 的子RDD。

RDD和它依赖的父依赖(parent RDD)关系分为窄依赖(narrow dependency)和宽依赖(shuffle dependency)两种类型。

简单来说,宽依赖是指父RDD的每个分区都可能被子RDD的多个分区使用,窄依赖是指父RDD的每个分区只被某一个子RDD分区使用。如下图所示:

为什么会有宽窄依赖?

在 Spark 中,数据抽象表示为统一的数据模型,即 RDD。而每一次对 RDD 进行转换操作时,我们都会得到一个新的RDD。例如,rdd2 = rdd1.map(func)。那么,前后的RDD 自然就形成了某种联系,即新生成的子 RDD 会依赖上游的父RDD。而这种联系实际上就是新生成的RDD的分区如何依赖父RDD 的分区的。

对于某些一元操作,比如 map()、filter() 等,子RDD 的各个分区分别只依赖父RDD 中的各个分区,是一一映射的关系。而对于某些聚合操作,比如groupBy等,在计算的时候需要对父RDD 的各个分区进行计算,子RDD 的各个分区可能都只依赖父RDD 各个分区中的一部分,不是一对一的映射关系。所以,Spark 所做的就是抽象出可以通用的方法,来处理各种情况的依赖。

目前,Spark 将这些依赖关系分为两大类:宽依赖(ShuffleDependency)和窄依赖(NarrowDependency)。

窄依赖

参考类:org.apache.spark.NarrowDependency

对于窄依赖的操作,它们只是将Partition的数据根据转换规则进行转换,并不涉及其他的操作,可以简单的认为只是将数据从一种形式转变为另外一种形式,其父分区和子分区是一对一的关系。

子RDD的每个分区都依赖于父RDD的少量分区的依赖关系的基类。狭窄的依赖关系允许流水线执行。

对于窄依赖,并不会引入昂贵的 Shuffle,所以执行效率非常高。如果整个DAG中,存在多个连续的窄依赖,则可以将这些连续的窄依赖整合到一起连续执行,中间不执行 Shuffle,从而提高效率,这样的优化方式称之为流水线优化

此外,针对窄依赖,如果子RDD的某个分区数据丢失,只需要找到父RDD对应依赖的分区,即可恢复数据。

宽依赖

参考类:org.apache.spark.ShuffleDependency.ShuffleDependency

对于宽依赖,父子分区并不是一对一的关系,在实际执行过程中,会发生Shuffle,也会发生磁盘I/O,所以,Spark并不是完全基于内存的,也是要依赖于磁盘的,但是已经尽可能地减少 Shuffle的产生。

Shuffle

在Spark中,数据通常不会跨分区分布在特定操作的必要位置。在计算期间,单个任务将在单个分区上运行。因此,要组织所有数据groupByKey以执行单个group任务,Spark需要执行所有操作。它必须从所有分区读取以找到所有键的所有值,然后将各个分区的值汇总在一起以计算每个键的最终结果,这个过程称为shuffle。

性能影响

Shuffle是昂贵的操作,因为它涉及的磁盘I / O,数据序列,和网络I / O。为了整理数据,Spark生成一组Task任务(map任务以整理数据)和一组Reduce任务来汇总数据。

—— 此术语来自MapReduce,与Spark的map和reduce操作没有直接关系。

在内部,单个Map任务的结果会保留在内存中,直到无法容纳为止。然后,根据目标分区对它们进行排序并写入单个文件。在Reduce端,任务读取的是相关的已经排好顺序的块。

Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,将保留这些文件,直到不再使用相应的RDD并进行垃圾回收为止。这样做是为了在重新计算沿袭时无需重新创建Shuffle文件。如果应用程序保留了对这些RDD的引用,或者如果GC不经常启动,则可能仅在很长一段时间后才发生垃圾回收。这意味着长时间运行的Spark作业可能会占用大量磁盘空间。spark.local.dir在配置Spark上下文时,用于指定临时存储目录由配置参数。

Stage

Spark在执行任务的时候,首先会根据依赖关系,划分为不同的阶段(Stage),其处理流程是:

  1. Spark在执行 Transformation操作时都不会立即执行,而是懒执行(计算)
  2. 执行若干步的 Transformation操作后,一旦遇到 Action 类型的操作,才会真正触发执行(计算)
  3. 执行时,从当前 Action 方法向前回溯
  4. 如果遇到窄依赖则应用流水线优化,继续回溯,直到碰到宽依赖
  5. 因为宽依赖必须要进行 Shuffle,无法实现优化,所以,将这一次执行过程组装为一个 Stage
  6. 再从当前宽依赖开始继续回溯,重复4和5,从而将整个DAG划分为若干个 Stage

在实例 “Word Count” 中,就存在两个 Stage,执行顺序如下图所示:

小结

  1. 弹性分布式数据集(RDD),是Spark最核心的数据结构。它是一种分布式的可容错的弹性数据集。可以分布式处理数据,也可以在数据丢失时通过依赖关系恢复数据。
  2. RDD的依赖关系是通过各种Transformation来得到的。父RDD和子RDD之间的依赖关系分为两种:①窄依赖,②宽依赖
    a) 窄依赖,父子RDD的分区关系是一对一,且不会发生 Shuffle,执行效率高,框架底层会针对多个连续的窄依赖进行流水线优化,从而提升性能。
    b) 宽依赖,父子RDD的分区关系是多对多,且会发生Shuffle。
  3. 有向无环图(DAG),当一整条RDD的依赖关系形成后,就形成了一个DAG。一般来说,一个DAG最后都至少会触发一个Action操作,来触发执行。一个Action对应一个 Job 任务。
  4. 执行阶段(Stage),一个DAG会分局RDD之间的依赖关系进行Stage的划分。流程是:以Action为基准,向前回溯,遇到宽依赖就形成一个Stage,遇到窄依赖就执行流水线优化(将多个窄依赖放到一起执行)。
  5. RDD的Transformation操作,属于懒执行(计算),不会立即执行。
  6. RDD的Action操作,触发真正的执行(计算)。
  7. Spark默认的调度模式是FIFO

Spark —— 闪电般快速的统一分析引擎 —— 入门基础知识相关推荐

  1. 【Rust日报】2021-12-14 Lapce: 用Rust编写的闪电般快速且功能强大的代码编辑器

    Lapce: 用Rust编写的闪电般快速且功能强大的代码编辑器 Lightning-fast and Powerful Code Editor written in Rust Lapce完全是用Rus ...

  2. Electron 快速开始(一)-入门基础、使用 JavaScript,HTML 和 CSS 构建跨平台的桌面应用程序

    文章目录 Electron 快速开始(一)-入门基础.使用 JavaScript,HTML 和 CSS 构建跨平台的桌面应用程序 Electron简介 多进程模型​ Electron 快速开始 管理窗 ...

  3. SpringCloud系列知识快速复习 -- part 1(SpringCloud基础知识,Docker,RabbitMQ)

    SpringCloud知识快速复习 SpringCloud基础知识 微服务特点 SpringCloud常用组件 服务拆分和提供者与消费者概念 Eureka注册中心 原理 Ribbon负载均衡 原理 负 ...

  4. 三 计算机知识的重要性分析,学习计算机基础知识对中专学生的重要性分析

    学习计算机基础知识对中专学生的重要性分析 [摘 要]本文主要介绍了计算机基础知识的内容,阐述了学习计算机基础知识对中专学生的作用,并且通过对计算机基础操作的学习,提高中专学生的计算机应用水平.希望本文 ...

  5. 计算机基础知识教程算法,快速掌握!计算机二级公共基础知识教程:算法

    小编所收集到的相关计算机二级公共基础知识教程:算法的资料 大家要认真阅读哦! 算法是指解题方案的准确而完整的描述.即是一组严谨地定义运算顺序的规则,并且每一个规则都是有效的,且是明确的,没有二义性,同 ...

  6. 【软考】《希赛教育·软件设计师考前冲刺与考点分析》计算机硬件基础知识——学习笔记

    Content 第1章 计算机硬件基础知识 第2章 操作系统基础知识 第3章 程序语言和语言处理程序基础知识 第4章 数据结构 第5章 数据库系统基础知识 第6章 网络基础知识 第7章 软件工程基础知 ...

  7. 中专计算机基础知识汇总,【职业中专计算机基础教育分析】 计算机基础知识...

    摘要:职业教育是指培养面向具体的职业岗位(岗位群)所需职业能力人才的专业基础教育.而在职业教育中除了开设各专业所需的专业课程外,还要开设一些基础教育来充实学生的能力,为其就业打下坚实的基础.计算机基础 ...

  8. dw 快速html注释,笔记整理1-HTML基础知识与DW简单使用-工具-站长头条

    笔记整理1 -- HTML基础知识与DW简单使用 笔记整理1 -- HTML基础知识与DW简单使用 概念 客户端和服务器端 文件名.基本名.扩展名 资源文件和站点 什么是HTML 关于W3C W3C的 ...

  9. 模板引擎Freemarker基础知识

    Freemarker基础知识 Freemarker是什么 FreeMarker 基础指令 List指令 遍历Map数据 if指令 其它指令 运算符 空值处理 内建函数 入门Demo 要导入的依赖 配置 ...

最新文章

  1. 小米云能同步到华为手机上吗_有没有小米还没涉足的产业?对标百度网盘,小米云盘即将上线...
  2. 张艾迪(创始人): 梦想与未来
  3. 2016四季度 服务器收入和出货量双下滑
  4. Docker volume使用
  5. 服务器系统盘安装在sdb,从U盘自动安装centos5.3到服务器
  6. 超炫酷的 Docker 终端 UI lazydocker,想看哪里点哪里
  7. 9-2 go语言的调度器
  8. 需求文档中容易出的错误
  9. 【故障诊断预测】基于matlab FFT与DBN轴承故障诊断预测【含Matlab源码 1741期】
  10. SourceTree使用笔记 ssh-key配置
  11. 佳能G3800黄灯绿灯交替闪烁7次,错误代码5B00
  12. Django中文文档
  13. ubuntu 14.04 安装 diffmerge
  14. 手机定位(原生android定位)
  15. Python基础——文件
  16. linux pvs命令安装,使用linux的pvs命令格式化输出物理卷信息报表
  17. flutter 使用 高德地图选取位置
  18. 三明梅列:社区服务走进“微时代”
  19. 计算机存储器组成结构,计算机组成原理——存储器内部组成
  20. 控制系统仿真技术(二)-连续系统的数字仿真二

热门文章

  1. 音视频传输协议众多, 5G时代不同业务应该如何选择?
  2. unity发布ios高通AR的问题
  3. matlab泰勒公式含义,泰勒公式的哲学意义与敏捷研发
  4. node.js - Nodejs 分布式事务_个人文章 - SegmentFault 思否
  5. 【XMR】/usr/bin/ld: 找不到 -lstdc++
  6. CSS让文字超过部分省略号显示
  7. python输出数字序列0 1 2 3 4 5_python从0到1:3.列表
  8. 13台根服务器位置,根服务器13台地址
  9. 各种友(e)善(xin)数论总集,从入门到绝望2---快速判断素数
  10. 爬虫学习-Web-Harvest