转载自:https://blog.csdn.net/zhaojw_420/article/details/53261965

一、基本RDD

1、针对各个元素的转化操作 
最常用的转化操作是map()和filter()。转化操作map()J接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素。而转化操作filter()则接收一个函数,将RDD满足该函数的元素放入新的RDD中返回。map()的返回值类型不需要和输入类型一样。 
从一个RDD变成另外一个RDD。lazy,懒执行 。比如根据谓词匹配筛选数据就是一个转换操作。 
例:求平均值 
Scala:

val input=sc.parallelize(List(1,2,3,4))
val result=input.map(x=>x*x)
println(result.collect().mkString(","))

java:

@Test
public void rddSquare(){SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount");JavaSparkContext sc = new JavaSparkContext( sparkConf);JavaRDD<Integer> rdd= sc.parallelize(Arrays. asList(1,2,3,4));JavaRDD<Integer> result= rdd.map( new Function<Integer, Integer>() {@Overridepublic Integer call(Integer x ) throws Exception {return x *x ;}});System. err.println(StringUtils.join( result.collect(), ""));}

flatMap()方法可以实现对每个输入元素生成多个输出元素,返回一个返回值序列的迭代器。其一个简单用途就是把输入的字符串切分为单词。 
Scala:

val lines=sc.parallelize(List("hello word","hi","I'm back"))
val words=lines.flatMap(line=>line.split(" "))
words..first()

Java:

@Test
public void rddFlatMap(){JavaRDD<String> lines= sc.parallelize(Arrays.asList( "hello word","hi" ,"i am back"));JavaRDD<String> words= lines.flatMap( new FlatMapFunction<String, String>() {@Override
public Iterator<String> call(String line ) throws Exception {return Arrays.asList( line.split( " ")).iterator();}
});System. err.println(words .first());}

flatMap()和map()方法的区别:flatMap()相当于看作返回来的迭代器的“压扁”,这样就得到一个由各个列表中的元素组成的RDD。 
例如: 
map()的结果:{[“coffe”,”panda”],[“happy”,”panda”],[“happies”,”panda”,”party”]} 
flatMap()的结果:{“coffe”,”panda” ,”happy”,”panda” ,”happies”,”panda”,”party” } 
filter()操作不会改变已有的inputRDD中的数据。 
通过转化操作,从已有的RDD中派生出新的RDD,spark会使用谱系图来记录这些不同RDD之间的依赖关系。spark需要用这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。 
Scala:

val inputRDD=sc.textFile("log.txt")
val errorRDD=inputRDD.filter(line=>line.contains("error"))

Java:

JavaRDD<String> inputRDD=sc.textFile("log.txt");
JavaRDD<String> errorRDD=inputRDD.filter(new Function<String,Boolean>(){public Boolean call(String x){return x.contains("error");}}
);

filter()操作不会改变已有的inputRDD中的数据。 
通过转化操作,从已有的RDD中派生出新的RDD,spark会使用谱系图来记录这些不同RDD之间的依赖关系。spark需要用这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。

2、伪集合操作 
RDD.distinct()方法转换操作生成一个只包含不同元素的一个新的RDD。开销很大。 
RDD.union(otherRDD),会返回一个包含两个RDD中所有元素的RDD,包含重复数据。 
RDD.intersection(otherRDD),只返回两个RDD中都有的元素。可能会去掉所有的重复元素。通过网络混洗来发现共有元素。 
RDD.subtract(otherRDD)返回只存在第一个RDD中而不存在第二个RDD中的所有的元素组成的RDD。也需要网络混洗。 
RDD.cartesian(otherRDD),计算两个RDD的笛卡尔积,转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则来自于另一个RDD。 

对一个数据为{1,2,3,3}的RDD进行操作进行基本的RDD转化操作 

3、行动操作 
RDD最常见的行动操作:reduce()操作,它接受一个函数作为参数,这个函数要操作两个相同类型的RDD数据并返回一个同样类型的新元素。 
reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。 
行动操作会对RDD计算一个结果,并把结果返回到驱动程序中,或把结果存储到外部存储系统中(如HDFS)中。 
Scala:

val rdd=sc.parallelize(List(1,2,3,3))
val sum=rdd.reduce((x,y)=>x+y)

Java:

/*** java中的reduce()方法*/
public void testReduce(){JavaRDD<Integer> rdd= sc.parallelize(Arrays.asList(1,2,3,3));Integer sum= rdd.reduce( new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer x , Integer y ) throws Exception {return x +y ;}});System. err.println(sum );
}

