0x0 Dataset转POJO

方法:

  1. 将查询出的结果转为RDD
  2. 将RDD创建为DataFrame,并传入schema参数
  3. 调用as方法,将Dataset转为相应的POJO Dataset
  4. 调用collectAsList()方法

代码如下:

1.表结构

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|   string|   null|
|    name|   string|   null|
|   class|   string|   null|
+--------+---------+-------+

2.POJO类

public class Student {String id;String name;String major;...
}

3.转换代码

SparkSession spark = CloudUtils.getSparkSession();// 查询原始数据Dataset<Row> student = spark.sql("select * from `event`.`student`");// 生成schemaList<StructField> fields = new ArrayList<>();fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));fields.add(DataTypes.createStructField("major", DataTypes.StringType, true));StructType schema = DataTypes.createStructType(fields);// 转换查询结果为POJO ListList<Student> students = spark.createDataFrame(student.toJavaRDD(), schema).as(Encoders.bean(Student.class)).collectAsList();System.out.println(students);

注意:
Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码如下:

        // 查出数据并转为json集合List<String> jsonList = spark.sql("select * from `event`.`user`").toJSON().collectAsList();// 将json转为pojo,这里使用的是FastJSON        List<User> users = jsonList.stream().map(jsonString -> JSON.parseObject(jsonString, User.class)).collect(Collectors.toList());System.out.println(users);

0x1 POJO转Dataset

1.表结构

+---------+---------+-------+
|col_name |data_type|comment|
+---------+---------+-------+
| user_id |   string|   null|
|user_name|   string|   null|
|user_age |   int   |   null|
+---------+---------+-------+

2.POJO类

public class User{String userId;String userName;Integer userAge;...
}

转换代码:

        // 获取users列表List<User> users = createUsers();// 使用createDataFrame转为datasetDataset<Row> ds = spark.createDataFrame(users, User.class);// 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索String[] columns = ds.columns();String[] newColumns = Arrays.stream(columns).map(column -> camelToUnderline(column)).toArray(String[]::new);// 转为新的df(重命名后的)ds.toDF(newColumns);ds.show();

同样注意:
对于有些类型无法转换的情况,仍然采用json过渡,代码如下:

        // 创建user listList<User> users = createUsers();// 将user list转为json listList<String> jsonList = users.stream().map(JSON::toJSONString).collect(Collectors.toList());// 将json list转为json datasetDataset<String> jsonDataset = spark.createDataset(jsonList, Encoders.STRING());// 转换为row datasetDataset<Row> ds = spark.read().json(jsonDataset.toJavaRDD());ds.show();

输出结果:

+------------+---+----+
|    birthday| id|name|
+------------+---+----+
|689875200000|  1| AAA|
|689875200000|  2| BBB|
+------------+---+----+

Spark中POJO与Dataset相互转换相关推荐

  1. 【大数据开发】SparkSQL——RDD、DataFrame、DataSet相互转换、DSL常用方法、SQL风格语法、Spark读写操作、获取Column对象的方式

    take,takeAsList是Action操作 limit⽅法获取指定DataFrame的前n⾏记录,得到⼀个新的DataFrame对象.和take与head不同的是,limit⽅法不是Action ...

  2. Spark中RDD、DataFrame和DataSet的区别与联系

    一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...

  3. Spark中RDD与DataFrame与DataSet的区别与联系

    1.概述 这是一个面试题 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似传统数据库中的二维表格 DataFrame与RDD的主要区别在于,前者带有schema元数据信息,既 ...

  4. spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )

    1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...

  5. spark中的println失效问题解决

    我们知道, spark中的println会被控制台忽略. 代码如下: import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSessio ...

  6. 什么是spark的惰性计算?有什么优势?_spark——spark中常说RDD,究竟RDD是什么?

    本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是spark专题第二篇文章,我们来看spark非常重要的一个概念--RDD. 在上一讲当中我们在本地安装好了spark,虽然我们只有lo ...

  7. spark中dataframe解析_SparkSql 中 JOIN的实现

    Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者, ...

  8. Spark中,RDD概述(五大属性,弹性介绍,5个特性)

    1 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在Spa ...

  9. Spark 中 RDD 的详细介绍

    RDD ---弹性分布式数据集 RDD概述 RDD论文 中文版 : http://spark.apachecn.org/paper/zh/spark-rdd.html RDD产生背景 为了解决开发人员 ...

最新文章

  1. Linux RAR 安装和使用详细说明
  2. SAP RETAIL 使用MM41创建的物料不能使用MMSC扩展其存储地点
  3. python判断操作系统类型
  4. ML之RF/kNNC/LoRC/SVMC/RFC/GBDTC:利用Pipeline(客户年龄/职业/婚姻/教育/违约/余额/住房等)预测客户是否购买该银行的产品二分类(评估、调优、推理)
  5. hibernate注解的测试
  6. cloud foundry_Spring 3.1,Cloud Foundry和本地开发
  7. Windows Hook(2)调用DLL函数
  8. Socket 简易静态服务器 WPF MVVM模式(一)
  9. 【渝粤教育】21秋期末考试物权法10774k1
  10. oracle客户端sqlplus安装
  11. 2019 Multi-University Training Contest 6:Snowy Smile(线段树查询最大子段和)
  12. python中的函数
  13. POS标记——HMM模型
  14. Python OpenCV开发MR智能人脸识别打卡系统(四、服务模块设计)
  15. 通过ADB命令实现App的安装、卸载、覆盖
  16. 安装双系统后,将windows设置为默认启动选项的方法
  17. java爬虫系列第二讲-爬取最新动作电影《海王》迅雷下载地址
  18. 24小时切换简易时钟-51单片机
  19. bmijava_Java程序来计算体重指数(BMI)
  20. JewelCAD Pro 珠宝设计软件

热门文章

  1. phpstudy一直自动停止启动_phpStudy Apache和MySQL启动后又停止的有效解决办法
  2. python抽荣耀水晶_一次抽到5个荣耀水晶?很多人不信,但是有一种办法可以做到...
  3. 拖拽效果遇到的问题及解决方案
  4. 网络性能总不好?网络调优专家AOE帮你来“看看”
  5. 程序员自己的测试叫什么软件,程序员应该如何测试自己的程序代码
  6. 绝地求生自定义服务器租用,绝地求生自定义模式怎么玩 自定义模式介绍
  7. MacOS更新提示“安装需要下载重要内容。该内容此时无法下载。请稍后再试。”解决方法
  8. python 日期 格式转换 英文_python 中英文时间转换
  9. java编程实现迷你计算机功能,用JAVA编写一个迷你编辑器(续)
  10. iWatch还未面世,LG G Watch率先曝光