参考文章:http://blog.csdn.net/dabokele/article/details/52602412

 根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象可以调用多种操作,主要分为以下几类

  • Transformations
  • Window Operations
  • Join Operations
  • Output Operations

一、Transformations

1、map(func)

  map操作需要传入一个函数当做参数,具体调用形式为

val b = a.map(func)

  主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新的元素,得到的DStream对象b中包含这些新的元素。 
  下面示例代码的作用是,在接收到的一行消息后面拼接一个”_NEW”字符串

val linesNew = lines.map(lines => lines + "_NEW" )

  程序运行结果如下: 
   
  注意与接下来的flatMap操作进行比较。

2、flatMap(func)

  类似于上面的map操作,具体调用形式为

val b = a.flatMap(func)

  主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成0个或多个新的元素,得到的DStream对象b中包含这些新的元素。

  下面示例代码的作用是,在接收到的一行消息lines后,将lines根据空格进行分割,分割成若干个单词

val words = lines.flatMap(_.split( " " ))

  结果如下: 
  

3、 filter(func)

  filter传入一个func函数,具体调用形式为

val b = a.filter(func)

  对DStream a中的每一个元素,应用func方法进行计算,如果func函数返回结果为true,则保留该元素,否则丢弃该元素,返回一个新的DStream b。

  下面示例代码中,对words进行判断,去除hello这个单词。

val filterWords = words.filter(_ != "hello" )

  结果如下: 
  

4、union(otherStream)

  这个操作将两个DStream进行合并,生成一个包含着两个DStream中所有元素的新DStream对象。 
  下面代码,首先将输入的每一个单词后面分别拼接“_one”和“_two”,最后将这两个DStream合并成一个新的DStream

val wordsOne = words.map(_ + "_one" )
val wordsTwo = words.map(_ + "_two" )
val unionWords = wordsOne.union(wordsTwo) wordsOne.print() wordsTwo.print() unionWords.print()

  运行结果如下: 
  

5、count()

  统计DStream中每个RDD包含的元素的个数,得到一个新的DStream,这个DStream中只包含一个元素,这个元素是对应语句单词统计数值。 
  以下代码,统计每一行中的单词数

val wordsCount = words.count()

  运行结果如下,一行输入4个单词,打印的结果也为4。 
  

6、reduce(func)

  返回一个包含一个元素的DStream,传入的func方法会作用在调用者的每一个元素上,将其中的元素顺次的两两进行计算。 
  下面的代码,将每一个单词用"-"符号进行拼接

val reduceWords = words.reduce(_ + "-" + _)
  • 1
  • 1

  运行结果如下: 
  

7、countByValue()

  某个DStream中的元素类型为K,调用这个方法后,返回的DStream的元素为(K, Long)对,后面这个Long值是原DStream中每个RDD元素key出现的频率。 
  以下代码统计words中不同单词的个数

val countByValueWords = words.countByValue()

  结果如下: 
  

8、reduceByKey(func, [numTasks])

  调用这个操作的DStream是以(K, V)的形式出现,返回一个新的元素格式为(K, V)的DStream。返回结果中,K为原来的K,V是由K经过传入func计算得到的。还可以传入一个并行计算的参数,在local模式下,默认为2。在其他模式下,默认值由参数spark.default.parallelism确定。 
  下面代码将words转化成(word, 1)的形式,再以单词为key,个数为value,进行word count。

val pairs = words.map(word => (word , 1))
val wordCounts = pairs.reduceByKey(_ + _)

  结果如下, 
  

9、join(otherStream, [numTasks])

  由一个DStream对象调用该方法,元素内容为(k, V),传入另一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是(k, (V, W))。这个方法也可以传入一个并行计算的参数,该参数与reduceByKey中是相同的。 
  下面代码中,首先将words转化成(word, (word + "_one"))(word, (word + "_two"))的形式,再以word为key,将后面的value合并到一起。

val wordsOne = words.map(word => (word , word + "_one" )) val wordsTwo = words.map(word => (word , word + "_two" )) val joinWords = wordsOne.join(wordsTwo)
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

  运行结果如下: 
  

10、cogroup(otherStream, [numTasks])

  由一个DStream对象调用该方法,元素内容为(k, V),传入另一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是(k, (Seq[V], Seq[W]))。这个方法也可以传入一个并行计算的参数,该参数与reduceByKey中是相同的。 
 下面代码首先将words转化成(word, (word + "_one"))(word, (word + "_two"))的形式,再以word为key,将后面的value合并到一起。 
 结果如下: 
 

11、transform(func)

  在Spark-Streaming官方文档中提到,DStream的transform操作极大的丰富了DStream上能够进行的操作内容。使用transform操作后,除了可以使用DStream提供的一些转换方法之外,还能够直接调用任意的调用RDD上的操作函数。 
  比如下面的代码中,使用transform完成将一行语句分割成单词的功能。

