此文已由作者叶林宝授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

方案四:Sort on Cell Values

简述:

上述方案三, 当数据行数较多, 情况下, 在二次排序还是可能出现oom情况, 而且, 不同的field_index的数据可能shuffle到同一个分区,这样就加大了oom的概率。当field_index本身取值较多 情况下, 增加分区数是其中一种解决方法。但是field_index取值本身就少于分区数的情况下, 增加分区数对缓解oom就没任何作用了。 如果 当field_value相比field_index较为分散, 且值较多的情况下, 不妨换个思维, 按field_value分区。 具体算法如下:

算法:

(1)将df 转换为(field_value, field_index)

(2)对分区内的数据, 用sortByKey根据 field_value排序 (rangPartition排序)

(3)利用mapPartitions确定每个分区内的每个field_index共有多少数据(不同分区中的filed_value相对有序, 例如partiiton1 中的filed_value比partition2中的field_value小)

(4)利用第(3)步数据, 确定每个field_index中所需要的排名的数据在哪个分区以及分区内第几条数据。例如要输出field_index_6的13th位数据,假设第一个分区已经包含10条数据, 则目标数据在第二个分区的第3条数据

(5)转换(4)计算结果为标准输出格式

代码:

(1)

/*** 将数据源df转换为(field_value, field_index)格式的rdd* @param dataFrame* @return*/def getValueColumnPairs(dataFrame : DataFrame): RDD[(Double, Int)] ={dataFrame.rdd.flatMap{row: Row => row.toSeq.zipWithIndex.map{case (v, index) => (v.toString.toDouble, index)}}}

(3)