flod()方法与reduce()方法类似,接受一个与redce()接受的函数相同的函数,再加上一个“初始值”来作为分区第一次调用时的结果。 
两者都要求函数的返回值类型需要和我们所操作的RDD中的元素类型相同。 
aggregate()函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来。可以计算两个RDD的平均值。 
Scala:

val rdd=sc.parallelize(List(1,2,3,4,5,6))
val result=rdd.aggregate((0,0))((acc,value)=>(acc._1+value,acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2))
val avg=result._1/result._2.toDouble

Java:

public class AvgCount implements Serializable {private static final long serialVersionUID = 1L;private final static SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount" );private final static JavaSparkContext sc = new JavaSparkContext( sparkConf);public int total ;public int num ;public AvgCount(int total , int num) {this. total = total;this. num = num;}public double avg(){return total /(double)num;}static Function2<AvgCount,Integer,AvgCount> addAndCount=new Function2<AvgCount, Integer, AvgCount>() {@Overridepublic AvgCount call(AvgCount a , Integer x ) throws   Exception {                                                                                                                     a. total+= x;a. num+=1;                                                               return a ;}
};
static Function2<AvgCount, AvgCount, AvgCount> combine=new Function2<AvgCount, AvgCount, AvgCount>() {@Overridepublic AvgCount call(AvgCount a , AvgCount b ) throws Exception {                                                       a. total+= b. total;a. num+= b. num;                                                              return a ;}};public static void main(String[] args) {AvgCount initial= new AvgCount(0, 0);JavaRDD<Integer> rdd= sc.parallelize(Arrays.asList(1,2,3,4,5,6));AvgCount result= rdd.aggregate( initial, addAndCount, combine );System. err.println(result .avg());}
}

collect()方法会返回整个RDD的内容。测试中使用。RDD内容不多。 
take(n)返回RDD的第n个元素。并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。可能返回的元素会跟预期的不太一样。 
top()按照RDD元素的顺序,返回RDD的前几个元素。 
first()就是一个行动操作,他会返回RDD的第一个元素。 
触发计算,进行实际的数据处理 
Scala:

print "input had "+badLinesRDD.count()+" concering lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10).foreach(println)

Java:

System.out.println("input had "+badLinesRDD.count()+" concering lines" );
System.out.print("Here are 10 examples:" );
for(Sring line:badLinesRDD.take(10)){System.out.println(line);
}

对一个数据为{1,2,3,3}的RDD进行基本的RDD行动操作如表:。 

两者的区别在于Spark计算RDD的方式不同。虽然你可以在任何时候去定义新的RDD,但Spark只会惰性计算这些RDD,他们只有在第一次在一个行动操作中用到时,才会真正计算。

二、在不同RDD类型间转换 
在Scala中将RDD转为特定函数的RDD是由隐式转换自动处理的。需要加上import org.apache.spark.SparkContext._来使用在这些隐式转换。这些隐式转换可以隐式的将一个RDD转换为各种封装,比如DoubleRDDFunctions(数值数据的RDD)和PairRDDFunction(键值对RDD)。 
在Java中有两个专门的类JavaDoubleRDD和JavaPairRDD,来处理特殊类型的RDD。 
Java中针对专门类型的函数接口: 

/*** java创建DoubleRDD* @author Administrator**/
public class DoubleRDD implements Serializable {private static final long serialVersionUID = 1L;private final static SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount" );private final static JavaSparkContext sc = new JavaSparkContext( sparkConf);public void testDoubleRDD(){JavaRDD<Integer> rdd= sc.parallelize(Arrays.asList(1,2,3,4,5));JavaDoubleRDD result= rdd.mapToDouble( new DoubleFunction<Integer>() {private static final long serialVersionUID = 1L;@Overridepublic double call(Integer x) throws Exception {return (double )x *x ;}});System. err.println(result );}
}

