分区数
分区数是RDD的一个非常重要的数据特征,在很大程度会影响Spark任务的执行
**分区数的来源 **
如果HDFS,就是是由HDFS块决定的
如果是本地集合,就是Spark默认并行数决定 spark.default.parallelism
如果是本地文件,则默认是Min(默认并行数,2)
分区数的影响
分区数等于总任务数.简单来说就是,Spark是根据RDD的分区数来决定执行的总任务数
一个分区就会分配一个任务
分区数与核数
这两者其实并没有直接关系,但实际又紧密相连的
- 分区数等于总执行任务数
- 核数等于同一时间最多执行的总任务数
严格说来,是一个任务最少需要一个核才能运行(但任务也可以多核),所以如果指定任务单核,则核数等于可以并行执行的总任务数
分区器
分区器也是RDD的重要组成部分,因为RDD本身是由分区构成,控制数据分布的核心就是分区器
控制好的分区器可以减少shuffle网络传输,或者一些特殊的数据分组等等
内置的分区器
HashPartitioner
HashPartitioner 的分区策略是,对于给定的Key,计算其HashCode,并除以分区的个数取余.如果其HashCode为负数,则用余数+分区个数
注意:
数组类型的HashCode,是基于其数组对象本身而不是内容,因此用HashPartitioner分区数组类型RDD,可能会生成错误的结果
//Key计算策略
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
RangePartitioner
RangePartitioner 的分区策略是一种随机抽样策略.(水塘算法抽样)
水塘算法
在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数
大致算法如下:
- 设定取出的行号为choice
- 第一次直接以第一行作为取出行 choice
- 而后第二次以二分之一概率决定是否用第二行替换 choice
- 第三次以三分之一的概率决定是否以第三行替换 choice
- 以此类推
这种抽验算法的好处是在不知道元素总个数的情况下进行抽样
自定义分区器
自定义分区器需要继承实现Partitioner抽象类
自定义分区器可以实现一些特殊数据分布相关的逻辑
比如下面这种 对指定分组内的多排序取前N
//对指定分组内的多排序取前N
//排序策略
case class OrderKey(time: Long, traffic: Long) extends Comparable[OrderKey] {
//time升序 traffic降序
override def compareTo(o: OrderKey): Int = {
if (this.time > o.time) return 1
else if (this.time < o.time) return -1
else {
if (this.traffic > o.traffic) return -1
else if (this.traffic == o.traffic) return 0
else return 1
}
}
}
//自定义分区器
//只以Key中time作为分区
class OrderKeyPartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[OrderKey]
//对数字类型的hash分区一定要谨慎
//因为数字类型的HashCode等于它本身,一定要注意数字本身分别,不然很容易出现倾斜
(Math.abs(k.time.toString.hashCode % numPartitions)).toInt
}
}
//Time分组下的traffic排序取前5
rdd
.map(x => (OrderKey(x.time.getTime, x.traffic), (shortFormat(x.time), x.domain, x.traffic)))
.repartitionAndSortWithinPartitions(new OrderKeyPartitioner(10))
.mapPartitions(x=> x.toList.take(5).iterator)
.collect()
.map(x => println(x._2))