NightPxy 个人技术博客

Spark-内存管理

Posted on By NightPxy

概述

Spark是一个高度依赖内存的计算引擎.这也是它比传统MapReduce计算速度更快的原因之一
所以Spark的内存分配和管理,是Spark一个比较重要的模块

Spark 内存分配

Spark 的 executor ,本质是一个JVM进程,所以其内存分配必然是基于JVM的(堆内内存,On-heap)
但同时,Spark也同时引入了堆外内存(Off-heap),使之可以在JVM之外在节点内存中直接开辟

堆内内存

堆内内存的大小,就是启动时–executor-memory或者配置spark.executor.memory参数配置
所有该executor内执行的Task共享堆内内存
堆内内存可能被用于executor数据存储的被称为存储内存,比如缓存,或者是广播变量存储等,也可能被用于计算,比如shuffle中间数据存储等等,被称为计算内存.(后面详解)

所谓的堆内内存的本质就是JVM内存,所以这部分的内存是由JVM进行管理
这种管理体现在两部分 申请和释放

  • 内存申请
    堆内内存的申请,是向JVM进行申请,由JVM进行分配,并且会存储引用根等等
  • 内存释放
    堆内内存的释放也是由JVM进行自动释放.(就是GC那一套)

堆内内存也会有GC的一切特征,比如

  • 对象的释放并不是精准释放,
  • 对象的释放是有延迟的

堆外内存

堆内内存的劣势在于GC,即每一个对象的释放都不是精准释放而是必须通过GC扫描
所以Spark也利用Java-UnSafe Api引入了堆外内存.直接从JVM之外的操作系统中开辟内存

在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap.size参数设定堆外空间的大小。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存

堆外与堆内的选择

堆外内存的好处是

  • 可以精准释放(谁申请谁释放),而无需通过频繁的GC扫描
  • 堆外内存的序列化的数据占用的空间可以被精确计算

堆外内存的坏处是

  • 堆外内存只能存储二进制数据
    这意味着要使用堆外内存,必须要经过数据序列化为二进制和二进制反序列化数据

所以堆外内存并不一定会比堆内内存效率高.
具体如何选择在于权衡 GC的压力更大还是序列化的压力更大

  • 如果小内存的大量的小对象或者大量超大对象等等GC的压力会非常大的时候,就可以考虑堆外内存
  • 反之,如果堆内的GC运行的非常正常,就无需再耗费序列化去启用堆外内存

executor 内存管理

概述

org.apache.spark.memory.UnifiedMemoryManager

Spark executor 中的内存分为三大块 Spark内存,用户内存和保留内存

核心源码如下

object UnifiedMemoryManager {

  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
  ....
  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    .......
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }

预留内存

保留内存是留出一定数量的内存用于非存储和非执行目的. 这是一个固定保留值 300M
ps:spark.testing.memory其实可以调节,但这明显是个测试配置,所以不应该使用它

可用内存 = systemMemory(申请资源) - reservedMemory(固定值300)

val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
}

有一个隐含条件是 必须是1.5倍保留内存,即至少450M,才能启动

用户内存

用户内存是指用户产生的,或者说应用程序本身主导的数据存储.比如使用的一些临时数据或者数据结构等等.

用户内存占比 (申请内存-预留内存300M)(1-spark.memory.fraction) spark.memory.fraction 1.X版本是0.75 2.x版本是0.6
如果如果默认申请 2G 则在一个Stage内部Task算子执行最大内存 在2.x版本中不能超过 (2048-300)
0.4=699M
在1.x版本不能超过(2048-300)*0.25=437M

有一个面试题
100个Executor,每个4G.处理100G数据,则每个executor分配1G数据
1G远小于4G为什么使用mapPartition会报OOM
这就是因为4G真正用户能用的只有949M,而mapPartition加载一次分区数据需要1G,所以会OOM
注意: 这是1.x的情况 2.x 版本用户内存系数默认提升至40% 所以可用为1518M

Spark内存

这部分内存是由Spark控制和使用,由spark.memory.fraction控制,默认0.6(2.x版本)
由Spark使用,主要是指两种情况

  • RDD缓存以及广播变量存储等.这部分被称为存储内存
  • shuffle中间数据存储,连接,排序等用于Spark计算的存储,这部分被称为执行存储

存储内存占比,由 spark.memory.storageFraction 控制,默认0.5
剩余部分为执行内存,默认为 1- spark.memory.storageFraction

在Spark2.x,Spark内存管理已经是以统一内存管理方式进行分配
它的意思是

  • 如果是存储内存少,而执行内存需求大,执行内存可以借用存储内存进行执行
  • 存储内存可以设定一个最小阈值,保证在被借用时始终保持一下最小存储内存使用量
  • 如果是执行内存小,存储内存大,同上
  • 在借用的过程中,如果执行内存因为被借出而导致内存不够,执行内存会杀死部分存储内存而夺回资源,但是如果存储内存因为借出而导致的内存不够,它会等待执行内存用完释放而不会杀死对方