Spark版本2.4.0

Spark的RadixSort基数排序实现的排序容器基于一个LongArray实现。

在LongArray中,一个元素的长度为8字节,当排序的时候,将是每8个字节确定一个元素。

public static int sort(LongArray array, long numRecords, int startByteIndex, int endByteIndex,boolean desc, boolean signed) {assert startByteIndex >= 0 : "startByteIndex (" + startByteIndex + ") should >= 0";assert endByteIndex <= 7 : "endByteIndex (" + endByteIndex + ") should <= 7";assert endByteIndex > startByteIndex;assert numRecords * 2 <= array.size();long inIndex = 0;long outIndex = numRecords;if (numRecords > 0) {long[][] counts = getCounts(array, numRecords, startByteIndex, endByteIndex);for (int i = startByteIndex; i <= endByteIndex; i++) {if (counts[i] != null) {sortAtByte(array, numRecords, counts[i], i, inIndex, outIndex,desc, signed && i == endByteIndex);long tmp = inIndex;inIndex = outIndex;outIndex = tmp;}}}return Ints.checkedCast(inIndex);
}

当调用sort()方法进行排序的时候,需要一个LongArray参数作为被排序的数组。

NumRecords代表数组中的元素个数,由于LongArray中一个元素是8字节,startByteIndex和endByteIndex代表参与基数排序的起始字节和结束字节,用来划分参与到排序的范围。两者在0到7之间来确定。

由于在排序中,中间过程需要在元素组中进行存放,,所以LongArray的大小必须是numRecords被排序数量的倍。

在关键参数确定完之后,需要通过getCounts()构建8字节各字节大小在整个数组中的数量直方图。通过二维数组counts来存放。

private static long[][] getCounts(LongArray array, long numRecords, int startByteIndex, int endByteIndex) {long[][] counts = new long[8][];// Optimization: do a fast pre-pass to determine which byte indices we can skip for sorting.// If all the byte values at a particular index are the same we don't need to count it.long bitwiseMax = 0;long bitwiseMin = -1L;long maxOffset = array.getBaseOffset() + numRecords * 8L;Object baseObject = array.getBaseObject();for (long offset = array.getBaseOffset(); offset < maxOffset; offset += 8) {long value = Platform.getLong(baseObject, offset);bitwiseMax |= value;bitwiseMin &= value;}long bitsChanged = bitwiseMin ^ bitwiseMax;// Compute counts for each byte index.for (int i = startByteIndex; i <= endByteIndex; i++) {if (((bitsChanged >>> (i * 8)) & 0xff) != 0) {counts[i] = new long[256];// TODO(ekl) consider computing all the counts in one pass.for (long offset = array.getBaseOffset(); offset < maxOffset; offset += 8) {counts[i][(int)((Platform.getLong(baseObject, offset) >>> (i * 8)) & 0xff)]++;}}}return counts;
}

这里会构造一个8行256列的二维数组用来作为数组中各个元素每个位置上字节大小出现数量的直方图,比如数组中各个元素第8个字节为0的情况共有三个元素,这样在数组counts[7][0]则为3,一次遍历LongArray中的所有元素,统计各个字节位置上大小统计的出现总数,形成一个直方图。上文提到的startByteIndex和endByteIndex参数用来确定统计的字节范围。

在完成各字节出现数量的直方图统计后,将会从高位开始一次进行基数排序。具体单字节的排序在sortAtByte()方法中。

private static void sortAtByte(LongArray array, long numRecords, long[] counts, int byteIdx, long inIndex, long outIndex,boolean desc, boolean signed) {assert counts.length == 256;long[] offsets = transformCountsToOffsets(counts, numRecords, array.getBaseOffset() + outIndex * 8L, 8, desc, signed);Object baseObject = array.getBaseObject();long baseOffset = array.getBaseOffset() + inIndex * 8L;long maxOffset = baseOffset + numRecords * 8L;for (long offset = baseOffset; offset < maxOffset; offset += 8) {long value = Platform.getLong(baseObject, offset);int bucket = (int)((value >>> (byteIdx * 8)) & 0xff);Platform.putLong(baseObject, offsets[bucket], value);offsets[bucket] += 8;}
}

