spark sql 并行查询

第一种使用指定分区列的方式

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
partitionColumn must be a numeric, date, or timestamp column from the table in question.

partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.

numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.

batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.

Dataset<Row> pgJdbcDF2 = spark.read().format("jdbc").option("url", "jdbc:postgresql://ip:15400/postgres?gssEncMode=disable").option("dbtable", "gaussdb.hotelevent").option("user", "gaussdb").option("password", "******").option("driver", "org.postgresql.Driver").option("numPartitions", 100).option("partitionColumn", "rzsj" ).option("lowerBound", 1000).option("upperBound", 2000).option("fetchSize", 5000).load();

dbtable:表名,可以是真实存在的关系表,也可以是通过查询语句 AS 出来的表。其实只要是在 SQL 语句里,FROM 后面能跟的语句用在 dbtable 属性都合法,其原理就是拼接 SQL 语句,dbtable 会填在 FROM 后面。
numPartitions:读、写的最大分区数,也决定了开启数据库连接的数目。使用 numPartitions 有一点点限制, 如果指定了 numPartitions 大于1的值,但是没有指定分区规则,仍只有一个 task 去执行查询。
partitionColumn, lowerBound, upperBound:指定读数据时的分区规则。要使用这三个参数,必须定义 numPartitions,而且这三个参数不能单独出现,要用就必须全部指定。而且 lowerBound, upperBound 不是过滤条件,只是用于决定分区跨度。在分区的时候,会根据s=(upperBound - lowerBound)/numPartitions,每个分区s条数据,最后超过upperBound 的分区的数据,单独作为一个分区 ,然后并行去执行查询。

第二种方式是使用自定义查询的方式

构造多个查询语句,并行多个查询,最后reduce成一个RDD,这种方式和spark自己提供的指定分区列主要区别在于这种并行是多个RDD,每个RDD一个分区,指定分区列的方式是一个RDD,有多个分区。使用这种方式时需要注意最后只有一个RDD,分区也只有一个,所以计算上就没有并行,spark的并行是根据分区数来决定,所以添加了重分区功能,但是这会导致shuffle,会有一定性能消耗,最后的结果并行写逻辑也要注意limit的使用,假如在计算的最后添加了limit截取写的数据量会导致RDD多个分区转换为一个分区[1]从而写的任务没有并行,所以在limit后面进行了重分区,这也会导致shuffle,消耗一定的性能。对于应用中调试节点计算,调研了下spark的缓存使用,spark的缓存不能跨应用使用,除非缓存在外部存储。调试节点,多个节点同时调试执行时可以在一个应用中执行,每个节点分别保存一部分调试的数据,一个应用多个job,job之间是串行执行,因为要保存同时调试的节点数据,所以一个节点就会生成最少一个job。

参考文献

  1. https://github.com/apache/spark/pull/7334
 Optional<Dataset<Row>> datasetOptional = partitionConfig.getSplitCondition().stream().map(v -> {DataFrameReader dataFrameReader = jdbcConfigData.getTableIdMapDataFrameReader().get(modelNode.getSrcTableId());String dbtable = "";if (DatabaseTypeEnum.ORACLE.getName().equalsIgnoreCase(partitionConfig.getDbType())) {String[] data = v.split("and");String first = data[0];String second;if (data.length == 1) {second = data[0];} else {second = data[1];}dbtable = String.format(partitionConfig.getDbTableSql(), second, first);} else {dbtable = String.format(partitionConfig.getDbTableSql(), v);}return dataFrameReader.option("dbtable", dbtable).load();}).reduce(Dataset::union);

spark sql并行读取实践相关推荐

  1. 实验5 Spark SQL编程初级实践

    今天做实验[Spark SQL 编程初级实践],虽然网上有答案,但在自己的环境下并不能够顺利进行 在第二题中,要求编程实现将 RDD 转换为 DataFrame.根据所谓标准答案,在进行sbt 打包时 ...

  2. Spark SQL来读取现有Hive中的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等). Spark SQL的其中一个分支就是Spa ...

  3. Spark SQL 编程初级实践

    1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json. { "id":1 , "name&qu ...

  4. 使用Spark SQL读取Hive上的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等).Spark SQL的其中一个分支就是Spar ...

  5. Spark SQL玩起来

    标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...

  6. Spark SQL: Relational Data Processing in Spark

    Spark SQL: Relational Data Processing in Spark Spark SQL : Spark中关系型处理模块 说明: 类似这样的说明并非是原作者的内容翻译,而是本篇 ...

  7. Spark SQL操作Hive表

    Spark SQL支持从Hive存储中读写数据.然而,Hive存在很多的依赖,而这些依赖又不包含在默认的各类Spark发型版本中.如果将Hive的依赖放入classpath中,Spark将自动加载它们 ...

  8. Spark SQL 核心编程

    文章目录 Spark SQL 核心编程 1.新的起点 2.SQL 语法 1) 读取 json 文件创建 DataFrame 2) 对 DataFrame 创建一个临时表 3) 通过SQL语句实现查询全 ...

  9. Spark SQL Guide——Data Sources

    文章目录 Parquet Files Partition Discovery(解析分区信息) Schema Merging(模式归并) Hive metastore Parquet table con ...

最新文章

  1. win7如何启动计算机,win7开机启动项怎么设置 电脑开机启动项在哪里设置
  2. Cortana 的谢幕,不一定是产品问题
  3. html5 接东西游戏,html5手机触屏接红包小游戏代码
  4. H5移动端页面设计心得分享
  5. bzoj 2330: [SCOI2011]糖果
  6. 写一个方法判断一个字符串是否对称_判断一个男生是否好色的方法
  7. LeetCode刷题——无重复字符的最长子串
  8. django添加字典格式的数据
  9. python水印倾斜_python中图像特定位置的水印算法
  10. 职员)2015-11-09 星期一 日志
  11. 用python海龟制图画花瓣_Python教程:使用Turtles画出带有花瓣的花
  12. SQL Server将DataTable传入存储过程(Table Value Parameter)
  13. Python爬虫_宅男福利?妹纸勿点__一蓑烟雨任平生
  14. SBC在企业IP通信系统中的应用
  15. [自娱自乐]玫瑰骑士结束了
  16. 抖音上热门规则优化及矩阵爆粉秘籍
  17. 怎么用python编写心形图案,python编程爱心形状turtle
  18. (一)LAMP (CGI,fastcgi, PHP,基于php的LAMP架构,php连接数据库)
  19. Sexagenary Cycle(天干地支法表示农历年份)
  20. 原来做炫酷图表这么容易

热门文章

  1. HTML5 布加迪威龙跑车自动化制造过程模拟
  2. 金蝶云星空对接打通精诚ERP应付单查询1接口与执行操作接口接口
  3. java打印ascii码_JAVA实现打印ascii码表的方法是什么
  4. Flutter——头像上传功能,实现照片选择及裁剪
  5. 1036 Boys vs Girls (25 分)
  6. 行业轮动从动量因子说起
  7. java zlib 位运算_Zlib 函数 - [ php中文手册 ] - 在线原生手册 - php中文网
  8. android的k歌
  9. 从零开始弄懂LightGBM_参数篇
  10. Oracle 中取周别 IW和WW 有何差别