文章目录

  • 1. 弹性分布式数据集RDD
    • 1.1. 什么是RDD
    • 1.2. RDD的属性
    • 1.3. 通过RDD的转换方式对RDD详细解释
    • 1.4. 如何创建RDD
    • 1.5. Transformation和Action详解
    • 1.6. 常用的算子详细解释(==一天搞懂一个算子==)
    • 1.7. RDD的依赖关系和Stage划分
    • 1.8 集群运行原理
    • 1.9. 缓存Cache设置和CheckPoint设置
  • 2. RDD小案例
    • 2.1. 快速输出每个分区中的数据
    • 2.2. 服务器访问日志根据ip地址查询区域
  • 3. [三种任务提交流程standalone、yarn-cluster、yarn-client](https://www.cnblogs.com/lillcol/p/11159114.html)
    • 3.1. standalone模式
    • 3.2. Spark on Yarn
  • 4. 海阔凭鱼跃,天高任鸟飞

1. 弹性分布式数据集RDD

1.1. 什么是RDD

  1. RDD(Resilient Distributed Dataset)叫做分布式数据集,包含了只读的、分区的、分布式计算的概念。
  2. RDD是个类。

1.2. RDD的属性

  1. 一个数据分区的列表(hdfs的所有数据块的位置信息,保存在RDD类成员变量Array中
    protected def getPartitions: Array[Partition]
  2. 保存了在数据块上的计算方法,这个计算方法会应用到每一个数据块上,Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  3. RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  4. 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量
  5. 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

1.3. 通过RDD的转换方式对RDD详细解释

1.4. 如何创建RDD

  • 通过序列化集合的方式创建RDD(spark中makerdd和parallelize的区别)

    sc.parallelize(1 to 9, 2)
    
  • 由外部存储系统的数据集创建(textFile),包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

    val rdd2 = sc.textFile("hdfs://mini1:9000/a.txt")
    
  • 通过其他的RDD做transformation操作转换成新的RDD

1.5. Transformation和Action详解

  • RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

  • 官网链接2.4.1版本

  • 博客链接

1.6. 常用的算子详细解释(一天搞懂一个算子)

  • Transformation算子(懒加载

  • Action算子(触发任务的进行

1.7. RDD的依赖关系和Stage划分

  • 宽依赖: 依赖的RDD产生的数据不只是给我的,父RDD不只包含一个子RDD的数据。
  • 窄依赖:依赖的RDD产生的数据只给我自己。父RDD只包含一个子RDD的数据。
  • 血统(Lineage):RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
  • Stage划分

1.8 集群运行原理

1.9. 缓存Cache设置和CheckPoint设置

  • 如何设置cache和checkpoint

    • cache:
      someRdd.cache(): 缓存到内存
      someRdd.persist(StorageLevel.MEMORY_AND_DISK): 根据自己的需要设置缓存的位置(内存和硬盘)
    • CheckPoint:可以把RDD计算后的数据缓存在本地磁盘,也可以是hdfs
      sc.setCheckpointDir(“hdfs://mini1:9000/checkpoint”)
      someRdd.checkpoint()
  • 什么时候设置cache,什么时候设置checkpoint
    • 通常:遇到宽依赖设置checkpoint,窄依赖想缓存的话设置cache
  • cache 和checkpoint的区别
    • cache只是缓存数据,不改变RDD的依赖关系
    • checkpoint是生成了一个新的RDD,后面的RDD依赖关系已经改变
    • RDD发生异常寻找数据的过程:checkpoint --> cache --> 重算

2. RDD小案例

2.1. 快速输出每个分区中的数据

val a = sc.parallelize(1 to 9, 3)
a.glom.collect

2.2. 服务器访问日志根据ip地址查询区域

  • 需求分析

    • 在互联网中,我们经常会见到城市热点图这样的报表数据,例如百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。因此,我们需要通过日志信息(运营商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。
  • 数据下载 我的百度云提取码;3rzp
    • 城市ip段信息
    • ip日志信息
  • 源代码
    • gradle文件配置

      plugins {id 'java'id 'scala'
      }version '1.0.0'sourceCompatibility = 1.8
      targetCompatibility = 1.8sourceSets {main {scala {srcDirs = ['src/main/scala', 'src/main/java']}java {srcDirs = []}}test {scala {srcDirs = ['src/test/scala', 'src/test/java']}java {srcDirs = []}}
      }repositories {mavenCentral()maven {url 'http://maven.aliyun.com/nexus/content/groups/public/'}maven {url 'https://maven.ibiblio.org/maven2/'}
      }dependencies {compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'       //scala基本库compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.2'compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.19'
      }jar {manifest {attributes 'Main-Class': 'com.xiaofan.ip_location.IPLocation'}
      }
      
    • scala源码逻辑

      package com.xiaofan.ip_locationimport java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}import org.apache.spark.broadcast.Broadcast
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}/*** @author xiaofan*/
      object IPLocation {val data2Mysql = (iterator: Iterator[(String, Int)]) => {var conn: Connection = nullvar ps: PreparedStatement = nullval sql = "INSERT INTO location_info (location, counts, access_date) VALUES (?, ?, ?)"try {conn = DriverManager.getConnection("jdbc:mysql://mini1:3306/db_ip_location?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8", "root", "123456")iterator.foreach(line => {ps = conn.prepareStatement(sql)ps.setString(1, line._1)ps.setInt(2, line._2)ps.setTimestamp(3, new Timestamp(System.currentTimeMillis()))ps.executeUpdate()})} catch {case e: Exception => println("Mysql Exception:" + e.toString)} finally {if (ps != null) {ps.close()}if (conn != null) {conn.close()}}}def ip2Long(ip: String): Long = {val fragments: Array[String] = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length) {ipNum = fragments(i).toLong | ipNum << 8L}ipNum}def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong)) {return middle}if (ip < lines(middle)._1.toLong) {high = middle - 1} else {low = middle + 1}}-1}def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[5]").setAppName("ip_location")val sc = new SparkContext(conf)// 读取基站数据val ipRulesArrayRDD: RDD[(String, String, String)] = sc.textFile("D:\\ip.txt").map(line => {val fields = line.split("\\|")val start_num = fields(2)val end_num = fields(3)val province = fields(6)(start_num, end_num, province)})// 全部的ip映射规则val ipRulesArray: Array[(String, String, String)] = ipRulesArrayRDD.collect()// 广播规则val ipRulesBroadcast: Broadcast[Array[(String, String, String)]] = sc.broadcast(ipRulesArray)// 加载要处理的数据val ipsRDD: RDD[String] = sc.textFile("D:\\20090121000132.394251.http.format").map(line => {val fields = line.split("\\|")fields(1)})// 对数据进行处理,取的结果val result: RDD[(String, Int)] = ipsRDD.map(line => {val ipNum: Long = ip2Long(line)// 注意: 这里传递的是广播的变量值val index: Int = binarySearch(ipRulesBroadcast.value, ipNum)// (ip的起始Num, ip的结束Num, 省份名)val info: (String, String, String) = ipRulesBroadcast.value(index)info}).map(t => (t._3, 1)).reduceByKey(_ + _)// 打印数据result.foreach(println)// 把数据写入mysqlresult.foreachPartition(data2Mysql(_))sc.stop()}
      }
      
    • mysql表逻辑

      • 建表语句

        create table location_info(location varchar(255),counts int unsigned,access_date timestamp
        );
        
      • 表结构

  • 运行结果
  • 打jar包部署到集群进行测试
    • standalone模式提交

      • 脚本

        bin/spark-submit  --master spark://192.168.1.27:7077,192.168.1.28:7077 --executor-memory 512m --total-executor-cores 2 --class com.xiaofan.ip_location.IPLocation /home/hadoop/fanjh/demo001-1.0.0.jar hdfs://cluster/ip_location/in/ip.txt hdfs://cluster/ip_location/in/20090121000132.394251.http.format hdfs://cluster/ip_location/out_3
      • spark控制台运行结果显示



      • hdfs运行结果显示

    • yarn模式提交

      • client客户端模式

        bin/spark-submit  --master yarn --deploy-mode client --executor-memory 512m --total-executor-cores 1 --class com.xiaofan.ip_location.IPLocation /home/hadoop/fanjh/demo001-1.0.0.jar hdfs://cluster/ip_location/in/ip.txt hdfs://cluster/ip_location/in/20090121000132.394251.http.format hdfs://cluster/ip_location/out_6
      • cluster集群模式

        bin/spark-submit  --master yarn --deploy-mode cluster --driver-memory 1g  --executor-memory 512m --total-executor-cores 1 --class com.xiaofan.ip_location.IPLocation /home/hadoop/fanjh/demo001-1.0.0.jar hdfs://cluster/ip_location/in/ip.txt hdfs://cluster/ip_location/in/20090121000132.394251.http.format hdfs://cluster/ip_location/out_6
      • yarn的client和cluster模式区别

  • iplocation(ip的热力图)注意点
    • 广播变量: 共享的内存,只读的,只能追加; Spark Broadcast 原理链接
    • foreachPartition: 对每个分区的数据进行操作,可以在分区操作的时候创建外部链接(jedis, mysql, hbase)

