今天来详细说说spark中的一个比较底层的算子combineByKey。

熟悉spark的朋友应该知道,spark里面有很多类型的算子,有些比较基础,什么map,filter,可能看一眼就会了,有些稍微复杂点,但是也不难懂,看看算子解释,给一行代码也能懂,如reducebykey。还有一些较为晦涩难懂,像今天这个,因为算子的输入可能就好几项,理解起来比较困难,日常普通的数据处理可能根本就不会使用,若不是碰到特别复杂的业务场景以及数据问题处理,你可能根本想不起它。像我也是使用spark好几年,前几年基本不熟悉combineByKey,近一年改造业务中的特征系统,碰到很多问题, 既要考虑大数据,又要兼顾性能最优,普通的逻辑虽然能够实现,但是已经无法满足各方面要求了,通常此时就是这种底层算子大显神威了。并且当你用这个算子能较好较快的实现以前绞尽脑汁想要实现功能时,你会发现原来它是多么的牛逼。

话不多说,理解这个算子之前先来简单复习一下大数据计算中的几个通俗概念与问题。

  • 问题1,大数据大的原因:

首先我们要处理的对象是海量数据,往往以T(1T=1024G=1024*1024M)为单位,几个T几百个T的数据,那么显然1台机器装不下这么多数据,需要分布在n台机器上,比如说1000台吧。机器与机器之间的数据传输通常就比较慢了,只能通过数据io传输,好比你拷贝数据到u盘一样,比起单台电脑从c盘复制数据到d盘比,数据拷贝到外部u盘则慢得多了多。好了这里只是举个大概的例子,要理解的重要就2点:
(1)数据量海量,必须存储在n台机器上,没有这个特性,还要什么大数据框架;
(2)数据如果在机器与机器间进行传输是很慢的,这里是相对机器自己内部传输的速度。

  • 问题2:spark中的算子逻辑:

有了刚才对于大数据在存储与计算上的理解,顺着这个理解来大概说说spark中所有的算子大概有两种底层逻辑。
逻辑(1):对于不需要跨机器交互数据就能处理的逻辑,也就是说一个操作不用跨机器就能完成,那么这样的操作通常就是非常快速的,比如对一个数据乘以2操作。这个操作显然不用跨机器,每台机器上自己的数据自己就能完成。
逻辑(2):需要进行跨机器交互处理。比如说要统计一个海量文本数据中出现了名字包含“张三”的次数。首先海量文本就存在n台机器上,其次就是要统计所有机器上包含“张三”的次数和。那么显然要完成这整个操作,必然存在多机器的数据交互才能得到最后的累加结果。
这是任何大数据处理任务中必备的两个底层逻辑。

  • 问题3,combineByKey数据的处理对象:

spark中该算子处理的元数据对象是(k,v)形式的数据,当然kv不光可以是一个数或者字符串,也可以是数组、对象等;spark中(k,v)对可以说是一种比较基础的数据结构,所以也会有好多算子都是以这种形式的数据为输入的,如reducebykey等。那么为什么kv形式的数据在spark中这么常见呢?试想一下,spark里面很多聚合操作,那么就需要以某个key进行聚合,这样自然而然就衍生了大量的以kv进行算子。

先用一个例子引入:

还是以上述的统计某个姓名出现次数为例,我们假设将数据极端一点,现在有1000台机器,分布着1000份数据,每份数据的结构一样:(姓名,1)。如(张三,1),(李四,1)等等。但是不同key的数据是严重不均衡的,假设每台机器都是1T的(张三,1),1M的(李四,1)。那么要怎么统计所有机器上不同姓名出现的次数呢?

方法1:按照key进行聚合,即将名字相同的数据全部汇聚到一台机器上,然后进行统计数量;那么这样会有一个经典问题:某些key的数据巨大,如这里的“张三”作为key,99%都是这个数据,那么就不可能汇聚到一个机器上去计算。很多spark任务失败的原因也是因为某些key的数据巨大,同时又要做聚合到一台机器上的操作导致的。

方法2:那么既然方法1无法实现,一个简单的思路是先单机器单独计算,然后再汇合。在这里就是1000台机器各自先单独统计各自机器上的不同姓名的数量,如只对key=张三,机器1可以得到(张三,998),机器2可以得到(张三,335)等等,然后再将(张三,998),(张三,335)等1000个数汇聚到一台机器上,这样就不会出现海量数据汇聚到一台机器上的问题。这样对每个key都这么操作,即在单机器上分别各自统计,然后多机器间的结果再汇合统计。这也是分布式处理的精髓之处:先分处理,再合并处理。
而上述方法2的整个过程,其实就是reducebykey算子的过程。

