今天我们踩到一个collect_list的坑,collect_list的结果不包含null值

name city
张三 广州
null 广州
李四 深圳

对city作group by后collect_list(name)得到的结果中city='广州’为List(‘张三’),没有null值。跟踪源码:

  def collect_list(e: Column): Column = withAggregateFunction { CollectList(e.expr) }

collect_list使用CollectList计算

@ExpressionDescription(usage = "_FUNC_(expr) - Collects and returns a list of non-unique elements.")
case class CollectList(child: Expression,mutableAggBufferOffset: Int = 0,inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] {def this(child: Expression) = this(child, 0, 0)override lazy val bufferElementType = child.dataTypeoverride def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =copy(mutableAggBufferOffset = newMutableAggBufferOffset)override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =copy(inputAggBufferOffset = newInputAggBufferOffset)override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.emptyoverride def prettyName: String = "collect_list"override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {new GenericArrayData(buffer.toArray)}
}

CollectList继承于Collect[mutable.ArrayBuffer[Any]]


/*** A base class for collect_list and collect_set aggregate functions.** We have to store all the collected elements in memory, and so notice that too many elements* can cause GC paused and eventually OutOfMemory Errors.*/
abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {val child: Expressionoverride def children: Seq[Expression] = child :: Niloverride def nullable: Boolean = trueoverride def dataType: DataType = ArrayType(child.dataType)// Both `CollectList` and `CollectSet` are non-deterministic since their results depend on the// actual order of input rows.override lazy val deterministic: Boolean = falseoverride def defaultResult: Option[Literal] = Option(Literal.create(Array(), dataType))protected def convertToBufferElement(value: Any): Anyoverride def update(buffer: T, input: InternalRow): T = {val value = child.eval(input)// Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.// See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluatorif (value != null) {buffer += convertToBufferElement(value)}buffer}override def merge(buffer: T, other: T): T = {buffer ++= other}protected val bufferElementType: DataTypeprivate lazy val projection = UnsafeProjection.create(Array[DataType](ArrayType(elementType = bufferElementType, containsNull = false)))private lazy val row = new UnsafeRow(1)override def serialize(obj: T): Array[Byte] = {val array = new GenericArrayData(obj.toArray)projection.apply(InternalRow.apply(array)).getBytes()}override def deserialize(bytes: Array[Byte]): T = {val buffer = createAggregationBuffer()row.pointTo(bytes, bytes.length)row.getArray(0).foreach(bufferElementType, (_, x: Any) => buffer += x)buffer}
}

在update方法中可以看到判空的逻辑,注释内容说要与Hive的collect_list/collect_set方法保持一致。
如果collect_list再支持一个可选参数,用于控制是否过滤null就好了,于是我们自定义了collect_list函数

object CollectListWithNullUDAF extends UserDefinedAggregateFunction {// Data types of input arguments of this aggregate functionoverride def inputSchema: StructType = StructType(StructField("element", StringType) :: Nil)// Data types of values in the aggregation bufferoverride def bufferSchema: StructType = StructType(StructField("buffer", ArrayType(StringType, containsNull = true)) :: Nil)// The data type of the returned valueoverride def dataType: DataType = ArrayType(StringType, containsNull = true)// Whether this function always returns the same output on the identical inputoverride def deterministic: Boolean = true// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.  override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = List.empty[String]}// Updates the given aggregation buffer `buffer` with new input data from `input`override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {implicit val defaultFormats: DefaultFormats.type = org.json4s.DefaultFormatsvar list = buffer.get(0).asInstanceOf[mutable.WrappedArray[String]].toListval value = input.get(0)if (value == null) {list = list.+:(null)} else {list = list.+:(value.toString)}buffer(0) = list}// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {val listResult = buffer1.get(0).asInstanceOf[mutable.WrappedArray[String]].toListval listTemp = buffer2.get(0).asInstanceOf[mutable.WrappedArray[String]].toListbuffer1(0) = listResult ++ listTemp}// Calculates the final resultoverride def evaluate(buffer: Row): List[String] = {val list = buffer.get(0).asInstanceOf[mutable.WrappedArray[String]].reverse.toListlist}
}// 方法注册
spark.udf.register("collect_list_with_null", CollectListWithNullUDAF)

