sparkStreaming 是一种流处理框架,支持多种数据源和多种输出,是一中微批处理,
主要的数据结构是:DStream 离散数据流,由多个RDD组成,每一个微批都是一个RDD。
Spark Streaming 的入口需要单独创立,因为sparkSession中灭有整合:
创建如下:
val conf=new SparkConf().setMaster(“local[*]”).setAppName(“kgc streaming demo”)
val ssc=new StreamingContext(conf,Seconds(5))
注意:一个jvm中只有一个StreamingContext启动
StreamingContext停止后,不能在启动
使用scala 编写sparkStreaming程序:
Scoket数据源:

//local[n]  其中n要大于接受器的个数
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
//创建一个接收器
val lines = ssc.socketTextStream("localhost", 9999)//指定数据源
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
//开始
ssc.start()
//等待终止信号
ssc.awaitTermination()

sparkStreaming内建的流式数据:文件系统(不与接收器相关联)、Scoket、kafka、Flume等
文件系统数据源:

val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))// 创建FileInputDStream去读取文件系统上的数据
val lines = ssc.textFileStream("hdfs://hadoop131:9000/data")
//使用空格进行分割每行记录的字符串
val words = lines.flatMap(_.split(" "))
//类似于RDD的编程,将每个单词赋值为1,并进行合并计算
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

flume数据源
1、push的方式读取数据

val conf: SparkConf = new SparkConf().setAppName("flumedemo").setMaster("local[3]")val ssc = new StreamingContext(conf,Seconds(5))//push 方式  由主机推送数据给sparkStreaming   需要先启动sparkStreamingval flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"hadoop131",5678)//flume 作为sparking streaming 的实时数据流  每一条数据是一个event 故此时形成的dStream中的数据是一个一个的event//event 有body  和headerflumeStream.map(x=>new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()

2、poll的方式获取数据

val conf: SparkConf = new SparkConf().setAppName("flumedemo").setMaster("local[3]")val ssc = new StreamingContext(conf,Seconds(5))//poll方式  主动拉取数据,需要先启动flumeval flumeStream=FlumeUtils.createPollingStream(ssc,"hadoop131",5678)flumeStream.map(x=>new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()

kafka数据源

  //设置主函数的参数  第一个是brokers  第二个是topics  可以使用逗号隔开 传入多个topics//sparkStreaming  可以一次性读取 kafka中的多个topic中的数据val Array(brokers, topics) = argsval sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")val ssc = new StreamingContext(sparkConf, Seconds(2))val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topicsSet,kafkaParams))messages.map(_.value())       // 取出 value.flatMap(_.split(" "))   // 将字符串使用空格分隔.map(word => (word, 1))      // 每个单词映射成一个 pair.reduceByKey(_+_)    // 根据每个 key 进行累加.print()     // 打印前 10 个数据ssc.start()ssc.awaitTermination()