为什么一定要有combineByKey?

那么是不是什么问题都用reducebykey就可以了呢?其实不是。reducebykey的使用是有条件的,即对同一个key来说,只有当机器内的计算规则和机器间的计算规则相同的时候才行。比如上面的例子,单个机器内是求次数和,多个机器间,也是求次数和,所以他们的规则是一样的,但凡不一样就不行。

举个复杂点的例子:还是1000台机器,分布着1000份数据,数据的结构变一下:(姓名,分数)。如(张三,80),(李四,97)等等。数据分布依然不均衡,张三占据着99%的数据,且每台机器的数据依然很大。现在问题是求出不同姓名的平均分。

1分钟单独可以想一下怎么办。

分析发现,首先对于每个key需要得到对应的总次数,其次需要得到总分数,然后总分数/总次数才是平均数。那么怎么办的?首选一个reducebykey无法搞定,你没法找到一个通用的函数,即可以单机器上计算也可以多机器上计算并得出正确结果的。如果非要用reducebykey不是不可以。

这就是方法1: 先用一个reducebykey用来统计不同key的总次数,这就退化为上面的例子了。再对这份数据,用一个reducebykey来计算不同key各自的和。然后就能求出不同key的平均数了。不过显然方法1虽然好理解,但是过程多。
方法2:使用combineByKey。
另外一个方法使用一个combineByKey搞定。首先我们来看一下combineByKey的函数定义:

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)}

注意combineByKey处理的数据对象是(k,v)对,以k为聚合,对v进行处理的。
那么这个函数有三个子处理函数:

  • createCombiner: V => C ,即将一个v变换成另外一种数据结构c;比如把1变成[1,1],即一个数变成一个数组。此过程在单机器内变,很好理解,单纯的数据结构变化,无需其他数据参与。
  • mergeValue: (C, V) => C,这个操作是在单机器分区内操作。即将一个结构为c的数和v进行合并,并也形成一个结构为c的数。比如[1,1] 和 1经过某个函数处理,变成[2,2];重要的是,这一步完全是在单机器分区内进行。
  • mergeCombiners: (C, C) => C,这个操作则完全是对不同分区进行操作。即将不同分区结构为c的数据经过函数变化成另外一个c。比如[1,1]和[2,2]经过变化形成[3,3]。
    至此,整个函数处理完后,最终形成一个[k, c]的数据结构。
    基于此我们重写上面求平均数代码:
object combineKeyTest {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("CombineByKeyJob")val sc = new SparkContext(conf)var rdd = sc.parallelize(Array(("A",100.0),("A",80.3),("A",380.3),("A",50.2), ("B",100.3),("B",80.1),("B",380.45)))val collect: Array[(String, (Double, Int))] = rdd.combineByKey((v: Double) => (v, 1), // (分数, 出现次数1)(c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1), // 同一分区内, (分数和,次数+1)(c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2) // 分区间,(分数和,次数和)).collect()val mean_data = collect.map(x => x._1 -> x._2._1 / x._2._2) // [key, 分数和 / 次数和]mean_data.foreach(println)}
}

解释一下核心的三个步骤:
首先输入是(k,v)进函数,此时进入处理的只是v,k默认已经被分组了。

  • 步骤1: 将v进行变换,形成一个(分数和,记录次数)的结构c。因为第一步每次只对一个v操作,所以次数就默认是个1
  • 步骤2: 将一个c和一个v进行合并。很显然分数和与分数相加,而次数多了一次,所以加1.
  • 步骤3: 将一个c与c合并,此时分区内的合并了,分数加分数,次数加次数。

那么最终我们得到了的结果进行简单变换,就是不同key的平均数了。
至此仔细体会下上述三个过程,分区内先自己计算,减少海量数据,然后才进行分区间的计算。
而我们常用的reducebykey,底层其实就是combineByKey实现,先分区内合并计算,再分区间合并计算,要不然reducebykey能这么快且不怎么出现内存不够的问题。只不过reducebykey这里,底层的步骤2与步骤3的变换函数是一样的罢了。

实际生产中,对于步骤2与步骤3中的两个函数,通常也更复杂。大逻辑就是尽可能的先分区内计算减少数据量,然后再分区间合并,并最终完成任务要求,而combineByKey的三个逻辑则是完完全全将这个过程体现出来,如果完全理解了这个算子的逻辑,我想以后不会再出现因为数据量大等原因发生错误的时候了,并且也能让你的spark任务运算性能更好。

深入理解spark高阶算子combineByKey相关推荐