Spark开发注意: collect_list、collect_set会去除Null值相关推荐

  1. Intellij IDEA使用Maven搭建spark开发环境(scala)

    如何一步一步地在Intellij IDEA使用Maven搭建spark开发环境,并基于scala编写简单的spark中wordcount实例. 1.准备工作  首先需要在你电脑上安装jdk和scala ...

  2. Spark开发性能调优

    Spark开发性能调优 标签(空格分隔): Spark –Write By Vin 1. 分配资源调优 Spark性能调优的王道就是分配资源,即增加和分配更多的资源对性能速度的提升是显而易见的,基本上 ...

  3. spark开发及调优

    一.spark开发调优 1.避免重复RDD 原则一:避免创建重复的RDD 对同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据 极大浪费内存 2.尽可能复用RDD 原则二:尽可能复用 ...

  4. IDEA2022 配置spark开发环境

    本人强烈建议在 linux环境下 学习 spark!!! Introduction Apache Spark是一个快速且通用的分布式计算引擎,可以在大规模数据集上进行高效的数据处理,包括数据转换.数据 ...

  5. Spark 开发总结

    Spark 开发总结 前言 spark UI Spark API Function Window.partitionBy Spark udf Spark 中禁止使用return Spark NullP ...

  6. spark开发环境搭建及部署

    spark开发环境搭建 1.下载开发工具luna eclipse 或者 Intellij IDEA(官网下载的 scala for eclipse如果不能用可以使用 luna) 2.安装jdk1.7配 ...

  7. 大规模数据处理Apache Spark开发

    大规模数据处理Apache Spark开发 Spark是用于大规模数据处理的统一分析引擎.它提供了Scala.Java.Python和R的高级api,以及一个支持用于数据分析的通用计算图的优化引擎.它 ...

  8. Windows环境下在IDEA编辑器中spark开发安装步骤

    以下是windows环境下安装spark的过程: 1.安装JDK(version:1.8.0.152) 2.安装scala(version:2.11/2.12) 3.安装spark(version:s ...

  9. oracle 让人抓狂的错误之 null值 与 无值(无结果)-开发系列(一)

    近期.在做开发.写存过的时候碰到一些问题,找了好长时间才发现原因.并且是曾经不知道的. 所以在这给记下来 给自己备忘和大家參考. 一 .null值 以下举个最简单的样例.寻常工作其中肯定比这个sql复 ...

最新文章

  1. c语言中如何设计和编写一个应用系统?
  2. E. coli Bacterial Assembly 大肠杆菌
  3. CSS3中box-shadow的用法介绍
  4. 松下伺服esi文件_松下贴片机操作教程
  5. Ubuntu:最简单的Ubuntu安装工具Wubi
  6. Mac OS X将CSV格式转换为Excel文档格式,Excel转CSV中文乱码问题
  7. 重新学习 React (二) Diffing 算法
  8. 微服务跨数据库联合查询_数据库跨库查询
  9. 雪城大学信息安全讲义 3.1 Set-UID 机制如何工作
  10. 华为RSTP和MSTP相关配置命令
  11. matlab半峰宽计算公式,半峰宽单位换算(峰宽与半峰宽转换公式)
  12. python拼音识别多音字的包_一个有意思还有用的Python包-汉字转换拼音
  13. JavaCC实现语法分析
  14. 电脑怎么装linux系统
  15. js练习(十一)实现一个打点计时器 setInterval()
  16. 离散选择模型(DCM)和深度神经网络(DNN)结合
  17. python使用Canny算法和HoughCiecle算法实现圆的检测与定位
  18. 12、【李宏毅机器学习(2017)】Semi-supervised Learning(半监督学习)
  19. C语言之tentative definition
  20. 计算机黑屏故障的原因及解决方法,电脑黑屏原因及解决方法

热门文章

  1. php 时间戳 24小时制,php如何将时间戳转换成小时制
  2. 《IDA Pro 代码破解解密》笔记一
  3. RS485和RS232的电路理解
  4. 【Grammy Nominees 2014】2014年格莱美音乐大奖提名名单
  5. 英语中代替 I think 的Debate回答
  6. 【有功-无功协调优化】基于改进多目标粒子群优化算法(小生境粒子群算法)的配电网有功-无功协调优化研究(Matlab代码实现)
  7. 树莓派 多USB设备名字绑定
  8. MFC实现CSGO汰换合同磨损计算器
  9. 一加Nord N300 5G什么时候发布 一加Nord N300 5G配置如何
  10. 云计算ACP弹性计算服务(一)