一、Spark-sql创建外部分区表

1.使用spark-sql

spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10  --executor-cores 2 --executor-memory 3G

2.spark-sql中创建parquet分区

create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
)
partitioned by(order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';

二、CSV转Parquet

代码:org.apache.spark.ConvertToParquet.scala

package org.apache.sparkimport com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._/**
* CSV 转换为 parquet
* 参数:输入路径, 输出路径, 分区数
*/
object ConvertToParquet{
def main(args: Array[String]) {
if(args.length != 3){
println("jar args: inputFiles outPath numpartitions")
System.exit(0)
}
val inputPath = args(0)
val outPath = args(1)
val numPartitions = args(2).toIntprintln("==========================================")
println("=========input: "+ inputPath )
println("=========output: "+ outPath )
println("==numPartitions: "+ numPartitions )
println("==========================================")//判断输出目录是否存在,存在则删除
val fo = HDFSOperation(new Configuration())
val existDir = fo.existDir(outPath)
if(existDir) {
println("HDFS exists outpath: " + outPath)
println("start to delete ...")
val isDelete = fo.deleteDir(outPath)
if(isDelete){
println(outPath +" delete done. ")
}
}val conf = new SparkConf()
val sc = new SparkContext(conf) //参数SparkConf创建SparkContext,
val sqlContext = new SQLContext(sc) //参数SparkContext创建SQLContextval schema = StructType(Array(
StructField("bill_num",DataTypes.StringType,false),
StructField("logis_id",DataTypes.StringType,false),
StructField("store_id",DataTypes.StringType,false),
StructField("store_code",DataTypes.StringType,false),
StructField("creater_id",DataTypes.StringType,false),
StructField("order_status",DataTypes.IntegerType,false),
StructField("pay_status",DataTypes.IntegerType,false),
StructField("order_require_varieties",DataTypes.IntegerType,false),
StructField("order_require_amount",DataTypes.createDecimalType(19,4),false),
StructField("order_rec_amount",DataTypes.createDecimalType(19,4),false),
StructField("order_rec_gpf",DataTypes.createDecimalType(19,4),false),
StructField("deli_fee",DataTypes.FloatType,false),
StructField("order_type",DataTypes.IntegerType,false),
StructField("last_modify_time",DataTypes.TimestampType,false),
StructField("order_submit_time",DataTypes.TimestampType,false),
StructField("order_submit_date",DataTypes.DateType,false)))convert(sqlContext, inputPath, schema, outPath, numPartitions)
}//CSV转换为parquet
def convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String, numPartitions: Int) {
// 将text导入到DataFrame
val df = sqlContext.read.format("com.databricks.spark.csv").
schema(schema).option("delimiter", ",").load(inputpath)
// 转换为parquet
// df.write.parquet(outpath) // 转换时以block数为分区数
df.coalesce(numPartitions).write.parquet(outpath) //自定义分区数
}}
打包后jar上传至本地目录:
/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar事先在HDFS上生成CSV文件,HDFS目录:
/test/spark/convert/data/order/2016-05-01/执行命令:
spark-submit --queue spark --master yarn --num-executors 10  --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar  /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01

pom.xml相关内容:

1.依赖包:

<dependencies>
<!-- 操作HDFS -->
<dependency><groupId>com.ecfront</groupId><artifactId>ez-fs</artifactId><version>0.9</version></dependency><!--spark -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>1.6.1</version></dependency><!--spark csv-->
<dependency><groupId>com.databricks</groupId><artifactId>spark-csv_2.11</artifactId><version>1.4.0</version></dependency><!--hadoop -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0</version></dependency>
</dependencies>

2.plugins(含打入依赖包)

<build><pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.1</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.0.2</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.4</version><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.4</version><configuration><createDependencyReducedPom>true</createDependencyReducedPom></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.apache.spark.ConvertToParquet</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

三、表添加分区

spark-sql下执行

alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');

可通过sql查询到相应数据:

select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;

转载于:https://my.oschina.net/pengli/blog/708657

Spark-SQL CSV转换为Parquet文件,设置默认为block分区数或自定义分区数相关推荐

  1. Cinchoo ETL——快速入门:将CSV转换为JSON文件

    目录 1.简介 2. 要求 3. 如何使用 3.1 样本数据 3.2 安装库 3.3 快速转换 3.4 选择性列转换 3.3 使用POCO对象 下载 Cinchoo ETL 源码 下载 Cinchoo ...

  2. 【Spark实训】-- Spark SQL结构化数据文件处理 ②

    目录 题目:统计分析航空公司客户数据的空值以及异常值. 1.训练要点 2.需求说明 3.实现思路及步骤 4.具体实现过程代码与截图: 题目:统计分析航空公司客户数据的空值以及异常值. 1.训练要点 ( ...

  3. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  4. spark sql保存hive表时的压缩设置

    根据查看spark sql源码(3.1.3)的源码,找到hive表输出文件压缩格式的设定方式: 结论: 1. 如果hive输出表的属性里定义了压缩格式,则直接使用表定义的格式,具体属性为: 文件输出格 ...

  5. 选择 Parquet for Spark SQL 的 5 大原因

    列式存储 (columnar storage) 在处理大数据的时候可以有效地节省时间和空间.例如,与使用文本相比,Parquet 让 Spark SQL 的性能平均提高了 10 倍,这要感谢初级的读取 ...

  6. spark SQL读取ORC文件从Driver启动到开始执行Task(或stage)间隔时间太长(计算Partition时间太长)且产出orc单个文件中stripe个数太多问题解决方案...

    1.背景: 控制上游文件个数每天7000个,每个文件大小小于256M,50亿条+,orc格式.查看每个文件的stripe个数,500个左右,查询命令:hdfs fsck viewfs://hadoop ...

  7. Spark SQL玩起来

    标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...

  8. spark-sql建表语句限制_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  9. hive编程指南_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

最新文章

  1. 突破极限 解决大硬盘上安装Sco Unix新思路
  2. 腾讯最新开源IoT操作系统登上GitHub热榜,最小体积仅1.8 KB,标星1200+
  3. 过滤器(Filter)
  4. 【Vue】 Error with Vue lazy loading components: “Failed to resolve async component“
  5. MySQL数据库如何杀死会话_如何彻底杀掉不良用户会话
  6. Python元组tuple(不可变)
  7. docker-compose部署kafka
  8. AutoMapper2
  9. matlab strfind用法,findstr和strfind区别
  10. 使用Jetty设置JNDI(嵌入式)
  11. python开发总结
  12. 在C#中ParameterizedThreadStart和ThreadStart区别
  13. Data too long for column ‘blobtext‘ at row 1 问题解决
  14. 在PyCharm中自动添加文件头、时间日期等信息
  15. SonarQube 7.7默认数据库连接方法
  16. 三行代码让你轻松下载全网任意视频-Python小知识
  17. 18-移动端等比例缩放rem
  18. eclipse pull异常 Pulling 1 respository (The pull operation was canceled)
  19. 软件过程与建模学习之:Quality Management
  20. COM组件开发(四)——VC++调用COM组件的方法

热门文章

  1. Oracle数据库误删数据恢复方法
  2. Fiddler抓包工具使用(一)
  3. 2020-03-29
  4. 每日技巧分享:网页转语音工具有哪些?
  5. 我的二进制漏洞挖掘学习路线
  6. NodeList和Array数组的区别
  7. 西蒙菲莎大学计算机专业学费,加拿大西蒙菲莎大学一年学费多少?
  8. 用Python一秒自动美化表格|python的1024种玩法(3)
  9. webgl与opengl的区别和联系
  10. azkaban安装使用