  1. react组件类型及深入理解react高阶组件

    React中常见的组件类型及分类: 1.展示组件(Presentational component) 与 容器组件(Container component) 2.类组件(Class component ...

  2. 通俗理解 React 高阶函数

    定义:高阶组件就是一个函数,且该函数接受一个组件作为参数,并返回一个新的组件. A higher-order component is a function that takes a componen ...

  3. 深入理解 React 高阶组件

    在目前的前端社区,『推崇组合,不推荐继承(prefer composition than inheritance)』已经成为了比较好的实践,mixin 也因为自身的一些问题而渐渐不被推荐.高阶组件(H ...

  4. [react] 写一个react的高阶组件并说明你对高阶组件的理解

    [react] 写一个react的高阶组件并说明你对高阶组件的理解 定义高阶组件 import React, { Component } from 'react';const simpleHoc = ...

  5. Koltin 高阶函数

    高阶函数 高阶函数是将函数用作参数或返回值的函数. 在java中函数和方法是同一个概念, 我就把高阶函数理解为:高阶函数是将方法用作参数或返回值的方法, java中我们要调用方法里面的参数一般需要实现 ...

  6. React高阶组件探究

    React高阶组件探究 在使用React构建项目的过程中,经常会碰到在不同的组件中需要用到相同功能的情况.不过我们知道,去这些组件中到处编写同样的代码并不优雅. 在以往,为了解决以上的情况,我们会用到 ...

  7. React(精读官方文档) - 高级指引 -高阶组件

    高阶组件(HOC) 概述 是React复用组件逻辑的一种高级技巧,是一种基于React组合特性而形成的设计模式 高阶组件是参数为组件,返回值为新组件的函数 简单理解: 高阶组件本身是 函数,传参数是组 ...

  8. Kotlin-简约之美-进阶篇(十一):高阶函数

    Lambda是什么 在讲高阶函数之前,必须要明白什么是Lambda,简单来讲,Lambda是一种函数的表示方式(言外之意也就是说一个Lambda表达式等于一个函数).更确切的说:Lambda是一个未声 ...

  9. 一天一个小知识:KT高阶函数

    让我们从匿名函数聊起 我们听说过有匿名类,那作为一等公民的函数就也会有匿名函数 什么是一等公民? 我们知道Java的一等公民是类,就连一个普通的程序入口也要用类包一下.而kotlin中除了类是一等公民 ...

最新文章

  1. 基于全局场景背景图和关系优化的全景3D场景理解(ICCV 2021)
  2. python爬虫入门实例-终于领会python爬虫入门示例
  3. Hadoop集群 MapReduce初级案例
  4. django项目简单调取百度翻译接口
  5. vue class与style绑定
  6. wordpress后台无法登录问题
  7. 算法:数组中的逆序对
  8. 《Scikit-Learn与TensorFlow机器学习实用指南》 第1章 机器学习概览
  9. eclipse-java-2018-09-win32-x86_64配置tomcat(内含更新eclipse,如何解决添加时找不到最新tomcat版本)...
  10. 电话号码以185****3547显示demo
  11. 锅炉正反平衡计算热效率
  12. matlab对5个矩阵循环求均值,MATLAB循环求数组的平均值 每隔几个数据求一下平均值...
  13. 不会做PPT图表?1000个高大上的PPT图表,0门槛0套路,想要就给你
  14. 笔记-MFC更换鼠标图片
  15. 企业微信三方开发:注册企业微信服务商
  16. 《烟花》个人深度解读
  17. Single Tree Segmentation and Diameter at Breast Height Estimation With Mobile LiDAR
  18. 大漠老师:2022 年的 CSS,到底有哪些特性
  19. 找学习资料的网址/地方
  20. Linux安装ST-Link GDBServer

热门文章

  1. Java 进行微信公众号开发遇到的一些坑
  2. 国外引进的技术?NO!这款最酷的垃圾分拣机器人是“三国产品”
  3. Python3自动化打包项目发布到pypi
  4. 数值分析笔记 - L2 - Floating Point Arithmetic(国外资料)
  5. 国家一级建造师—工程经济—第一章—第三节
  6. 第三章 多级放大电路
  7. 腾讯云携手信通院启动“云原生开源白皮书”编写,深度解读云原生
  8. 计算机网络协议 —— TCPUDP
  9. 如何在IGV上使用BLAT搜索非模式物种(续)
  10. 9月4日服务器例行维护公告,9月14日服务器例行维护公告