1.1 什么是RDD?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

1.2 RDD的属性

(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

1.3 WordCount粗图解RDD

其中hello.txt

2、DataSet、DataFrame是什么?

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的优点。DataSet可以被构造从JVM对象,然后使用功能性的转换(操作mapflatMapfilter等等)。DataSet API在Scala和 Java中可用。Python没有对Dataset API的支持。但由于Python的动态特性,数据集API的许多好处已经可用(即您可以自然地按名称访问行的字段row.columnName)。R的情况类似。

DataFrame是一个组织成命名列的DataSet。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在底层具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由Rows 的数据集表示。在Scala API中,DataFrame它只是一个类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

在本文档中,我们经常将Rows的Scala / Java数据集称为DataFrame。

Dataset<Row>  == DataFream 示例:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

DataSet 示例:

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(Collections.singletonList(person),personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map((MapFunction<Integer, Integer>) value -> value + 1,integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read().textFile("examples/src/main/resources/people.txt").javaRDD().map(line -> {String[] parts = line.split(",");Person person = new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map((MapFunction<Row, String>) row -> "Name: " + row.getString(0),stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map((MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+import java.util.ArrayList;
import java.util.List;import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext().textFile("examples/src/main/resources/people.txt", 1).toJavaRDD();// The schema is encoded in a string
String schemaString = "name age";// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim());
});// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map((MapFunction<Row, String>) row -> "Name: " + row.getString(0),Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

3、小结:

RDD适用于迭代计算、数据各种操作等。(Spark Core)

DataSet、DataFrame使用与规范化数据处理。(SparkSQL)

RDD、DataSet与DataFream相关推荐

  1. sql能查到数据 dataset对象里面没有值_spark系列:RDD、DataSet、DataFrame的区别

    RDD与DataSet的区别 二者都是由元素构成的分布式数据集合 1. 对于spark来说,并不知道RDD元素的内部结构,仅仅知道元素本身的类型,只有用户才了解元素的内部结构,才可以进行处理.分析:但 ...

  2. Spark总结之RDD(四)

    Spark总结之RDD(四) 1. 背景 Spark针对RDD的整体设计思想很多地方和MapReduce相似,但又做了很多优化. Spark整体API设计针对分布式数据处理做了很多优化,并且做了更高层 ...

  3. Apache Spark 3.0 SQL DataFrame和DataSet指南

    目录 简介 SQL 数据集和数据框 入门 起点:SparkSession Scala语言 Java语言 Python语言 R语言 创建DataFrame Scala语言 Java语言 Python语言 ...

  4. SparkSQL系列-5、什么是Dataset?

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 官方网址:http://spark.apache.org/. ht ...

  5. SparkSQL核心编程

    目录 基本介绍 DataFrame 创建 DataFrame DataSet 创建 DataSet RDD 转换为 DataSet DataSet 转换为 RDD DataFrame 和 DataSe ...

  6. 初识Spark2.0之Spark SQL

    内存计算平台Spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都做出了较大的改变,同时更加注重基于DataFrame数据组织 ...

  7. Spark之SparkSQL理论篇

    Spark SQL 理论学习: 简介 Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用. 特点 1)易整合 2) ...

  8. 详细解读Spark的数据分析引擎:Spark SQL

    一.spark SQL:类似于Hive,是一种数据分析引擎 什么是spark SQL? spark SQL只能处理结构化数据 底层依赖RDD,把sql语句转换成一个个RDD,运行在不同的worker上 ...

  9. 基于java的随机森林算法_基于Spark实现随机森林代码

    本文实例为大家分享了基于Spark实现随机森林的具体代码,供大家参考,具体内容如下 public class RandomForestClassficationTest extends TestCas ...

最新文章

  1. linux mysql设置数据库utf_Linux系统下MySQL数据库服务器字符集设置
  2. 模型开发-GBDT决策树模型开发代码
  3. Spread for Windows Forms快速入门(2)---设置Spread表单
  4. 基本机器学习面试问题 ---- Company/Industry Specific/Interest
  5. 原生JS写Ajax的请求函数
  6. 【英语学习】【Daily English】U12 E-World L02 All you have to do is download this taxi app
  7. 编程语言的语法与语义
  8. 【CCPC-Wannafly Winter Camp Day4 (Div1) A】夺宝奇兵(水题)
  9. HTML期末大作业~ 仿新浪微博个人主页html网站模板4个页面(HTML+CSS+JavaScript)
  10. python实现sip协议_SIP协议的常见命令 - HouWeiGui的个人空间 - OSCHINA - 中文开源技术交流社区...
  11. 企业之pacemaker基本概念及其原理
  12. java韩信点兵_韩信点兵练习题(死循环的应用)
  13. 【总结】有三AI所有原创人脸相关的学习资料汇总(2022年12月)
  14. KaTex各种语法汇总
  15. [ACTF2020 新生赛]Exec1命令注入
  16. Eric靶机渗透测试
  17. 评价体系的构建之指标筛选
  18. MYSQL UDF提权
  19. matlab 绘制图标,用于Matlab绘图的自定义标记
  20. 安卓编译x264与集成使用ffmpeg-demo

热门文章

  1. goim Java_GitHub - alberliu/gim: golang写的IM服务器(服务组件形式)
  2. 我的世界服务器地皮文件,我的世界服务器怎么创建地皮世界.doc
  3. GitHub学习总结(二)——创建仓库以及仓库主页说明
  4. TX Text Control: Version 30.0 Crack
  5. 系统显示新装Ubuntu字体使用
  6. 电脑字体模糊解决方法
  7. deepin linux版本回退,·已解决」deepin-wine-tim 无法使用中文输入法
  8. 我人生的第一个 1000 万
  9. python pycrypto_python3.6安装pycrypto,pycrytodome和crypto
  10. jQuery深入浅出(二)石头剪刀布动画实现