sparkStreaming基础知识整理相关推荐

  1. python常用变量名_python基础知识整理

    Python Python开发 Python语言 python基础知识整理 序言:本文简单介绍python基础知识的一些重要知识点,用于总结复习,每个知识点的具体用法会在后面的博客中一一补充程序: 一 ...

  2. 计算机二级c语基础知识,计算机二级C语基础知识整理.doc

    计算机二级C语基础知识整理 1.1 算法 算法:是一组有穷指令集,是解题方案的准确而完整的描述.通俗地说,算法就是计算机解题的过程.算法不等于程序,也不等于计算方法,程序的编制不可能优于算法的设计. ...

  3. 使用Aspose.Cells的基础知识整理

    使用Aspose.Cells的基础知识整理 转自 http://www.cnblogs.com/kenblove/archive/2009/01/07/1371104.html 这两天用Aspose. ...

  4. 前端基础知识整理汇总(中)

    前端基础知识整理汇总(中) Call, bind, apply实现 // call Function.prototype.myCall = function (context) {context = ...

  5. 前端基础知识整理汇总(上)

    前端基础知识整理汇总(上) HTML页面的生命周期 HTML页面的生命周期有以下三个重要事件: 1.DOMContentLoaded -- 浏览器已经完全加载了 HTML,DOM 树已经构建完毕,但是 ...

  6. centos7创建asm磁盘_Oracle ASM 磁盘组基础知识整理(收藏版)

    为什么要写这么一篇基础知识呢?还是有那么一点点原因的,不是胡编乱造还真是有真实存在的事件的,前两周里因一套生产环境数据库磁盘不足无法对其进行表空间扩容,需要向存储岗申请存储资源,当存储岗划好资源加完存 ...

  7. Web前端基础知识整理

    1. 前端基础知识 文件分类 XML(扩展标记语言) 装载有格式的数据信息,用于各个框架和技术的配置文件描述 特点: 扩展名为.xml 内容区分大小写 标签要成对出现,形成容器,只能有一个 标签按正确 ...

  8. Kali Linux渗透基础知识整理(四):维持访问

    Kali Linux渗透基础知识整理系列文章回顾 维持访问 在获得了目标系统的访问权之后,攻击者需要进一步维持这一访问权限.使用木马程序.后门程序和rootkit来达到这一目的.维持访问是一种艺术形式 ...

  9. 矩阵论(零):线性代数基础知识整理(1)——逆矩阵、(广义)初等变换、满秩分解

    矩阵论专栏:专栏(文章按照顺序排序) 线性代数是矩阵论的先修课程,本篇博客整理线性代数的基础理论知识,为矩阵论的学习做准备.限于篇幅,梳理的重点将在定理和结论上(只给出部分必要的定义),对最基础的概念 ...

  10. CSP-S初赛基础知识整理

    文章目录 CSP-S初赛基础知识整理 RT [1]计算机基础知识 计算机系统的组成 计算机硬件的五大组成 [1-2]进制及其转化和运算 [1-2]二进制 [1]基本定义及应用 [1]基本运算 [2]位 ...

最新文章

  1. CodeIgniter开发实际案例-新闻网站【转】
  2. 生产路由跳转报错找不到js路径问题
  3. 002_FastDFS单机部署
  4. 029_jdbc-mysql二进制数据
  5. haproxy Consistent Hash浅析
  6. Luogu_2774 方格取数问题
  7. 牛客网 --java问答题
  8. Hybris service layer和SAP CRM WebClient UI架构的横向比较
  9. 如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步
  10. Linux Shell文本处理工具集锦
  11. 使用swagger作为restful api的doc文档生成
  12. Python 操作 Azure Blob Storage
  13. Intent调用大全
  14. C# Sqlite For WP7
  15. prerenderspaplugin可以抓取动态数据吗_RPA编程思路之数据抓取
  16. c语言点分十进制转化成长整形,点分十进制IP和长整型转换
  17. 一壶浊酒尽余欢、今宵别梦寒!
  18. 星光不负赶路人|2022年终总结
  19. 睡眠是锁定计算机怎么设置密码,笔记本电脑如何设置睡眠唤醒密码?
  20. 【Vue3】电商网站吸顶功能

热门文章

  1. 【机器学习/MachineLearning】相关基本概念2——归纳,演绎,溯因法
  2. Azkaban Flow 2.0的使用
  3. 小不点浏览器 v1.00 官方
  4. LeetCode刷题笔记- 845.数组中的最长山脉
  5. “闽南金三角”——世丰管道福建漳州高级水电工程师会议
  6. Pytorch报错解决:The size of tensor a (4) must match the size of tensor b (3) at non-singleton dimensio
  7. 编译ionic应用时遇到“To run dex in process, the Gradle daemon needs a larger heap.”
  8. android 安卓手机如何投屏到显示器
  9. 计算机课对小学生的作用,小学信息技术课的最重要性
  10. 如何在同一台电脑上打开多个iPhone模拟器