Spark中POJO与Dataset相互转换
0x0 Dataset转POJO
方法:
- 将查询出的结果转为RDD
- 将RDD创建为DataFrame,并传入schema参数
- 调用as方法,将Dataset转为相应的POJO Dataset
- 调用collectAsList()方法
代码如下:
1.表结构
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| id| string| null|
| name| string| null|
| class| string| null|
+--------+---------+-------+
2.POJO类
public class Student {String id;String name;String major;...
}
3.转换代码
SparkSession spark = CloudUtils.getSparkSession();// 查询原始数据Dataset<Row> student = spark.sql("select * from `event`.`student`");// 生成schemaList<StructField> fields = new ArrayList<>();fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));fields.add(DataTypes.createStructField("major", DataTypes.StringType, true));StructType schema = DataTypes.createStructType(fields);// 转换查询结果为POJO ListList<Student> students = spark.createDataFrame(student.toJavaRDD(), schema).as(Encoders.bean(Student.class)).collectAsList();System.out.println(students);
注意:
Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码如下:
// 查出数据并转为json集合List<String> jsonList = spark.sql("select * from `event`.`user`").toJSON().collectAsList();// 将json转为pojo,这里使用的是FastJSON List<User> users = jsonList.stream().map(jsonString -> JSON.parseObject(jsonString, User.class)).collect(Collectors.toList());System.out.println(users);
0x1 POJO转Dataset
1.表结构
+---------+---------+-------+
|col_name |data_type|comment|
+---------+---------+-------+
| user_id | string| null|
|user_name| string| null|
|user_age | int | null|
+---------+---------+-------+
2.POJO类
public class User{String userId;String userName;Integer userAge;...
}
转换代码:
// 获取users列表List<User> users = createUsers();// 使用createDataFrame转为datasetDataset<Row> ds = spark.createDataFrame(users, User.class);// 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索String[] columns = ds.columns();String[] newColumns = Arrays.stream(columns).map(column -> camelToUnderline(column)).toArray(String[]::new);// 转为新的df(重命名后的)ds.toDF(newColumns);ds.show();
同样注意:
对于有些类型无法转换的情况,仍然采用json过渡,代码如下:
// 创建user listList<User> users = createUsers();// 将user list转为json listList<String> jsonList = users.stream().map(JSON::toJSONString).collect(Collectors.toList());// 将json list转为json datasetDataset<String> jsonDataset = spark.createDataset(jsonList, Encoders.STRING());// 转换为row datasetDataset<Row> ds = spark.read().json(jsonDataset.toJavaRDD());ds.show();
输出结果:
+------------+---+----+
| birthday| id|name|
+------------+---+----+
|689875200000| 1| AAA|
|689875200000| 2| BBB|
+------------+---+----+
Spark中POJO与Dataset相互转换相关推荐
- 【大数据开发】SparkSQL——RDD、DataFrame、DataSet相互转换、DSL常用方法、SQL风格语法、Spark读写操作、获取Column对象的方式
take,takeAsList是Action操作 limit⽅法获取指定DataFrame的前n⾏记录,得到⼀个新的DataFrame对象.和take与head不同的是,limit⽅法不是Action ...
- Spark中RDD、DataFrame和DataSet的区别与联系
一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...
- Spark中RDD与DataFrame与DataSet的区别与联系
1.概述 这是一个面试题 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似传统数据库中的二维表格 DataFrame与RDD的主要区别在于,前者带有schema元数据信息,既 ...
- spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )
1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...
- spark中的println失效问题解决
我们知道, spark中的println会被控制台忽略. 代码如下: import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSessio ...
- 什么是spark的惰性计算?有什么优势?_spark——spark中常说RDD,究竟RDD是什么?
本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是spark专题第二篇文章,我们来看spark非常重要的一个概念--RDD. 在上一讲当中我们在本地安装好了spark,虽然我们只有lo ...
- spark中dataframe解析_SparkSql 中 JOIN的实现
Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者, ...
- Spark中,RDD概述(五大属性,弹性介绍,5个特性)
1 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在Spa ...
- Spark 中 RDD 的详细介绍
RDD ---弹性分布式数据集 RDD概述 RDD论文 中文版 : http://spark.apachecn.org/paper/zh/spark-rdd.html RDD产生背景 为了解决开发人员 ...
最新文章
- Linux RAR 安装和使用详细说明
- SAP RETAIL 使用MM41创建的物料不能使用MMSC扩展其存储地点
- python判断操作系统类型
- ML之RF/kNNC/LoRC/SVMC/RFC/GBDTC:利用Pipeline(客户年龄/职业/婚姻/教育/违约/余额/住房等)预测客户是否购买该银行的产品二分类(评估、调优、推理)
- hibernate注解的测试
- cloud foundry_Spring 3.1,Cloud Foundry和本地开发
- Windows Hook(2)调用DLL函数
- Socket 简易静态服务器 WPF MVVM模式(一)
- 【渝粤教育】21秋期末考试物权法10774k1
- oracle客户端sqlplus安装
- 2019 Multi-University Training Contest 6:Snowy Smile(线段树查询最大子段和)
- python中的函数
- POS标记——HMM模型
- Python OpenCV开发MR智能人脸识别打卡系统(四、服务模块设计)
- 通过ADB命令实现App的安装、卸载、覆盖
- 安装双系统后,将windows设置为默认启动选项的方法
- java爬虫系列第二讲-爬取最新动作电影《海王》迅雷下载地址
- 24小时切换简易时钟-51单片机
- bmijava_Java程序来计算体重指数(BMI)
- JewelCAD Pro 珠宝设计软件
热门文章
- phpstudy一直自动停止启动_phpStudy Apache和MySQL启动后又停止的有效解决办法
- python抽荣耀水晶_一次抽到5个荣耀水晶?很多人不信,但是有一种办法可以做到...
- 拖拽效果遇到的问题及解决方案
- 网络性能总不好?网络调优专家AOE帮你来“看看”
- 程序员自己的测试叫什么软件,程序员应该如何测试自己的程序代码
- 绝地求生自定义服务器租用,绝地求生自定义模式怎么玩 自定义模式介绍
- MacOS更新提示“安装需要下载重要内容。该内容此时无法下载。请稍后再试。”解决方法
- python 日期 格式转换 英文_python 中英文时间转换
- java编程实现迷你计算机功能,用JAVA编写一个迷你编辑器(续)
- iWatch还未面世,LG G Watch率先曝光