Spark development note: collect_list and collect_set drop null values
Today we hit a pitfall with collect_list: its result does not include null values. Consider the following table:
| name | city |
|------|------|
| 张三 | 广州 |
| null | 广州 |
| 李四 | 深圳 |
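The pitfall can be reproduced with a short sketch (app name and variable names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder()
  .appName("collect-list-null")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Same data as the table above; the second row has a null name.
val df = Seq(("张三", "广州"), (null, "广州"), ("李四", "深圳")).toDF("name", "city")

df.groupBy("city").agg(collect_list("name").as("names")).show()
// For city = 广州 the result is [张三]: the null row is silently dropped.
```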
After grouping by city, collect_list(name) for city = '广州' returns List('张三'): the null value is gone. Tracing the source code:
```scala
def collect_list(e: Column): Column = withAggregateFunction { CollectList(e.expr) }
```

collect_list is implemented by the CollectList aggregate function:
```scala
@ExpressionDescription(
  usage = "_FUNC_(expr) - Collects and returns a list of non-unique elements.")
case class CollectList(
    child: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] {

  def this(child: Expression) = this(child, 0, 0)

  override lazy val bufferElementType = child.dataType

  override def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)

  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
    copy(mutableAggBufferOffset = newMutableAggBufferOffset)

  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
    copy(inputAggBufferOffset = newInputAggBufferOffset)

  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty

  override def prettyName: String = "collect_list"

  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
    new GenericArrayData(buffer.toArray)
  }
}
```
CollectList extends Collect[mutable.ArrayBuffer[Any]]:
```scala
/**
 * A base class for collect_list and collect_set aggregate functions.
 *
 * We have to store all the collected elements in memory, and so notice that too many elements
 * can cause GC paused and eventually OutOfMemory Errors.
 */
abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {

  val child: Expression

  override def children: Seq[Expression] = child :: Nil

  override def nullable: Boolean = true

  override def dataType: DataType = ArrayType(child.dataType)

  // Both `CollectList` and `CollectSet` are non-deterministic since their results depend on the
  // actual order of input rows.
  override lazy val deterministic: Boolean = false

  override def defaultResult: Option[Literal] = Option(Literal.create(Array(), dataType))

  protected def convertToBufferElement(value: Any): Any

  override def update(buffer: T, input: InternalRow): T = {
    val value = child.eval(input)

    // Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
    // See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
    if (value != null) {
      buffer += convertToBufferElement(value)
    }
    buffer
  }

  override def merge(buffer: T, other: T): T = {
    buffer ++= other
  }

  protected val bufferElementType: DataType

  private lazy val projection = UnsafeProjection.create(
    Array[DataType](ArrayType(elementType = bufferElementType, containsNull = false)))

  private lazy val row = new UnsafeRow(1)

  override def serialize(obj: T): Array[Byte] = {
    val array = new GenericArrayData(obj.toArray)
    projection.apply(InternalRow.apply(array)).getBytes()
  }

  override def deserialize(bytes: Array[Byte]): T = {
    val buffer = createAggregationBuffer()
    row.pointTo(bytes, bytes.length)
    row.getArray(0).foreach(bufferElementType, (_, x: Any) => buffer += x)
    buffer
  }
}
```
The null check is visible in the update method, and the comment explains why: the behavior is kept consistent with Hive's collect_list/collect_set.
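Before reaching for a custom aggregate, note that a lighter workaround is often enough: replace null with a sentinel value via coalesce before aggregating. This is a sketch with an illustrative sentinel; pick one that cannot appear in your real data.

```scala
import org.apache.spark.sql.functions.{coalesce, collect_list, lit}

// Assuming `df` has a nullable `name` column and a `city` column.
// "<null>" is an arbitrary sentinel standing in for null.
val result = df.groupBy("city")
  .agg(collect_list(coalesce(df("name"), lit("<null>"))).as("names"))
```

The downside is that downstream consumers must know about the sentinel, which is why a dedicated function can still be worth writing.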
It would be nice if collect_list accepted an optional parameter controlling whether nulls are filtered. Since it does not, we defined our own collect_list function:
```scala
import scala.collection.mutable

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object CollectListWithNullUDAF extends UserDefinedAggregateFunction {

  // Data types of input arguments of this aggregate function
  override def inputSchema: StructType = StructType(StructField("element", StringType) :: Nil)

  // Data types of values in the aggregation buffer
  override def bufferSchema: StructType =
    StructType(StructField("buffer", ArrayType(StringType, containsNull = true)) :: Nil)

  // The data type of the returned value
  override def dataType: DataType = ArrayType(StringType, containsNull = true)

  // Whether this function always returns the same output on the identical input
  override def deterministic: Boolean = true

  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = List.empty[String]
  }

  // Updates the given aggregation buffer `buffer` with new input data from `input`,
  // keeping null values instead of dropping them
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    var list = buffer.get(0).asInstanceOf[mutable.WrappedArray[String]].toList
    val value = input.get(0)
    if (value == null) {
      list = list.+:(null)
    } else {
      list = list.+:(value.toString)
    }
    buffer(0) = list
  }

  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val listResult = buffer1.get(0).asInstanceOf[mutable.WrappedArray[String]].toList
    val listTemp = buffer2.get(0).asInstanceOf[mutable.WrappedArray[String]].toList
    buffer1(0) = listResult ++ listTemp
  }

  // Calculates the final result; update prepends elements, so reverse to restore input order
  override def evaluate(buffer: Row): List[String] = {
    buffer.get(0).asInstanceOf[mutable.WrappedArray[String]].reverse.toList
  }
}

// Register the function so it can be used from SQL
spark.udf.register("collect_list_with_null", CollectListWithNullUDAF)
```
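Once registered, the function can be called from SQL just like the built-in. A sketch of the usage (assuming a SparkSession `spark` is in scope; the view name is illustrative):

```scala
import spark.implicits._

// Same data as the example above.
val df = Seq(("张三", "广州"), (null, "广州"), ("李四", "深圳")).toDF("name", "city")
df.createOrReplaceTempView("people")

spark.sql(
  "SELECT city, collect_list_with_null(name) AS names FROM people GROUP BY city"
).show(truncate = false)
// city = 广州 now keeps the null element, e.g. [张三, null]
// (relative order across partitions is not guaranteed)
```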