3. 三种任务提交流程standalone、yarn-cluster、yarn-client

3.1. standalone模式

  • 任务提交流程

    • spark-submit 提交任务给 Master
    • Master 收到任务请求后通过 LaunchDriver 向 Worker 请求启动 Driver
    • Worker 收到请求后启动 Driver
    • Driver 启动后向 Master 注册(用户App信息)
    • Master 收到 App 信息后根据资源的情况向 Worker 发送 launchExecutor 启动 Excutor
    • Worker 收到 Master 的请求后启动相应的 Excutor
    • Excutor 启动后负责与 Driver 通信, 执行相关任务

3.2. Spark on Yarn

  • Cluster集群模式

    • 作业提交流程

      • 由client向RM提交请求,并上传jar到HDFS上(这期间包括四个步骤:)

        • 连接到RM
        • 从 RM ASM(Applications Manager )中获得metric、queue和resource等信息
        • 上传 app jar and spark-assembly jar
        • 设置运行环境和container上下文(launch-container.sh等脚本)
      • ASM 向 Scheduler 申请空闲 container
      • Scheduler 向 ASM 返回空闲 container 信息(NM 等)
      • RM(ASM)根据返回信息向 NM 申请资源。
      • NM 分配创建一个container 并创建Spark Application Master(AM),此时 AM 上运行的是 Spark Driver。(每个SparkContext都有一个 AM)
      • AM启动后,和RM(ASM)通讯,请求根据任务信息向RM(ASM)申请 container 来启动 executor
      • RM(ASM)将申请到的资源信息返回给AM
      • AM 根据返回的资源信息区请求对应的 NM 分配 container 来启动 executor
      • NM 收到请求会启动相应的 container 并启动 executor
      • executor 启动成后 反向向 AM 注册
      • executor 和 AM 交互 完成任务
      • 后续的DAGScheduler、TaskScheduler、Shuffle等操作都是和standalone一样
      • 等到所有的任务执行完毕后,AM 向 ASM 取消注册并释放资源
  • Client客户端模式

    • 在yarn-client模式下,Driver运行在Client上,通过ApplicationMaster向RM获取资源。本地Driver负责与所有的executor container进行交互,并将最后的结果汇总。整体的过程与yarn-cluster类似。
    • 不同点在于 Driver 是运行在本地客户端,它的 AM 只是作为一个 Executor 启动器,并没有 Driver 进程。而且 Executor启动后是与 Client 端的 Driver 进行交互的,所以 Client 如果挂了 任务也就挂了。
    • 在yarn-client、yarn-cluster 提交模式中,可以不启动Spark集群,应为相关的jvm环境有yarn管理(启动、结束等)。standalone 提交模式中 Spark 集群一定要启动,因为需要依赖worker、Master进行任务的启动、调度等。