val words = lines.transform(rdd =>rdd.flatMap(_.split(" ")))
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

  运行结果如下: 
  

12、updateStateByKey(func)

二、Window Operations

  我觉得用一个成语,管中窥豹,基本上就能够很形象的解释什么是窗口函数了。DStream数据流就是那只豹子,窗口就是那个管,以一个固定的速率平移,就能够每次看到豹的一部分。 
  窗口函数,就是在DStream流上,以一个可配置的长度为窗口,以一个可配置的速率向前移动窗口,根据窗口函数的具体内容,分别对当前窗口中的这一波数据采取某个对应的操作算子。需要注意的是窗口长度,和窗口移动速率需要是batch time的整数倍。接下来演示Spark Streaming中提供的主要窗口函数。

1、window(windowLength, slideInterval)

  该操作由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream。 
  下面的代码以长度为3,移动速率为1截取源DStream中的元素形成新的DStream。

val windowWords = words.window(Seconds( 3 ), Seconds( 1))
  • 1
  • 1

  运行结果如下: 
   
  基本上每秒输入一个字母,然后取出当前时刻3秒这个长度中的所有元素,打印出来。从上面的截图中可以看到,下一秒时已经看不到a了,再下一秒,已经看不到b和c了。表示a, b, c已经不在当前的窗口中。

2、 countByWindow(windowLength,slideInterval)

  返回指定长度窗口中的元素个数。 
  代码如下,统计当前3秒长度的时间窗口的DStream中元素的个数:

val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))
  • 1
  • 1

  结果如下: 
  

3、 reduceByWindow(func, windowLength,slideInterval)

  类似于上面的reduce操作,只不过这里不再是对整个调用DStream进行reduce操作,而是在调用DStream上首先取窗口函数的元素形成新的DStream,然后在窗口元素形成的DStream上进行reduce。 
  代码如下:

val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))
  • 1
  • 1

  结果如下: 
  

4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

  调用该操作的DStream中的元素格式为(k, v),整个操作类似于前面的reduceByKey,只不过对应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的所有数据。该操作也有一个可选的并发数参数。 
  下面代码中,将当前长度为3的时间窗口中的所有数据元素根据key进行合并,统计当前3秒中内不同单词出现的次数。

val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))
  • 1
  • 1

  结果如下: 
  

5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

  这个窗口操作和上一个的区别是多传入一个函数invFunc。前面的func作用和上一个reduceByKeyAndWindow相同,后面的invFunc是用于处理流出rdd的。 
  在下面这个例子中,如果把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每由出去一条鱼,就将该鱼的总数减去一。

val windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))
  • 1
  • 1

  下面是演示结果,最终的结果是该3秒长度的窗口中历史上出现过的所有不同单词个数都为0。 
   
  一段时间不输入任何信息,看一下最终结果 
  

6、 countByValueAndWindow(windowLength,slideInterval, [numTasks])

  类似于前面的countByValue操作,调用该操作的DStream数据格式为(K, v),返回的DStream格式为(K, Long)。统计当前时间窗口中元素值相同的元素的个数。 
  代码如下

val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))
  • 1
  • 1

  结果如下 
  

三、Join Operations

  Join主要可分为两种,

1、DStream对象之间的Join

  这种join一般应用于窗口函数形成的DStream对象之间,具体可以参考第一部分中的join操作,除了简单的join之外,还有leftOuterJoin, rightOuterJoin和fullOuterJoin。

2、DStream和dataset之间的join

  这一种join,可以参考前面transform操作中的示例。

四、Output Operations

  在Spark Streaming中,DStream的输出操作才是DStream上所有transformations的真正触发计算点,这个类似于RDD中的action操作。经过输出操作DStream中的数据才能与外部进行交互,比如将数据写入文件系统、数据库,或其他应用中。   
  

1、print()

  print操作会将DStream每一个batch中的前10个元素在driver节点打印出来。 
  看下面这个示例,一行输入超过10个单词,然后将这行语句分割成单个单词的DStream。

val words = lines.flatMap(_.split(" "))
words.print()
  • 1
  • 2
  • 1
  • 2

  看看print后的效果。 
   
  

2、saveAsTextFiles(prefix, [suffix])

  这个操作可以将DStream中的内容保存为text文件,每个batch的数据单独保存为一个文夹,文件夹名前缀参数必须传入,文件夹名后缀参数可选,最终文件夹名称的完整形式为prefix-TIME_IN_MS[.suffix] 
  比如下面这一行代码

lines.saveAsTextFiles("satf", ".txt")
  • 1
  • 1

  看一下执行结果,在当前项目路径下,每秒钟生成一个文件夹,打开的两个窗口中的内容分别是nc窗口中的输入。 
   
  另外,如果前缀中包含文件完整路径,则该text文件夹会建在指定路径下,如下图所示 
   
  

