spark自定义分区案例
在hadoop的mapreduce中默认patitioner是HashPartitioner,我们可以自定义Partitioner可以有效防止数据倾斜, 在Spark里面也是一样,在Spark里也是默认的HashPartitioner, 如果自己想自己定义Partitioner继承org.apache.spark里面的Partitioner并且重写它里面的两个方法就行了。
模板如下:
//只需要继承Partitioner,重写两个方法
class MyPartitioner(val num: Int) extends Partitioner {
//这里定义partitioner个数
override def numPartitions: Int = ???
//这里定义分区规则
override def getPartition(key: Any): Int = ???
}
案例1:单词统计
object xy {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("urlLocal").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List("lijie hello lisi", "zhangsan wangwu mazi", "hehe haha nihaoa heihei lure hehe hello word"))
val rdd2 = rdd1.flatMap(_.split(" ")).map(x => { (x, 1) }).reduceByKey(_ + _)
//这里指定自定义分区,然后输出
val rdd3 = rdd2.sortBy(_._2).partitionBy(new MyPartitioner(4)).mapPartitions(x => x)
.saveAsTextFile("C:\\Users\\Administrator\\Desktop\\out01")
println(rdd2.collect().toBuffer)
sc.stop()
}
}
class MyPartitioner(val num: Int) extends Partitioner {
override def numPartitions: Int = num override
def getPartition(key: Any): Int = { val len = key.toString.length
//根据单词长度对分区个数取模
len % num
}
}
案例来源:https://blog.csdn.net/qq_20641565/article/details/76130724
案例2:统计网址
package day02
import java.net.URL
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
object UserD_Partitioner {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("UserD_Partitioner").setMaster("local[2]")
val sc = new SparkContext(conf)
//rdd1将数据切分,元组中放的是(URL, 1)
val rdd1 = sc.textFile("c://itcast.log").map(line => {
val f = line.split("\t")
(f(1), 1)
})
val rdd2 = rdd1.reduceByKey(_ + _)
val rdd3 = rdd2.map(t => {
val url = t._1
val host = new URL(url).getHost
(host, (url, t._2))
})
val ints = rdd3.map(_._1).distinct().collect()
val hostParitioner = new HostParitioner(ints)
//val rdd4 = rdd3.partitionBy(new HashPartitioner(ints.length))
val rdd4 = rdd3.partitionBy(hostParitioner).mapPartitions(it => {
it.toList.sortBy(_._2._2).reverse.take(2).iterator
})
rdd4.saveAsTextFile("c://out4")
//println(rdd4.collect().toBuffer)
sc.stop()
}
}
/** 自定义分区:
* 决定了数据到哪个分区里面
* @param ins
*/
class HostParitioner(ins: Array[String]) extends Partitioner {
val parMap = new mutable.HashMap[String, Int]()
var count = 0
for(i <- ins){
parMap += (i -> count)
count += 1
}
//获取分区数量
override def numPartitions: Int = ins.length
//数据分区规则
override def getPartition(key: Any): Int = {
parMap.getOrElse(key.toString, 0)
}
}
案例来源:https://blog.csdn.net/freefish_yzx/article/details/77542526
Partitioner抽象类:
package org.apache.spark
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
自定义分区,要继承Partitioner抽象类,重写里面的numPartitions和getPartitioner方法。
def numPartitions : Int 获取分区数量
def getPartitioner(key : Any) : Int 获取定义分区规则
传入一个key,返回一个Int类型的value。这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1。
注意:在这里,key就是对(根据)什么进行操作(分区),什么就是key。具体取决于什么方法使用该分区类,该方法获取到的Key是什么。
案例3:根据学科分区,传入的是一个去重后的学科数组。
/**
* 自定义分区器
* @param subjects
*/
class SubjectPartitioner2(val subjects:Array[String]) extends Partitioner{
//主构造器里面的代码,new的时候就立即执行。
//分区规则 HashMap(学科,编号) 编号为:0 ---> 学科数量-1
val rules = new mutable.HashMap[String,Int]()
var i= 0
//自定义学科的编号
for(sub <- subjects){
rules(sub) = i //等价于:rules += (sub -> i)
i += 1
}
/**
* 获取分区的数量(在这里即为学科数量)
* @return
*/
override def numPartitions: Int = subjects.length
/**
* 数据分区的规则(传入一个key,返回一个Int类型的value)
* def getPartition(key: Any): Int:这个函数需要对输入的key做计算,
* 然后返回该key的分区ID,范围一定是0到numPartitions-1。
* @param key
* @return
*/
override def getPartition(key: Any): Int = {
//强转asInstanceOf
val tuple: (String, String) = key.asInstanceOf[Tuple2[String,String]]
val sub = tuple._1 //取出元组里面的学科
rules(sub)
//rules(key.toString)
}
3个应用分区类的算子:
reduceByKey()
reduceByKey()的三种参数形式:
reduceByKey(func) 函数
reduceByKey(func,numPartitions) 函数,分区数量
reduceByKey(partitioner,func) 分区器,函数
partitionBy()
如:partitionBy(new partitioner)
repartitionAndSortWithinPartitions()
如:repartitionAndSortWithinPartitions(new partitioner)
案例3详细说明:
数据样式:
http://UI.test.cn/laowang
http://php.test.cn/laoli
http://U-3D.test.cn/laowang
要求:统计各个学科内点击次数topN的老师
代码:
package lwj.sparkDay2
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
object FavTeacherInSubject4 {
def main(args: Array[String]): Unit = {
//设置日志打印级别(可选)
Logger.getLogger("org").setLevel(Level.ERROR)
//1、设置配置信息
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
//2、获取SparkContext上下文
val sc: SparkContext = new SparkContext(conf)
//3、读取文件
val lines: RDD[String] = sc.textFile("C://Users//xxx//Desktop//1.log")
//4、数据清洗方法:(数据格式:http://bigdata.test.cn/laozhang)
val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
val index1: Int = line.lastIndexOf("/")
val teacherName: String = line.substring(index1 + 1)
val host: String = new URL(line).getHost //URL解析,获取网址bigdata.test.cn/laozhang
val index2: Int = host.indexOf(".") //int indexOf(String str) :返回第一次出现的指定子字符串在此字符串中的索引。
val subject: String = host.substring(0, index2)
((subject, teacherName), 1)
})
//先触发任务,计算有多少个学科
val subjectRDD: RDD[String] = subjectTeacherAndOne.map(_._1._1).distinct()
//触发计算,获得有多少个具体的学科
val subjects: Array[String] = subjectRDD.collect()
//先分区再聚合
val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(new SubjectPartitioner1(subjects),_+_)
//通过自定义分区器将相同学科的数据都放在一个分区当中
//val partitionesRDD: RDD[((String, String), Int)] = reduced.partitionBy(new SubjectPartitioner(subjects))
//再排序(mapPartitions():一个分区一个分区的拿,传入一个迭代器,返回一个迭代器。)
val sorted: RDD[((String, String), Int)] = reduced.mapPartitions(_.toList.sortBy(x => -x._2).take(2).iterator)
//收集结果,打印
val rules: Array[((String, String), Int)] = sorted.collect()
println(rules.toBuffer)
//释放资源
sc.stop()
}
}
/**
* 自定义分区器
* @param subjects
*/
class SubjectPartitioner1(val subjects:Array[String]) extends Partitioner{
//分区规则 HashMap(学科,编号)
val rules = new mutable.HashMap[String,Int]()
var i= 0
//自定义学科的编号
for(sub <- subjects){
rules(sub) = i
i += 1
}
/**
* 获取分区的数量(在这里即为学科数量)
* @return
*/
override def numPartitions: Int = subjects.length
/**
* 数据分区的规则(传入一个key,返回一个Int类型的value)
* @param key
* @return
*/
override def getPartition(key: Any): Int = {
//强转asInstanceOf
val tuple: (String, String) = key.asInstanceOf[Tuple2[String,String]]
val sub = tuple._1 //取出元组里面的学科
rules(sub)
}
}
spark自定义分区案例相关推荐
- Spark自定义分区(Partitioner)
我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略(这两种分区的代码解析可以参见:<Spark分区器HashPartitioner和Ran ...
- Spark自定义分区器
spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...
- 大数据之-Hadoop3.x_MapReduce_自定义分区案例---大数据之hadoop3.x工作笔记0112
1.可以看到上面就是分区的案例,需求. 2.可以看到我们要实现上面的需求,我们把136开头的放到分区0,文件1,137开头的放到分区1,文件1, 138开头的放到分区2,文件2,139开头的放到分区3 ...
- 自定义实现spark的分区函数
有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写 ...
- 07_clickhouse、自定义分区及底层存储合并机制、自定义分区键、分区目录的命名规则、分区目录的合并过程、分区目录的合并过程、分区表达式指定、分区案例
4.自定义分区及底层存储合并机制 4.1.自定义分区键 4.2.分区目录的命名规则 4.3.分区目录的合并过程 4.4.分区目录的合并过程 4.5.分区表达式指定 4.6.分区案例 4.自定义分区及底 ...
- IDEA本地运行Spark项目[演示自定义分区器]并查看HDFS结果文件
文章目录 一.提出问题 二.解决问题 (一)添加IP到主机名的映射 (二)在本地准备Spark库文件 (三)在IDEA里创建Scala项目 (四)添加Spark库文件到项目 (五)创建自定义分区器 ( ...
- Spark数据分区(partitionBy分区、partitioner获取分区方式、自定义分区)
数据分区 partitionBy分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能.和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程 ...
- 21,spark sql 测试 : 1.4G 文件实战,测试耗时多少,先分区,再在分区内计算,用列内容分区( 这是一个很魔幻的问题 ),自定义分区
一 ,常规问题 : 1 ,表关联,数据过滤 : sql select stock.area,goods.smallLei,goods.typeColorId, weekofyear(to_date(s ...
- 【大数据开发】SparkCore——自定义排序、实现序列化、自定义分区器
文章目录 一.自定义排序四种方式.实现序列化 二.案例:自定义分区器 一.自定义排序四种方式.实现序列化 前面两种是样例类实现.普通类实现 第三种方式可以不实现序列化接口 用的最多的还是第四种方式,第 ...
- 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...
最新文章
- saltstack之(十二)配置管理mount
- 2020最新点云深度学习综述
- Tomcat如何添加管理员
- iOS微博web网页出现的天坑 (斗鱼App 当前版本5.600有这个bug)
- phpcms URL修改
- Swift和Objective-C混编注意事项
- 结构体自动化转为char数组的实现
- [html] 请使用canvas画一个椭圆
- 征服 Ajax 应用程序的安全威胁
- Maven学习总结(28)——Maven+Nexus+Myeclipse集成
- Linux下SVN创建新的项目
- 在xcode6中使用矢量图(iPhone6置配UI)
- 8-BIT OPTIMIZERS VIA BLOCK-WISE QUANTIZATION--通过块级量化的8位优化器
- Mega网盘来下载外国友人分享的资源
- 自己动手写CPU(8)——简单算术操作指令的实现
- Html5前景分析发展,HTML5未来发展的5大趋势
- 疫情下技术人的宅家指南
- 人工智能顶级会议与国际期刊总结
- 关于const A* f(const A* pSrc,A* const pDst,int v=2,...) const throw();
- 如何区分正反馈,负反馈放大电路?【模电02课】
热门文章
- 7-11 三对三篮球赛 (100 分)
- k8s之基本环境准备
- 软件测试职业发展必经之路! 为什么别人3年可以成为高级测试工程师,为何你还在点点点!
- 荣耀关闭浏览器自动复制建议打开的功能--关闭复制直达功能
- Java SE 7新增特效
- 飞信现状原因分析及脱困策略
- HDU1180 诡异的楼梯(bfs)[C,C++]
- elasticsearch理论、集群、常用命令、插件使用
- 寄生参数相关文件(itf, ict, tluplus, capTable, nxtgrd, qrcTechFile)
- 大家如何批量查询物流,分享最简单的方法