4. 海阔凭鱼跃,天高任鸟飞

8. spark学习之旅(二)相关推荐

  1. HALCON学习之旅(二)

    HALCON学习之旅(二) 文章目录 HALCON学习之旅(二) 1.HALCON用户界面操作符 2.HACLON基础语法 ①.运算符 ②.Tuple数组 ③.字符数字格式化 1.HALCON用户界面 ...

  2. SSE指令集学习之旅(二)

    SSE指令集学习之旅(二) 文章目录 SSE指令集学习之旅(二) 1.BGR->GRAY 2.summarize(归纳总结) 1.BGR->GRAY 知识来源:SSE图像算法优化系列一 代 ...

  3. 滴滴Booster移动APP质量优化框架 学习之旅 二

    推荐阅读: 滴滴Booster移动App质量优化框架-学习之旅 一 Android 模块Api化演练 不一样视角的Glide剖析(一) 续写滴滴Booster移动APP质量优化框架学习之旅,上篇文章分 ...

  4. 我的Go语言学习之旅二:入门初体验 Hello World

    好吧,所有的程序员们都已经习惯了,学习任何一门语言,我们都会以Hello World实例开始我们的学习,我也不例外.先来一个简单的例子 打开编辑器 (可以用记事本,我已经习惯 Notepad++了)输 ...

  5. Spring学习之旅(二) AOP(面向切面编程)的使用

    辛苦堆砌,转载请注明出处,谢谢! 上一篇说了Spring的依赖注入,今天再看看Spring的AOP,牵扯的AOP的理论知识,大家可以搜索一些文章了解一下,这里不做过多解释,本文主要介绍使用Spring ...

  6. dotnet Core学习之旅(二):安装IDE

    [重要:文中所有外链不能确保永久有效] >开发工具 高效的开发必然需要一个优秀的集成开发环境(IDE) 对于.NET Core 2.x可以使用包括但不限于以下IDE来进行开发. Visual S ...

  7. 滴滴Booster移动APP质量优化框架 学习之旅 三

    推荐阅读: 滴滴Booster移动App质量优化框架-学习之旅 一 Android 模块Api化演练 不一样视角的Glide剖析(一) 滴滴Booster移动App质量优化框架-学习之旅 二对重复资源 ...

  8. SPARK学习之 --- eclipse / sbt / scala 配置

    为什么80%的码农都做不了架构师?>>>    工作以及兴趣所致,开始了spark学习之旅,浏览网上大牛们的博客 文章,并且结合官网docs,刚开始云里雾里,现在也能雾里看到点花了. ...

  9. Hadoop学习之旅三:MapReduce

    MapReduce编程模型 在Google的一篇重要的论文MapReduce: Simplified Data Processing on Large Clusters中提到,Google公司有大量的 ...