具体的排序方式如下,当正在排序第1个字节的大小的时候,先获得被排序的元素第一个字节的大小,根据之前直方图中该字节大小前面个数的数量来确定当前轮次该元素的插入位置,比如该元素第一个字节为2,则直方图中该字节0和1共出现7次,则这轮排序这个元素将被插入到第8个位置当中,同时下一个该字节为2的元素将会被插入到第9个位置上,防止冲突。

以此直到从起始字节排序到最后一个字节,一次基数排序也宣告结束。

spark RadixSort基数排序源码实现相关推荐

  1. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  2. spark任务运行源码

    spark任务运行源码 spark是一个分布式计算引擎,任务的运行是计算引擎的核心. 一个spark任务怎么能运行起来呢? 1 spark服务启动(Master,Worker): 2 应用程序提交 3 ...

  3. Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

    Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...

  4. Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x

    文章目录 Spark ALS recommendForAll源码解析实战 1. 软件版本: 2. 本文要解决的问题 3. 源码分析实战 3.1 Spark2.2.2 ALS recommendForA ...

  5. 《Spark商业案例与性能调优实战100课》第25课:Spark Hash Shuffle源码解读与剖析

    <Spark商业案例与性能调优实战100课>第25课:Spark Hash Shuffle源码解读与剖析

  6. 第25课 Spark Hash Shuffle源码解读与剖析

    第25课: Spark Hash Shuffle源码解读与剖析 Spark 2.1x 现在的版本已经没有Hash Shuffle的方式,那为什么我们还要讲解HashShuffle源码的内容呢?原因有3 ...

  7. Spark内核以及源码解析

    2019独角兽企业重金招聘Python工程师标准>>> 一:图RDD 1.上图groupBy,Join会产生shuffle,shuffle可以做性能优化. 2.stage1和stag ...

  8. spark读取文件源码分析-3

    本篇是spark read一个parquet源码分析的第三篇,这一篇主要介绍spark的默认的partition的设置逻辑,当然,这一篇实际上算不上源码分析了 第一篇 第二篇 1 . userProf ...

  9. spark读取文件源码分析-2

    文章目录 1. job1产生时机源码分析 1. DataSoure.getOrInferFileFormatSchema() 2. ParquetFileFormat.inferSchema 1. 简 ...

最新文章

  1. OpenCV入门要掌握的基本函数
  2. JS特效——鼠标跟随特效——动态背景线条跟随鼠标移动
  3. linux下tomcat部署
  4. Linux 中的文件压缩与解压
  5. [react] 在react中你是怎么进行状态管理的?
  6. 893. 特殊等价字符串组
  7. MaxCompute 图计算用户手册(下)
  8. 错误:'BasicLSTMCell' object has no attribute '_kernel'
  9. ydisk安卓版本_Y Disk HD
  10. excel2007如何增加控件?
  11. 下行法求最小割集案例_最小割集求法 -
  12. ORA-22285: 对不存在的目录或文件进行 FILEOPEN 操作 ORA-06512: 在 SYS.DBMS_LOB, line 523 ORA-06512: 在 line 6 查看错误堆
  13. c语言 拟合指数函数的代码,如何找到拟合指数函数的x?
  14. navicat连接mysql1205报错
  15. 中国市场 Android App 兼容性报告
  16. java实现客户端 与服务端的对话_Socket实现单客户端与服务器对话功能
  17. 今天的天气是多么的晴朗
  18. Java并发编程的艺术(一)
  19. 【C语言内功心法】__weak -- 示弱也是一种强大
  20. PHP一句话木马Webshell变形免杀总结

热门文章

  1. 诗与远方:无题(四十一)
  2. Linux CentOs6 命令学习
  3. JS new操作符执行之后背后的操作
  4. JAVA设计模式 - 抽象工厂模式
  5. [蓝桥杯][2018年第九届真题]约瑟夫环
  6. 微信小程序引入npm
  7. 无法解析的外部符号+_mysql_fetch_row_vs连接mysql出现以下错误,求解答,谢谢,不胜感激...
  8. 计算机网络基本操作命令的使用,计算机网络-路由器基本命令操作实验指导书--华为...
  9. c51单片机汇编语言电梯,C51单片机汇编语言指令集.doc
  10. Windows 2012 英文版系统安装中文语言包及时间格式设置