三、持久化(缓存) 
当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存他们所有求出的分区数据。如果一个有持久化数据的节点发生故障,spark会在需要用到的缓存数据时重算丢失的数据分区。可以把数据备份到多个节点上。 
在scala和java中,默认情况下persist()会把数据以序列化的形式缓存到JVM的堆空间中。 
org.apache.spark.storage.StorageLevel和py.StorageLevel中的持久化级别;如有必要可以通过在存储级别的末尾加上”_2”来把持久化数据存为两份: 

在Scala中使用persist();

import org.apache.spark.storage.StorageLevel
val result=input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

在第一次对这个RDD调用行动操作前就调用了persist()方法,persist()调用本身不会触发强制求值。 
如果缓存的数据太多,内存中放不下,Spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。当然对于使用内存和磁盘缓存级别的分区来说,移除的数据会写如磁盘。 
最后,还可以使用unpersist()方法手动把持久化的RDD从缓存中移除。 
cache()方法,是延迟执行,需要在一个action执行之后,进行缓存RDD。是persist特殊缓存方式。将RDD放入内存中,缓存级别是MEMORY_ONLY

Spark——RDD操作详解相关推荐

  1. Spark RDD 论文详解(三)Spark 编程接口

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  2. Spark RDD 论文详解(一)摘要和介绍

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  3. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  4. Spark RDD 论文详解(七)讨论

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  5. Spark RDD 论文详解(四)表达 RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  6. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  7. Spark RDD使用详解1--RDD原理

    RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使 ...

  8. Spark RDD API详解

    1. RDD简单介绍 RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的数据是分区存储的,这 ...

  9. Spark RDD使用详解--RDD原理

    RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使 ...

最新文章

  1. 零基础入门学习Python(2)
  2. CPU和GPU跑深度学习差别有多大?
  3. 数据结构-冒泡排序过程
  4. 【Android 应用开发】Activity 任务亲和性 taskAffinity 设置 ( taskAffinity 属性 )
  5. linux安全加固(2)
  6. CSLA.Net 3.0.5 版本 教学程序,代码附教学注释
  7. WindowsPhone8可缩放图片控件的实现
  8. 25k英里高速建48个充电走廊,美国电动汽车产业迎来春天
  9. 图像处理-直方图均衡化
  10. PCL1.8.0/ Qt5.7.0开发环境配置
  11. I2C driver编写常用接口
  12. 32位x86处理器操作模式和寄存器简介
  13. 不忘来时路 心系梦归处
  14. SiT3808:1 -80MHz 单端压控振荡器VCXO
  15. 如何定义智慧与关于生活的美好 - 与子同 Yue 001
  16. 关于微信小程序授权登陆之后需要在个人信息页展示信息,如微信头像,昵称这件事
  17. 几款有意思的html游戏推荐(在线云玩+源码)
  18. 2017全国计算机高校排名,全国计算机专业大学排名_2017计算机专业大学排名
  19. 知識は潜在能力、行動は力CommentsAdd Star
  20. 1 STM32F407ZG的简单介绍

热门文章

  1. 计算机考研就业率,21考研同学需谨慎,三个专业就业率持续走低,包括热门计算机专业...
  2. Java开发技术总结!小米java校招面试题
  3. html无法获取图片高宽,如何解决谷歌浏览器下jquery无法获取图片的尺寸
  4. 人工智能ai用什么编程语言_2020年人工智能的5种最佳编程语言
  5. s905l android5,魔百盒M101晶晨S905L处理器专用安卓纯净刷机rom固件
  6. 树莓派(二) adb命令控制手机拨打/接听电话
  7. 远程计算机已加入AAD凭据不工作
  8. Access-Control-Allow-Origin配置报错
  9. 【硬件FPGA 】xilinx_A7调试问题总结(FPGA异常发烫)
  10. Python例题8-3~8-4 T恤