最新文章

  1. 讲透彻什么是Linux零拷贝?
  2. 是不是“异常”让我的脑子糊涂了?
  3. Python:打印目录下最大的十个文件
  4. epoll 的accept , read, write
  5. Google发布Anthos:Google背书,宣告多集群多云Kubernetes时代已来
  6. 下一代对话系统中的关键技术(上篇)
  7. Android之ActivityManage长用方法总结
  8. Flask扩展系列(八)–用户会话管理
  9. 设计模式学习笔记——单例(Singleton)模式
  10. 构造AlertDialog弹出框,根据量值实现然用户选择列表。
  11. 梅花传播业大展:Focussend将精准营销融入个性化邮件
  12. 计算机cpu天体图,电脑cpu天梯图2019|最新Intel/AMD处理器性能排行2019
  13. 日本警方称地震造成3676人死亡7843人下落不明
  14. IDA PRO 静态反汇编与OllyDbg动态调试实战技巧汇总
  15. 使用EasySysprep 4和ghost来封装、备份及还原Windows操作系统
  16. 免费收录网站的搜索引擎登录口大全
  17. 二级计算机vb答案,计算机二级VB考试练习题及答案
  18. 使用IDEA整合SpringMVC和Mybatis(SSM框架)(二)
  19. Linux学习笔记(一)
  20. 南大通用事务型数据库GBase 8s斩获2020年中国国际金融展“金鼎奖”

热门文章

  1. 安卓逆向:重温Thumb汇编指令细节
  2. Android中自定义弧形的seekbar
  3. 典型的FPGA芯片有哪些,看完这篇文章就知道了
  4. Denouncing Mafia
  5. erp系统服务器性能指标,ERP系统体检的三大关键指标
  6. 麦克风声源定位原理_使用麦克风阵列对声源定位的方法
  7. [行路难]——开始漫漫编程路
  8. vivo商城促销系统架构设计与实践-概览篇
  9. PAT-Day1-挖掘机技术哪家强
  10. [转载] Java是剑客-飘逸;.NET是刀客-霸道 (一)