【Sparkstreaming_01】
文章目录
- SparkStreaming
- 1.流处理 /实时计算
- 2.批处理/离线计算
- SparkStreaming简单介绍:
- SparkStreaming数据源:
- 总结:
- SparkStreaming运行(工作原理):
- 流式处理:
- spark程序入口总结:
- idea里创建SparkStreaming代码:
- 1.如何构建DStream
- 如何构建DStream:两种
- 总结: 什么是Receiver:
- 2.转换算子
- sparkstreaming处理数据的方式:
- 1.为什么要指定checkpoint
- 2.checkpoint 目录 生产上 得指定到 hdfs上进行存储
- 2.1 checkpoint 的作用 针对sparkstreaming来说 :
- 需求: wc案例结果写mysql里面
- 1.第一种方式:某个东西没有进行序列化
- 2.正确写法:
- 3.连接池:
- 4.sparksql的方式写出
SparkStreaming
spark 提供的实时计算的模块:SparkStreaming、structuredStreaming
1.流处理 /实时计算
实时:storm、flink (来一条数据处理一条数据 ) event 真正的实时计算
近实时:
SparkStreaming 来一批数据处理一批数据 源源不断的来 mini-batch
flink t0 级 : 实时处理用sql 方式进行开发 structuredStream : 离线去开发实时通过 df api/sql SparkStreaming 双流join :api: flink、ss =》 api SparkStreaming code 很多 =》 api join state 状态+ eventtime+watermaker =》 延迟数据处理: 1.processtime + udf2.eventtime+watermaker 数据和离线 对不上 处理数据的时候 延迟数据 丢了 没有进行处理状态、checkpoint
2.批处理/离线计算
1.一次性处理某一个批次的数据 数据是有始有终的
流处理:
水龙头 数据是远远不断的来 数据没有始终
技术选型:
1.生产上主要占比:
SparkStreaming、structuredStreaming 10% spark
flink 90%
storm 2% 几乎不用
开发角度 :code/sql 处理 实时计算业务角度:1.实时指标:flink和structuredStreaming差不多2.实时数仓:1.代码上 差不多【缺点:不好维护】2.sql文件: flinksql 维护实时数仓好维护
SparkStreaming简单介绍:
1.Spark Streaming is an extension of the core Spark API
sss开发 与sparkcore 算子开发 差不多
2.spark Streaming 数据源:Kafka, Kinesis, or TCP sockets =》 input
3.处理: 算子的方式进行处理 =》 todo
- pushed out to filesystems, databases, and live dashboards. =》 output
SparkStreaming数据源:
kafka **** 流式引擎 + kafka cp
flume 可以使用 一般不用 没有数据缓冲的作用
hdfs 很少使用
tcp sockets =》 测试 +运营商数据(采集数据 )
总结:
建议不要使用flume 缓冲能力很弱 之后数据计算 直接把数据干到 spark里面 会导致 spark计算程序挂掉
SparkStreaming运行(工作原理):
- receives live input data streams 接收数据
2.divides the data into batches 把接收数据 拆分成batches
eg: 按照时间进行拆分
sparkstreaming =》 kafka : 1.5s钟处理一次数据 2.会5s接收的数据切分成batch3.把batch 交给 sparkengine 处理 4.处理完结果 也是 batch sparkstreaming编程模型:DStream
a DStream is represented as a sequence of RDDs.(一个rdd的集合)
sparkcore:rdd
sparksql :ds、df
Stream data =》 按照时间(5s一个批次) data 拆分一个一个的 batch
一个一个的rdd
DStream 就是由一串 rdd构成
流式处理:
对 DStream进行转换操作
实际上就是对 DStream里面的rdd进行操作
对rdd进行操作就是对 rdd里面分区的元素进行操作
spark程序入口总结:
sparkstreaming :StreamingContext
sparkcore: sparkcontext
sparksql: sparksession
idea里创建SparkStreaming代码:
1.引入
org.apache.spark
spark-streaming_2.12
3.2.1
2.构建程序入口
打印的东西:
1.spark处理 当前批次的数据的结果
2.不能处理 累计批次的数据
累计批次:多个批次之间又联系的
1.如何构建DStream
如何构建DStream:两种
1.外部数据源【kafka】
2.高阶算子方式转换
1.Input DStreams 【输入流 kafka】 *****
2.Receivers 【接收流 测试使用 生产上不用】: 为面试准备
并不是所有的接收数据都需要接收器
1.Receivers:底层逐级调用的类 socketTextStream: socketStreamSocketInputDStreammaster =》 local[2] => local[1] code能否处理数据:不能
sparkstreaming: 1 cpu =》 1 core 1.接收流式数据 ok2.流式数据 切分成 batch进行处理 no ok cpu不够 数据没有资源进行处理
sparkstreaming要求:
n > 1 in local mode if you have receivers to get data
master cpu 个数 一定要大于Receiver 数量
总结: 什么是Receiver:
指的就是 ReceiverInputDStream(接收器)
2.转换算子
1.transform ***
2.updateStateByKey
sparkstreaming处理数据的方式:
1.默认仅仅是计算当前批次的数据 只是计算10s一个批次的数据
需求:
统计 从现在时间点开始 b出现的累计次数?
updateStateByKey 用于解决 有状态问题
对于 累计批次的需求? 官方引出一个概念 状态
状态:State: 1.有状态 前后批次有联系的 2.无状态 前后批次是没有联系的
累计批次的需求?
1.updateStateByKey 算子解决
1.Define the state
2.Define the state update function
注意:
3.得指定 The checkpoint directory has not been set
1.为什么要指定checkpoint
1.维护 当前批次和以前的累计批次的数据state
2.checkpoint 目录 生产上 得指定到 hdfs上进行存储
存在问题:
1.checkpoint 每个批次都会产生 文件 =》 hdfs 扛不住 挂掉的风险
2.1 checkpoint 的作用 针对sparkstreaming来说 :
1.作用: 1.为了容错 2.恢复作业【实时计算作业 挂掉之后 可以恢复起来】
2.checkpoint存储的东西:1.Metadata 元数据 Configuration 作业里面配置信息DStream operations 作业code里面的算子操作Incomplete batches 未完成的批次2.Data每个批次里面真正传过来的数据 +stateful(状态)
3.使用场景1.Usage of stateful transformations2.Recovering from failures of the driver running the application(恢复作业)
4.如何正确使用checkpint? 如果你想要 恢复application 需要 正确编写 checkpoint设置代码
注意: checkpoint缺点:1.小文件多2.修改代码程序就用不了【修改业务逻辑代码】
checkpoint 用不了生产上 =》 累计批次指标统计问题 updateStateByKey这个算子 也用不了!!!
那么如何实现 累计批次统计需求? 一: 100%来处理 1.把每个批次数据 写到外部存储 2.然后利用外部存储系统再统计即可二:90%都没有解决 checkpoint 【解决 checkpoint 导致修改代码 报错问题+小文件问题解决】三:面试:
3.输出算子:
1.print
2.foreachRDD =》 db
需求: wc案例结果写mysql里面
1.mysql创建一个表
create table wc(
word varchar(10),
cnt int(10)
);
Serialization stack:
- object not serializable (class: java.lang.Object, value: java.lang.Object@4c03b7b3)
- writeObject data (class: java.util.HashMap)
- object (class java.util.HashMap, {UTF-8=java.lang.Object@4c03b7b3})
- field (class: com.mysql.jdbc.ConnectionImpl, name: charsetConverterMap, type: interface java.util.Map)
- object (class com.mysql.jdbc.JDBC4Connection, com.mysql.jdbc.JDBC4Connection@2d0220cb)
- element of array (index: 0)
1.第一种方式:某个东西没有进行序列化
1.MySQL连接驱动没有进行序列化 【做不了】
2.ClosureCleaner:Closure 闭包的意思
闭包:方法内使用了方法外的变量
2.正确写法:
rdd.foreachPartition{
mysql 连接次数 会减少 rdd有多少个分区 就有多少个连接
}
3.连接池:
如果 rdd.foreachPartition 写数据 存储性能问题: 【一般不用,可以使用!!!】
1.可以使用连接池
2.rdd.coalse =》 减少rdd分区数
4.sparksql的方式写出
【Sparkstreaming_01】相关推荐
- 【CentOS】利用Kubeadm部署Kubernetes (K8s)
[CentOS]利用Kubeadm部署Kubernetes (K8s)[阅读时间:约10分钟] 一.概述 二.系统环境&项目介绍 1.系统环境 2.项目的任务要求 三.具体实验流程 1 系统准 ...
- 【Spring】框架简介
[Spring]框架简介 Spring是什么 Spring是分层的Java SE/EE应用full-stack轻量级开源框架,以IOC(Inverse Of Control:反转控制)和AOP(Asp ...
- 【C#】类——里式转换
类是由面对对象程序设计中产生的,在面向结构的程序设计例如C语言中是没有类这个概念的!C语言中有传值调用和传址调用的两种方式!在c语言中,主方法调用方法,通过传递参数等完成一些操作,其中比较常用的的数据 ...
- 【C#】Out与ref是干什么的?
关于return: 1.最后没有写 return 语句的话,表示程序正常退出 2.不需要返回值时,存在return的作用 例子 void main() {return; //return退出该程序的作 ...
- 【软件工程】RUP与软件开发5大模型
软件开发的5大模型 1.瀑布模型:按照人的思维一步一步的开发下去,如果需求分析得当,每个阶段顺利,结果还不错! 2.快速原型模型:后来人们发现,自己不可能一下子就把所有的需求搞清楚,总是在开发的过程中 ...
- 【VB】学生信息管理系统5——数据库代码
这次学生信息管理系统在代码的理解过程中遇到了一些问题.总结如下: 1. sql server的安装过程各个步骤的意思.在安装SQL Server的时候按照网上的步骤,我觉得这个需要学完整个数据库再返回 ...
- 白化(预处理步骤)【转】
白化(预处理步骤)[转] 介绍 我们已经了解了如何使用PCA降低数据维度.在一些算法中还需要一个与之相关的预处理步骤,这个预处理过程称为白化.举例来说,假设训练数据是图像,由于图像中相邻像素之间具有很 ...
- 【Tensorflow】tf.nn.atrous_conv2d如何实现空洞卷积?膨胀卷积
介绍 关于空洞卷积的理论可以查看以下链接,这里我们不详细讲理论: 1.Long J, Shelhamer E, Darrell T, et al. Fully convolutional networ ...
- 兑换量子计算机,阅读 | 【量子计算机】构造置换量子门
原标题:阅读 | [量子计算机]构造置换量子门 量子计算机的一个基本组成单位叫量子门(quantum gate),下面简单介绍些基本概念. 量子比特和量子态 量子计算机的信息存储单元是一种叫做量子比特 ...
最新文章
- ubuntu 安装Pangolin 过程_余辉亮的学习笔记的博客-CSDN博客_pangolin安装
- 转Java转iOS-第一个项目总结(2):遇到问题和解决方案
- Spring - Java/J2EE Application Framework 应用框架 第 6 章 集成AspectJ
- windows下写的脚本,在linux下执行失败
- 【Python CheckiO 题解】Bigger Price
- Git 安装及 idea 配置 Git
- 技术动态 | 去中心化知识图谱协作平台建设实践
- QQ超极无敌无诚意的回答
- restTemplate重定向问题 cookie问题
- python 核心编程 第一部分
- qlv文件怎么转换成mp4_flv怎么转换成MP4格式
- 【NOIP 2017】宝藏
- 【物理应用】基于Matlab模拟RANS湍流
- 系统重启后接口代理服务器ip地址会变,重启路由器可以IP会变吗
- linux中PATH环境变量的作用和使用方法
- Daily English Dictation Number Three
- openwrt编译ifb.ko模块问题
- div+CSS浏览器兼容问题整理(IE6.0、IE7.0 ,ie8 , FireFox...)
- 正数负数的原码,反码,补码
- 社会网络分析工具—— Gephi 或 NetworkX的简单介绍和比较(源自GPTchat)