3、saveAsObjectFiles(prefix, [suffix])

  这个操作和前面一个类似,只不过这里将DStream中的内容保存为SequenceFile文件类型,这个文件中保存的数据都是经过序列化后的Java对象。 
  实验略过,可参考前面一个操作。 
  

4、saveAsHadoopFiles(prefix, [suffix])

  这个操作和前两个类似,将DStream每一batch中的内容保存到HDFS上,同样可以指定文件的前缀和后缀。 
  

5、foreachRDD(func)

转载于:https://www.cnblogs.com/liugh/articles/6906522.html

Spark Streaming中的操作函数分析相关推荐

  1. 通过Spark Streaming的window操作实战模拟热点搜索词案例实战

    本博文主要内容包括: 1.在线热点搜索词实现解析 2.SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战 一:在线热点搜索词实现解析 背景描述:在社交网络 ...

  2. Spark Streaming的窗口操作

    2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming的窗口操作 博客分类: spark Spark Streaming的Window Operati ...

  3. Spark Streaming的IDEA操作在spark操作的差别和解决

    Spark Streaming的IDEA操作 博客https://blog.csdn.net/qq_43688472/article/details/86499291 这里就不重复操作了 [hadoo ...

  4. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  5. Spark Streaming Direct Approach (No Receivers) 分析

    前言 这个算是Spark Streaming 接收数据相关的第三篇文章了. 前面两篇是: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 S ...

  6. Spark GraphX 中的 pregel函数(转载)

    文章目录 pregel函数源码 与 各个参数介绍: 案例: 求顶点5 到 其他各顶点的 最短距离 pregel原理分析 一篇关于 Spark GraphX 中 pregel函数 的笔记,通过一个小案例 ...

  7. RTEMS 4.9.4 bootcard.c 中的 boot_card 函数分析

    RTEMS 4.9.4 进入 boot_card 函数即开始全面的初始化. boot_card函数是~RTEMS~提供的一个通用初始化框架,不论在何种处理器上,都是使用这个框架为系统做初始化的工作. ...

  8. Python中字符串操作函数string.split('str1')和string.join(ls)

    Python中的字符串操作函数split 和 join能够实现字符串和列表之间的简单转换, 使用 .split()可以将字符串中特定部分以多个字符的形式,存储成列表 1 def split(self, ...

  9. dz3.0数据库操作函数分析说明

    开 发dz3.0插件的朋友都知道,在开发插件中,大部分插件都要涉及到数据库的操作,而官方提醒开发插件要尽量用官方提供的数据库操作函数,但是我在网上找 了很多资料,都没有看到数据库操作函数的一些具体说明 ...

最新文章

  1. 使用 xcworkspace 管理 iOS 工程
  2. 3.2.4 控制图层显示的范围
  3. 在 Azure 中的 Linux VM 上创建 MongoDB、Express、AngularJS 和 Node.js (MEAN) 堆栈
  4. 快速多尺度人脸检测--Multi-Scale Fully Convolutional Network for Fast Face Detection
  5. 开源大佬面对面:InsightFace带你打造开箱即用的人脸识别系统
  6. Boost:libbz2.dll测试程序
  7. 获取数组中元素值为偶数的累加和与元素值为奇数的累加和,并计算他们之间的差值
  8. python如何导入numpy简书_Webpack 之常用配置(一)
  9. Spring Boot 核心原理与源码解析 - 目录
  10. 使用BERT进行跨领域情感分析
  11. 【语音去噪】基于matlab先验信噪比的维纳滤波算法语音去噪【含Matlab源码 572期】
  12. 谷歌地球(Google Earth) 7.3.2.5776
  13. 思约云音乐 (绿色免费版)
  14. java发包工具_小米范工具系列之四:小米范HTTP批量发包器
  15. python 进化树_7款物种分类(进化树地位)信息检索工具使用方法
  16. 花1分钟用Word手动绘制流程图,看完我学会了!
  17. 社区团购小程序有哪些赚钱方式
  18. 通用的产品功能设计方法
  19. 一场安防IPC模组方案发布会引发的吐槽风波与反思
  20. 基于nodejs的二手物物交换平台【毕业设计源码】

热门文章

  1. 深度学习与计算机视觉:基于Python的神经网络的实现
  2. mysql 常用操作命令
  3. eclipse修改默认工作空间
  4. filter 在CSS用的效果
  5. 酒桌游戏c语言,最受欢迎的12种酒桌游戏
  6. 用JTAG将mcs文件烧写到flash中
  7. ajax 阻止默认提交,jQuery验证插件:在对ajax调用servlet时,submitHandler不会阻止默认提交-返回false无效...
  8. 如何完全卸载VMware
  9. numpy和scipy安装
  10. leetcode算法第7题