/*** 对按照field_value排序后的sortedValueColumnPairs, 计算出每个分区上, 每个field_index分别有多少数据* @param sortedValueColumnPairs* @param numOfColumns* @return*/def getColumnsFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)],numOfColumns : Int): Array[(Int, Array[Long])] = {val zero = Array.fill[Long](numOfColumns)(0)    def aggregateColumnFrequencies (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) = {val columnsFreq : Array[Long] = valueColumnPairs.aggregate(zero)((a : Array[Long], v : (Double, Int)) => {val (value, colIndex) = v          //increment the cell in the zero array corresponding to this columna(colIndex) = a(colIndex) + 1La},(a : Array[Long], b : Array[Long]) => {a.zip(b).map{ case(aVal, bVal) => aVal + bVal}})Iterator((partitionIndex, columnsFreq))}sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()}

举例说明:

假设对(1)中转换后的数据, 按照field_value排序后, 各个分区的数据如下所示

Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)

Partition 2: (7.5, 1) (9.5, 2)

则(2)的输出结果为:

[(0, [2, 1, 1]), (1, [0, 1, 1])]

(4)

/*** 计算每个field_index所需排位数据在第几个分区的第几条数据* @param targetRanks 排位数组* @param partitionColumnsFreq 每个分区的每个field_index包含多少数据* @param numOfColumns field个数* @return*/def getRanksLocationsWithinEachPart(targetRanks : List[Long],partitionColumnsFreq : Array[(Int, Array[Long])],numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = {    // 二维数组, 存储当前每个field_index, 遍历到到第几条数据val runningTotal = Array.fill[Long](numOfColumns)(0)    // The partition indices are not necessarily in sorted order, so we need// to sort the partitionsColumnsFreq array by the partition index (the// first value in the tuple).partitionColumnsFreq.sortBy(_._1).map {      // relevantIndexList 存储分区上, 满足排位数组的field_index在该分区的第几条数据case (partitionIndex, columnsFreq) => val relevantIndexList = new mutable.MutableList[(Int, Long)]()columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>          // 当天field_index(即colIndex), 遍历到第几条数据val runningTotalCol = runningTotal(colIndex)          //  当前field_index(即colIndex),排位数组中哪些排位位于当前分区val ranksHere: List[Long] = targetRanks.filter(rank =>runningTotalCol < rank && runningTotalCol + colCount >= rank)          // 计算出当前分区,当前field_index(即colIndex), 满足排位数组的field_value在当前分区的位置relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol))runningTotal(colIndex) += colCount}(partitionIndex, relevantIndexList.toList)}}

举个例子:

假如目标排位:targetRanks: [5]

各分区各feild_index数据量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]

字段个数:numOfColumns: 2

输出结果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]

(5)

/*** 过滤出每个field_index 所需排位的数值* @param sortedValueColumnPairs* @param ranksLocations (4)中计算出的满足排位数组要求的每个分区上,每个field_index在该分区的第几条数据* @return*/def findTargetRanksIteratively( sortedValueColumnPairs : RDD[(Double, Int)], ranksLocations : Array[(Int, List[(Int, Long)])]):RDD[(Int, Double)] = {sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {        // 当前分区上, 满足排位数组的feild_index及其在该分区上的位置val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2        if (targetsInThisPart.nonEmpty) {          // map中的key为field_index, value为该feild_index在当前分区中的哪些位置上的数据满足排位数组要求val columnsRelativeIndex: Map[Int, List[Long]] = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))val columnsInThisPart = targetsInThisPart.map(_._1).distinct          // 存储各个field_index, 在分区遍历了多少条数据val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap          // 遍历当前分区的数据源, 格式为(field_value, field_index), 过滤出满足排位数据要求的数据valueColumnPairs.filter{            case(value, colIndex) =>lazy val thisPairIsTheRankStatistic: Boolean = {                // 每遍历一条数据, runningTotals上对应的field_index 当前已遍历数据量+1val total = runningTotals(colIndex) + 1LrunningTotals.update(colIndex, total)columnsRelativeIndex(colIndex).contains(total)}(runningTotals contains colIndex) && thisPairIsTheRankStatistic}.map(_.swap)} else {Iterator.empty}})}

分析:

(1)这种方法代码可读性较差

(2)需要遍历两遍原始数据

(3)相比于方案三, 更加有效避免executor内oom

(4)当field_value分布较离散的情况下, 这种方案相比于前三种, 效率更高

(5)上述算法中, 有两个潜在的问题, 当field_value倾斜情况下(即某个范围的值特别多),算法效率严重依赖于算法描述中的步骤(2)是否能将所有的field_value均匀的分配到各个partition;另一个问题是,当某些field_value重复现象比较多时, 是否可以合并对这些field_value的计数,而不是在一个partition中的iterator中挨个遍历这些重复数据。

备注:上述内容(问题背景、解决算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)

免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击。

相关文章:
【推荐】 [翻译]pytest测试框架(一)
【推荐】 浅谈js拖拽
【推荐】 HBase最佳实践-集群规划

转载于:https://www.cnblogs.com/163yun/p/9881058.html

大数据算法:排位问题(2)相关推荐

  1. 大数据算法系列——布隆过滤器

    大数据算法系列--布隆过滤器 一.简介 Bloom filter介绍 Bloom Filter(BF)是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集 ...

  2. 《大数据算法》一1.2 大数据算法

    本节书摘来华章计算机<大数据算法>一书中的第1章 ,第1.2节,王宏志 编著, 更多章节内容可以访问云栖社区"华章计算机"公众号查看. 1.2 大数据算法 这一节我们概 ...

  3. 基于PyTorch重写sklearn,《现代大数据算法》

    HyperLearn是一个基于PyTorch重写的机器学习工具包Scikit Learn,它的一些模块速度更快.需要内存更少,效率提高了一倍. 专为大数据而设计,HyperLearn可以使用50%以下 ...

  4. 大数据算法与分析技术国家工程实验室将建设

    国家发展改革委近日正式下发通知,同意由西安交通大学作为承担单位,国家电网公司全球能源互联网研究院作为联合共建单位,筹建"大数据算法与分析技术国家工程实验室". 国网信通部落实公司党 ...

  5. 大数据算法_【中科大】大数据算法(2020年春季)

    算法与理论是计算机科学的核心领域之一.随着大数据时代的来临,传统的算法理论已经不能很好地解决人工智能. 物联网.工业制造等领域所遇到的实际问题.本门课程主要介绍基于大数据的新型算法技术,如随机采样.数 ...

  6. 漫画趣解大数据算法建模:买瓜

    大数据开发如何转型算法? 算法建模主要做什么?调参为什么玄学? 如何通俗理解算法建模过程.. 夕阳下的村东头,有一人来买瓜. 1 引子(买瓜) 忙碌的一天刚刚结束,村里的小张就匆匆的骑上车,准备买个西 ...

  7. 郦旭东小可爱的大数据算法课程期末复习

    郦旭东小可爱的大数据算法课程期末复习 kmeans问题 kmeans原始问题和kmeans alg算法问题 kmeans问题 kmeans原始问题和kmeans alg算法问题 kmeans 原始问题 ...

  8. 大数据算法培养计划!

    立春节气已经过去了一个多月,但职场却迎来了真正的就业寒冬."旺季不旺" ,职场人期待在年后招聘季实现的跳槽.涨薪.转行,只能被暂时搁置. 根据智联招聘最新发布的调研数据来看:春节后 ...

  9. 大数据算法_看过来!2019“神气”大数据算法与应用赛决赛在即

    前方注意! 第二届智慧气象服务创新大赛 2019"神气"大数据算法与应用赛 决赛就要开始啦~ 快来看看都有哪些队伍来参赛吧! 第二届智慧气象服务创新大赛--2019"神气 ...

  10. 大数据算法识别高自杀风险人群?准确率高得吓人

    每年4月1日,追忆张国荣,几乎成了固定的"节目",之所以如此"执着"地纪念,很大程度上就在于张国荣的自杀身亡,令人扼腕. 据统计,全世界每年有约80万人自杀死亡 ...

最新文章

  1. python使用fpdf生成pdf文件:配置多种语言字体写入多种文字
  2. 最新版动手学习深度学习和GAN电子书免费下载!
  3. WIN7盗版的警告!你见过吗?
  4. 74HC573锁存器用法
  5. Selenium爬携程酒店评论+jieba数据分析实战
  6. 性能不同的服务器可以组成云,多个服务器组成云
  7. php基础标签大全,HTML基础之HTML常用标签
  8. 15.10.4 捕获异常
  9. Luogu3350 ZJOI2016 旅行者 最短路、分治
  10. springboot 关于 Class path contains multiple SLF4J bindings.警告的解决
  11. Unity文件操作路径
  12. 最常见30种NLP任务练手项目汇总
  13. 洛谷 P1168 中位数(优先队列)
  14. kafka-client 版本兼容问题
  15. 【内网学习笔记】14、发现主机缺失补丁
  16. Linux之恢复删除的数据
  17. CPU卡开发指南(四)原理解析
  18. 源码解析:Spring源码解析笔记(五)接口设计总览
  19. 神经网络实现逻辑运算,神经网络 最小二乘法
  20. Java | 参数(Parameter)

热门文章

  1. Mybatis框架实现简单的学生管理系统
  2. c语言剪刀石头布课程设计,C++剪刀石头布游戏课程设计方案.doc
  3. mysql 垂直拆分 原因_mysql的水平拆分和垂直拆分 (转)
  4. 关于Django将数据映射到Html中的操作
  5. MobaXterm 12中文版
  6. myeclipse 2019中文版
  7. jQuery Mobile
  8. B1091 N-自守数 (15分)
  9. VS 打包升成可自动升级的安装包
  10. Redis5.0:这些场景下使用,高效还降低成本!