NightPxy 个人技术博客

Spark-分区器

Posted on By NightPxy

分区数

分区数是RDD的一个非常重要的数据特征,在很大程度会影响Spark任务的执行

**分区数的来源 **
如果HDFS,就是是由HDFS块决定的
如果是本地集合,就是Spark默认并行数决定 spark.default.parallelism
如果是本地文件,则默认是Min(默认并行数,2)

分区数的影响
分区数等于总任务数.简单来说就是,Spark是根据RDD的分区数来决定执行的总任务数
一个分区就会分配一个任务

分区数与核数
这两者其实并没有直接关系,但实际又紧密相连的

  • 分区数等于总执行任务数
  • 核数等于同一时间最多执行的总任务数
    严格说来,是一个任务最少需要一个核才能运行(但任务也可以多核),所以如果指定任务单核,则核数等于可以并行执行的总任务数

分区器

分区器也是RDD的重要组成部分,因为RDD本身是由分区构成,控制数据分布的核心就是分区器
控制好的分区器可以减少shuffle网络传输,或者一些特殊的数据分组等等

内置的分区器

HashPartitioner

HashPartitioner 的分区策略是,对于给定的Key,计算其HashCode,并除以分区的个数取余.如果其HashCode为负数,则用余数+分区个数

注意:
数组类型的HashCode,是基于其数组对象本身而不是内容,因此用HashPartitioner分区数组类型RDD,可能会生成错误的结果


//Key计算策略
def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
}
  
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

RangePartitioner

RangePartitioner 的分区策略是一种随机抽样策略.(水塘算法抽样)

水塘算法

在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数

大致算法如下:

  • 设定取出的行号为choice
  • 第一次直接以第一行作为取出行 choice
  • 而后第二次以二分之一概率决定是否用第二行替换 choice
  • 第三次以三分之一的概率决定是否以第三行替换 choice
  • 以此类推

这种抽验算法的好处是在不知道元素总个数的情况下进行抽样

自定义分区器

自定义分区器需要继承实现Partitioner抽象类

自定义分区器可以实现一些特殊数据分布相关的逻辑
比如下面这种 对指定分组内的多排序取前N

//对指定分组内的多排序取前N
//排序策略
case class OrderKey(time: Long, traffic: Long) extends Comparable[OrderKey] {
    //time升序 traffic降序
    override def compareTo(o: OrderKey): Int = {
      if (this.time > o.time) return 1
      else if (this.time < o.time) return -1
      else {
        if (this.traffic > o.traffic) return -1
        else if (this.traffic == o.traffic) return 0
        else return 1
      }
    }
  }
//自定义分区器  
//只以Key中time作为分区 
class OrderKeyPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[OrderKey]
      //对数字类型的hash分区一定要谨慎
      //因为数字类型的HashCode等于它本身,一定要注意数字本身分别,不然很容易出现倾斜
      (Math.abs(k.time.toString.hashCode % numPartitions)).toInt
    }
  }


//Time分组下的traffic排序取前5
rdd
    .map(x => (OrderKey(x.time.getTime, x.traffic), (shortFormat(x.time), x.domain, x.traffic)))
    .repartitionAndSortWithinPartitions(new OrderKeyPartitioner(10))
    .mapPartitions(x=> x.toList.take(5).iterator)
    .collect()
    .map(x => println(x._2))